This is an automated email from the ASF dual-hosted git repository.
pvary pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/iceberg.git
The following commit(s) were added to refs/heads/main by this push:
new ec2c2e978e mr:Fix ugi not correct in WORKER_POOL (#10661)
ec2c2e978e is described below
commit ec2c2e978e40a4bf7bf6a3a0bccbe7e3a68a0cc1
Author: liu yang <[email protected]>
AuthorDate: Fri Jul 26 21:30:00 2024 +0800
mr:Fix ugi not correct in WORKER_POOL (#10661)
---
.../iceberg/mr/mapreduce/IcebergInputFormat.java | 18 +++++++++
.../apache/iceberg/mr/TestIcebergInputFormats.java | 44 ++++++++++++++++++++++
2 files changed, 62 insertions(+)
diff --git
a/mr/src/main/java/org/apache/iceberg/mr/mapreduce/IcebergInputFormat.java
b/mr/src/main/java/org/apache/iceberg/mr/mapreduce/IcebergInputFormat.java
index a95454b8b0..a222080d71 100644
--- a/mr/src/main/java/org/apache/iceberg/mr/mapreduce/IcebergInputFormat.java
+++ b/mr/src/main/java/org/apache/iceberg/mr/mapreduce/IcebergInputFormat.java
@@ -26,6 +26,7 @@ import java.util.List;
import java.util.Map;
import java.util.Optional;
import java.util.Set;
+import java.util.concurrent.ExecutorService;
import java.util.function.BiFunction;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.mapreduce.InputFormat;
@@ -44,6 +45,7 @@ import org.apache.iceberg.Schema;
import org.apache.iceberg.SchemaParser;
import org.apache.iceberg.SerializableTable;
import org.apache.iceberg.StructLike;
+import org.apache.iceberg.SystemConfigs;
import org.apache.iceberg.Table;
import org.apache.iceberg.TableProperties;
import org.apache.iceberg.TableScan;
@@ -78,6 +80,7 @@ import org.apache.iceberg.types.Type;
import org.apache.iceberg.types.TypeUtil;
import org.apache.iceberg.util.PartitionUtil;
import org.apache.iceberg.util.SerializationUtil;
+import org.apache.iceberg.util.ThreadPools;
/**
* Generic Mrv2 InputFormat API for Iceberg.
@@ -104,7 +107,21 @@ public class IcebergInputFormat<T> extends
InputFormat<Void, T> {
Optional.ofNullable(
HiveIcebergStorageHandler.table(conf,
conf.get(InputFormatConfig.TABLE_IDENTIFIER)))
.orElseGet(() -> Catalogs.loadTable(conf));
+ final ExecutorService workerPool =
+ ThreadPools.newWorkerPool(
+ "iceberg-plan-worker-pool",
+ conf.getInt(
+ SystemConfigs.WORKER_THREAD_POOL_SIZE.propertyKey(),
+ ThreadPools.WORKER_THREAD_POOL_SIZE));
+ try {
+ return planInputSplits(table, conf, workerPool);
+ } finally {
+ workerPool.shutdown();
+ }
+ }
+ private List<InputSplit> planInputSplits(
+ Table table, Configuration conf, ExecutorService workerPool) {
TableScan scan =
table
.newScan()
@@ -144,6 +161,7 @@ public class IcebergInputFormat<T> extends
InputFormat<Void, T> {
InputFormatConfig.InMemoryDataModel model =
conf.getEnum(
InputFormatConfig.IN_MEMORY_DATA_MODEL,
InputFormatConfig.InMemoryDataModel.GENERIC);
+ scan = scan.planWith(workerPool);
try (CloseableIterable<CombinedScanTask> tasksIterable = scan.planTasks())
{
Table serializableTable = SerializableTable.copyOf(table);
tasksIterable.forEach(
diff --git
a/mr/src/test/java/org/apache/iceberg/mr/TestIcebergInputFormats.java
b/mr/src/test/java/org/apache/iceberg/mr/TestIcebergInputFormats.java
index c9d5d487de..86d390ca9f 100644
--- a/mr/src/test/java/org/apache/iceberg/mr/TestIcebergInputFormats.java
+++ b/mr/src/test/java/org/apache/iceberg/mr/TestIcebergInputFormats.java
@@ -25,11 +25,14 @@ import static
org.assertj.core.api.Assertions.assertThatThrownBy;
import java.io.File;
import java.io.IOException;
import java.io.UncheckedIOException;
+import java.lang.reflect.Method;
import java.nio.file.Path;
import java.nio.file.Paths;
+import java.security.PrivilegedAction;
import java.util.List;
import java.util.Map;
import java.util.Set;
+import java.util.concurrent.ExecutorService;
import java.util.function.Function;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.mapred.JobConf;
@@ -39,6 +42,7 @@ import org.apache.hadoop.mapreduce.InputSplit;
import org.apache.hadoop.mapreduce.RecordReader;
import org.apache.hadoop.mapreduce.TaskAttemptContext;
import org.apache.hadoop.mapreduce.task.TaskAttemptContextImpl;
+import org.apache.hadoop.security.UserGroupInformation;
import org.apache.iceberg.AppendFiles;
import org.apache.iceberg.CatalogProperties;
import org.apache.iceberg.CatalogUtil;
@@ -67,6 +71,7 @@ import
org.apache.iceberg.relocated.com.google.common.collect.Lists;
import org.apache.iceberg.relocated.com.google.common.collect.Sets;
import org.apache.iceberg.types.TypeUtil;
import org.apache.iceberg.types.Types;
+import org.apache.iceberg.util.ThreadPools;
import org.junit.jupiter.api.BeforeEach;
import org.junit.jupiter.api.TestTemplate;
import org.junit.jupiter.api.extension.ExtendWith;
@@ -381,6 +386,45 @@ public class TestIcebergInputFormats {
testInputFormat.create(builder.conf()).validate(expectedRecords);
}
+ @TestTemplate
+ public void testWorkerPool() throws Exception { // checks which user a planning pool thread reports after split planning
+ Table table = helper.createUnpartitionedTable();
+ UserGroupInformation user1 =
+ UserGroupInformation.createUserForTesting("user1", new String[] {}); // test-only user with no groups
+ UserGroupInformation user2 =
+ UserGroupInformation.createUserForTesting("user2", new String[] {}); // second test-only user with no groups
+ final ExecutorService workerPool1 =
ThreadPools.newWorkerPool("iceberg-plan-worker-pool", 1); // pool created with size 1
+ final ExecutorService workerPool2 =
ThreadPools.newWorkerPool("iceberg-plan-worker-pool", 1); // separate pool, same name prefix and size
+ try {
+ assertThat(getUserFromWorkerPool(user1, table,
workerPool1)).isEqualTo("user1"); // first use of pool 1 happens as user1
+ assertThat(getUserFromWorkerPool(user2, table,
workerPool1)).isEqualTo("user1"); // reusing pool 1 as user2 still reports user1 - presumably its thread keeps the first caller identity
+ assertThat(getUserFromWorkerPool(user2, table,
workerPool2)).isEqualTo("user2"); // a pool not used before reports the calling user
+ } finally {
+ workerPool1.shutdown(); // always release both pools, even when an assertion fails
+ workerPool2.shutdown();
+ }
+ }
+
+ private String getUserFromWorkerPool( // plans splits as the given user on the given pool, then returns the user name seen by a pool thread
+ UserGroupInformation user, Table table, ExecutorService workerpool)
throws Exception {
+ Method method =
+ IcebergInputFormat.class.getDeclaredMethod( // planInputSplits is private, so it is looked up reflectively
+ "planInputSplits", Table.class, Configuration.class,
ExecutorService.class);
+ method.setAccessible(true); // allow invoking the private method from the test
+ return user.doAs( // run the action below with the given user as the caller
+ (PrivilegedAction<String>)
+ () -> {
+ try {
+ method.invoke(new IcebergInputFormat<>(), table, conf,
workerpool); // plan splits with the supplied pool; the returned splits are discarded
+ return workerpool // then ask the same pool which user its thread runs as
+ .submit(() ->
UserGroupInformation.getCurrentUser().getUserName())
+ .get(); // block until the pool task returns the user name
+ } catch (Exception e) { // PrivilegedAction.run declares no checked exceptions, so failures are rethrown unchecked
+ throw new RuntimeException("Failed to get user from worker
pool", e);
+ }
+ });
+ }
+
// TODO - Capture template type T in toString method:
// https://github.com/apache/iceberg/issues/1542
public abstract static class TestInputFormat<T> {