This is an automated email from the ASF dual-hosted git repository.

dkuzmenko pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/hive.git


The following commit(s) were added to refs/heads/master by this push:
     new 45867be6cb5 HIVE-28276: Iceberg: Make Iceberg split threads 
configurable when table scanning (Butao Zhang, reviewed by Ayush Saxena, Denys 
Kuzmenko)
45867be6cb5 is described below

commit 45867be6cb5308566e4cf16c7b4cf8081085b58c
Author: Butao Zhang <[email protected]>
AuthorDate: Mon Jun 3 23:10:20 2024 +0800

    HIVE-28276: Iceberg: Make Iceberg split threads configurable when table 
scanning (Butao Zhang, reviewed by Ayush Saxena, Denys Kuzmenko)
    
    Closes #5260
---
 .../apache/iceberg/mr/mapreduce/IcebergInputFormat.java  | 16 +++++++++++++++-
 1 file changed, 15 insertions(+), 1 deletion(-)

diff --git 
a/iceberg/iceberg-handler/src/main/java/org/apache/iceberg/mr/mapreduce/IcebergInputFormat.java
 
b/iceberg/iceberg-handler/src/main/java/org/apache/iceberg/mr/mapreduce/IcebergInputFormat.java
index 754d78e4d93..566832bedfb 100644
--- 
a/iceberg/iceberg-handler/src/main/java/org/apache/iceberg/mr/mapreduce/IcebergInputFormat.java
+++ 
b/iceberg/iceberg-handler/src/main/java/org/apache/iceberg/mr/mapreduce/IcebergInputFormat.java
@@ -27,6 +27,7 @@ import java.util.List;
 import java.util.Map;
 import java.util.Optional;
 import java.util.Set;
+import java.util.concurrent.ExecutorService;
 import java.util.function.BiFunction;
 import java.util.stream.Collectors;
 import java.util.stream.Stream;
@@ -62,6 +63,7 @@ import org.apache.iceberg.Schema;
 import org.apache.iceberg.SchemaParser;
 import org.apache.iceberg.SnapshotRef;
 import org.apache.iceberg.StructLike;
+import org.apache.iceberg.SystemConfigs;
 import org.apache.iceberg.Table;
 import org.apache.iceberg.TableProperties;
 import org.apache.iceberg.TableScan;
@@ -97,6 +99,7 @@ import org.apache.iceberg.types.TypeUtil;
 import org.apache.iceberg.types.Types;
 import org.apache.iceberg.util.PartitionUtil;
 import org.apache.iceberg.util.SerializationUtil;
+import org.apache.iceberg.util.ThreadPools;
 
 /**
  * Generic Mrv2 InputFormat API for Iceberg.
@@ -207,19 +210,30 @@ public class IcebergInputFormat<T> extends 
InputFormat<Void, T> {
           conf.set(InputFormatConfig.SERIALIZED_TABLE_PREFIX + tbl.name(), 
SerializationUtil.serializeToBase64(tbl));
           return tbl;
         });
+    final ExecutorService workerPool =
+        ThreadPools.newWorkerPool("iceberg-plan-worker-pool",
+            conf.getInt(SystemConfigs.WORKER_THREAD_POOL_SIZE.propertyKey(), 
ThreadPools.WORKER_THREAD_POOL_SIZE));
+    try {
+      return planInputSplits(table, conf, workerPool);
+    } finally {
+      workerPool.shutdown();
+    }
+  }
 
+  private List<InputSplit> planInputSplits(Table table, Configuration conf, 
ExecutorService workerPool) {
     List<InputSplit> splits = Lists.newArrayList();
     boolean applyResidual = 
!conf.getBoolean(InputFormatConfig.SKIP_RESIDUAL_FILTERING, false);
     InputFormatConfig.InMemoryDataModel model = 
conf.getEnum(InputFormatConfig.IN_MEMORY_DATA_MODEL,
         InputFormatConfig.InMemoryDataModel.GENERIC);
 
     long fromVersion = 
conf.getLong(InputFormatConfig.SNAPSHOT_ID_INTERVAL_FROM, -1);
-    Scan<?, FileScanTask, CombinedScanTask> scan;
+    Scan<? extends Scan, FileScanTask, CombinedScanTask> scan;
     if (fromVersion != -1) {
       scan = applyConfig(conf, createIncrementalAppendScan(table, conf));
     } else {
       scan = applyConfig(conf, createTableScan(table, conf));
     }
+    scan = scan.planWith(workerPool);
 
     boolean allowDataFilesWithinTableLocationOnly =
         
conf.getBoolean(HiveConf.ConfVars.HIVE_ICEBERG_ALLOW_DATAFILES_IN_TABLE_LOCATION_ONLY.varname,

Reply via email to