This is an automated email from the ASF dual-hosted git repository.
dkuzmenko pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/hive.git
The following commit(s) were added to refs/heads/master by this push:
new 45867be6cb5 HIVE-28276: Iceberg: Make Iceberg split threads
configurable when table scanning (Butao Zhang, reviewed by Ayush Saxena, Denys
Kuzmenko)
45867be6cb5 is described below
commit 45867be6cb5308566e4cf16c7b4cf8081085b58c
Author: Butao Zhang <[email protected]>
AuthorDate: Mon Jun 3 23:10:20 2024 +0800
HIVE-28276: Iceberg: Make Iceberg split threads configurable when table
scanning (Butao Zhang, reviewed by Ayush Saxena, Denys Kuzmenko)
Closes #5260
---
.../apache/iceberg/mr/mapreduce/IcebergInputFormat.java | 16 +++++++++++++++-
1 file changed, 15 insertions(+), 1 deletion(-)
diff --git a/iceberg/iceberg-handler/src/main/java/org/apache/iceberg/mr/mapreduce/IcebergInputFormat.java b/iceberg/iceberg-handler/src/main/java/org/apache/iceberg/mr/mapreduce/IcebergInputFormat.java
index 754d78e4d93..566832bedfb 100644
--- a/iceberg/iceberg-handler/src/main/java/org/apache/iceberg/mr/mapreduce/IcebergInputFormat.java
+++ b/iceberg/iceberg-handler/src/main/java/org/apache/iceberg/mr/mapreduce/IcebergInputFormat.java
@@ -27,6 +27,7 @@ import java.util.List;
import java.util.Map;
import java.util.Optional;
import java.util.Set;
+import java.util.concurrent.ExecutorService;
import java.util.function.BiFunction;
import java.util.stream.Collectors;
import java.util.stream.Stream;
@@ -62,6 +63,7 @@ import org.apache.iceberg.Schema;
import org.apache.iceberg.SchemaParser;
import org.apache.iceberg.SnapshotRef;
import org.apache.iceberg.StructLike;
+import org.apache.iceberg.SystemConfigs;
import org.apache.iceberg.Table;
import org.apache.iceberg.TableProperties;
import org.apache.iceberg.TableScan;
@@ -97,6 +99,7 @@ import org.apache.iceberg.types.TypeUtil;
import org.apache.iceberg.types.Types;
import org.apache.iceberg.util.PartitionUtil;
import org.apache.iceberg.util.SerializationUtil;
+import org.apache.iceberg.util.ThreadPools;
/**
* Generic Mrv2 InputFormat API for Iceberg.
@@ -207,19 +210,30 @@ public class IcebergInputFormat<T> extends InputFormat<Void, T> {
conf.set(InputFormatConfig.SERIALIZED_TABLE_PREFIX + tbl.name(),
SerializationUtil.serializeToBase64(tbl));
return tbl;
});
+ final ExecutorService workerPool =
+ ThreadPools.newWorkerPool("iceberg-plan-worker-pool",
+ conf.getInt(SystemConfigs.WORKER_THREAD_POOL_SIZE.propertyKey(),
ThreadPools.WORKER_THREAD_POOL_SIZE));
+ try {
+ return planInputSplits(table, conf, workerPool);
+ } finally {
+ workerPool.shutdown();
+ }
+ }
+ private List<InputSplit> planInputSplits(Table table, Configuration conf,
ExecutorService workerPool) {
List<InputSplit> splits = Lists.newArrayList();
boolean applyResidual =
!conf.getBoolean(InputFormatConfig.SKIP_RESIDUAL_FILTERING, false);
InputFormatConfig.InMemoryDataModel model =
conf.getEnum(InputFormatConfig.IN_MEMORY_DATA_MODEL,
InputFormatConfig.InMemoryDataModel.GENERIC);
long fromVersion =
conf.getLong(InputFormatConfig.SNAPSHOT_ID_INTERVAL_FROM, -1);
- Scan<?, FileScanTask, CombinedScanTask> scan;
+ Scan<? extends Scan, FileScanTask, CombinedScanTask> scan;
if (fromVersion != -1) {
scan = applyConfig(conf, createIncrementalAppendScan(table, conf));
} else {
scan = applyConfig(conf, createTableScan(table, conf));
}
+ scan = scan.planWith(workerPool);
boolean allowDataFilesWithinTableLocationOnly =
conf.getBoolean(HiveConf.ConfVars.HIVE_ICEBERG_ALLOW_DATAFILES_IN_TABLE_LOCATION_ONLY.varname,