This is an automated email from the ASF dual-hosted git repository.

pvary pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/iceberg.git


The following commit(s) were added to refs/heads/main by this push:
     new ec2c2e978e mr:Fix ugi not correct in WORKER_POOL (#10661)
ec2c2e978e is described below

commit ec2c2e978e40a4bf7bf6a3a0bccbe7e3a68a0cc1
Author: liu yang <[email protected]>
AuthorDate: Fri Jul 26 21:30:00 2024 +0800

    mr:Fix ugi not correct in WORKER_POOL (#10661)
---
 .../iceberg/mr/mapreduce/IcebergInputFormat.java   | 18 +++++++++
 .../apache/iceberg/mr/TestIcebergInputFormats.java | 44 ++++++++++++++++++++++
 2 files changed, 62 insertions(+)

diff --git a/mr/src/main/java/org/apache/iceberg/mr/mapreduce/IcebergInputFormat.java b/mr/src/main/java/org/apache/iceberg/mr/mapreduce/IcebergInputFormat.java
index a95454b8b0..a222080d71 100644
--- a/mr/src/main/java/org/apache/iceberg/mr/mapreduce/IcebergInputFormat.java
+++ b/mr/src/main/java/org/apache/iceberg/mr/mapreduce/IcebergInputFormat.java
@@ -26,6 +26,7 @@ import java.util.List;
 import java.util.Map;
 import java.util.Optional;
 import java.util.Set;
+import java.util.concurrent.ExecutorService;
 import java.util.function.BiFunction;
 import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.mapreduce.InputFormat;
@@ -44,6 +45,7 @@ import org.apache.iceberg.Schema;
 import org.apache.iceberg.SchemaParser;
 import org.apache.iceberg.SerializableTable;
 import org.apache.iceberg.StructLike;
+import org.apache.iceberg.SystemConfigs;
 import org.apache.iceberg.Table;
 import org.apache.iceberg.TableProperties;
 import org.apache.iceberg.TableScan;
@@ -78,6 +80,7 @@ import org.apache.iceberg.types.Type;
 import org.apache.iceberg.types.TypeUtil;
 import org.apache.iceberg.util.PartitionUtil;
 import org.apache.iceberg.util.SerializationUtil;
+import org.apache.iceberg.util.ThreadPools;
 
 /**
  * Generic Mrv2 InputFormat API for Iceberg.
@@ -104,7 +107,21 @@ public class IcebergInputFormat<T> extends InputFormat<Void, T> {
         Optional.ofNullable(
                 HiveIcebergStorageHandler.table(conf, 
conf.get(InputFormatConfig.TABLE_IDENTIFIER)))
             .orElseGet(() -> Catalogs.loadTable(conf));
+    final ExecutorService workerPool =
+        ThreadPools.newWorkerPool(
+            "iceberg-plan-worker-pool",
+            conf.getInt(
+                SystemConfigs.WORKER_THREAD_POOL_SIZE.propertyKey(),
+                ThreadPools.WORKER_THREAD_POOL_SIZE));
+    try {
+      return planInputSplits(table, conf, workerPool);
+    } finally {
+      workerPool.shutdown();
+    }
+  }
 
+  private List<InputSplit> planInputSplits(
+      Table table, Configuration conf, ExecutorService workerPool) {
     TableScan scan =
         table
             .newScan()
@@ -144,6 +161,7 @@ public class IcebergInputFormat<T> extends InputFormat<Void, T> {
     InputFormatConfig.InMemoryDataModel model =
         conf.getEnum(
             InputFormatConfig.IN_MEMORY_DATA_MODEL, 
InputFormatConfig.InMemoryDataModel.GENERIC);
+    scan = scan.planWith(workerPool);
     try (CloseableIterable<CombinedScanTask> tasksIterable = scan.planTasks()) 
{
       Table serializableTable = SerializableTable.copyOf(table);
       tasksIterable.forEach(
diff --git a/mr/src/test/java/org/apache/iceberg/mr/TestIcebergInputFormats.java b/mr/src/test/java/org/apache/iceberg/mr/TestIcebergInputFormats.java
index c9d5d487de..86d390ca9f 100644
--- a/mr/src/test/java/org/apache/iceberg/mr/TestIcebergInputFormats.java
+++ b/mr/src/test/java/org/apache/iceberg/mr/TestIcebergInputFormats.java
@@ -25,11 +25,14 @@ import static org.assertj.core.api.Assertions.assertThatThrownBy;
 import java.io.File;
 import java.io.IOException;
 import java.io.UncheckedIOException;
+import java.lang.reflect.Method;
 import java.nio.file.Path;
 import java.nio.file.Paths;
+import java.security.PrivilegedAction;
 import java.util.List;
 import java.util.Map;
 import java.util.Set;
+import java.util.concurrent.ExecutorService;
 import java.util.function.Function;
 import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.mapred.JobConf;
@@ -39,6 +42,7 @@ import org.apache.hadoop.mapreduce.InputSplit;
 import org.apache.hadoop.mapreduce.RecordReader;
 import org.apache.hadoop.mapreduce.TaskAttemptContext;
 import org.apache.hadoop.mapreduce.task.TaskAttemptContextImpl;
+import org.apache.hadoop.security.UserGroupInformation;
 import org.apache.iceberg.AppendFiles;
 import org.apache.iceberg.CatalogProperties;
 import org.apache.iceberg.CatalogUtil;
@@ -67,6 +71,7 @@ import org.apache.iceberg.relocated.com.google.common.collect.Lists;
 import org.apache.iceberg.relocated.com.google.common.collect.Sets;
 import org.apache.iceberg.types.TypeUtil;
 import org.apache.iceberg.types.Types;
+import org.apache.iceberg.util.ThreadPools;
 import org.junit.jupiter.api.BeforeEach;
 import org.junit.jupiter.api.TestTemplate;
 import org.junit.jupiter.api.extension.ExtendWith;
@@ -381,6 +386,45 @@ public class TestIcebergInputFormats {
     testInputFormat.create(builder.conf()).validate(expectedRecords);
   }
 
+  @TestTemplate
+  public void testWorkerPool() throws Exception {
+    Table table = helper.createUnpartitionedTable();
+    UserGroupInformation user1 =
+        UserGroupInformation.createUserForTesting("user1", new String[] {});
+    UserGroupInformation user2 =
+        UserGroupInformation.createUserForTesting("user2", new String[] {});
+    final ExecutorService workerPool1 = 
ThreadPools.newWorkerPool("iceberg-plan-worker-pool", 1);
+    final ExecutorService workerPool2 = 
ThreadPools.newWorkerPool("iceberg-plan-worker-pool", 1);
+    try {
+      assertThat(getUserFromWorkerPool(user1, table, 
workerPool1)).isEqualTo("user1");
+      assertThat(getUserFromWorkerPool(user2, table, 
workerPool1)).isEqualTo("user1");
+      assertThat(getUserFromWorkerPool(user2, table, 
workerPool2)).isEqualTo("user2");
+    } finally {
+      workerPool1.shutdown();
+      workerPool2.shutdown();
+    }
+  }
+
+  private String getUserFromWorkerPool(
+      UserGroupInformation user, Table table, ExecutorService workerpool) 
throws Exception {
+    Method method =
+        IcebergInputFormat.class.getDeclaredMethod(
+            "planInputSplits", Table.class, Configuration.class, 
ExecutorService.class);
+    method.setAccessible(true);
+    return user.doAs(
+        (PrivilegedAction<String>)
+            () -> {
+              try {
+                method.invoke(new IcebergInputFormat<>(), table, conf, 
workerpool);
+                return workerpool
+                    .submit(() -> 
UserGroupInformation.getCurrentUser().getUserName())
+                    .get();
+              } catch (Exception e) {
+                throw new RuntimeException("Failed to get user from worker 
pool", e);
+              }
+            });
+  }
+
   // TODO - Capture template type T in toString method:
   // https://github.com/apache/iceberg/issues/1542
   public abstract static class TestInputFormat<T> {

Reply via email to