This is an automated email from the ASF dual-hosted git repository.

mbutrovich pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/datafusion-comet.git


The following commit(s) were added to refs/heads/main by this push:
     new d0aa1ffda perf: Add Comet config for native Iceberg reader's data file concurrency (#3584)
d0aa1ffda is described below

commit d0aa1ffdaed1e27bb26427c3aba9ca583233df40
Author: Matt Butrovich <[email protected]>
AuthorDate: Tue Feb 24 19:44:45 2026 -0500

    perf: Add Comet config for native Iceberg reader's data file concurrency (#3584)
---
 common/src/main/scala/org/apache/comet/CometConf.scala        | 11 +++++++++++
 native/core/src/execution/operators/iceberg_scan.rs           |  6 +++++-
 native/core/src/execution/planner.rs                          |  2 ++
 native/proto/src/proto/operator.proto                         |  3 +++
 .../apache/comet/serde/operator/CometIcebergNativeScan.scala  |  4 +++-
 5 files changed, 24 insertions(+), 2 deletions(-)

diff --git a/common/src/main/scala/org/apache/comet/CometConf.scala b/common/src/main/scala/org/apache/comet/CometConf.scala
index 480eafdcb..5ee777f3d 100644
--- a/common/src/main/scala/org/apache/comet/CometConf.scala
+++ b/common/src/main/scala/org/apache/comet/CometConf.scala
@@ -148,6 +148,17 @@ object CometConf extends ShimCometConf {
       .booleanConf
       .createWithDefault(false)
 
+  val COMET_ICEBERG_DATA_FILE_CONCURRENCY_LIMIT: ConfigEntry[Int] =
+    conf("spark.comet.scan.icebergNative.dataFileConcurrencyLimit")
+      .category(CATEGORY_SCAN)
+      .doc(
+        "The number of Iceberg data files to read concurrently within a single task. " +
+          "Higher values improve throughput for tables with many small files by overlapping " +
+          "I/O latency, but increase memory usage. Values between 2 and 8 are suggested.")
+      .intConf
+      .checkValue(v => v > 0, "Data file concurrency limit must be positive")
+      .createWithDefault(1)
+
   val COMET_CSV_V2_NATIVE_ENABLED: ConfigEntry[Boolean] =
     conf("spark.comet.scan.csv.v2.enabled")
       .category(CATEGORY_TESTING)
diff --git a/native/core/src/execution/operators/iceberg_scan.rs b/native/core/src/execution/operators/iceberg_scan.rs
index bc20592e9..39ce25002 100644
--- a/native/core/src/execution/operators/iceberg_scan.rs
+++ b/native/core/src/execution/operators/iceberg_scan.rs
@@ -61,6 +61,8 @@ pub struct IcebergScanExec {
     catalog_properties: HashMap<String, String>,
     /// Pre-planned file scan tasks
     tasks: Vec<FileScanTask>,
+    /// Number of data files to read concurrently
+    data_file_concurrency_limit: usize,
     /// Metrics
     metrics: ExecutionPlanMetricsSet,
 }
@@ -71,6 +73,7 @@ impl IcebergScanExec {
         schema: SchemaRef,
         catalog_properties: HashMap<String, String>,
         tasks: Vec<FileScanTask>,
+        data_file_concurrency_limit: usize,
     ) -> Result<Self, ExecutionError> {
         let output_schema = schema;
         let plan_properties = Self::compute_properties(Arc::clone(&output_schema), 1);
@@ -83,6 +86,7 @@ impl IcebergScanExec {
             plan_properties,
             catalog_properties,
             tasks,
+            data_file_concurrency_limit,
             metrics,
         })
     }
@@ -158,7 +162,7 @@ impl IcebergScanExec {
 
         let reader = iceberg::arrow::ArrowReaderBuilder::new(file_io)
             .with_batch_size(batch_size)
-            .with_data_file_concurrency_limit(context.session_config().target_partitions())
+            .with_data_file_concurrency_limit(self.data_file_concurrency_limit)
             .with_row_selection_enabled(true)
             .build();
 
diff --git a/native/core/src/execution/planner.rs b/native/core/src/execution/planner.rs
index f84d6cc59..ef81cdfbf 100644
--- a/native/core/src/execution/planner.rs
+++ b/native/core/src/execution/planner.rs
@@ -1176,12 +1176,14 @@ impl PhysicalPlanner {
                     .collect();
                 let metadata_location = common.metadata_location.clone();
                 let tasks = parse_file_scan_tasks_from_common(common, &scan.file_scan_tasks)?;
+                let data_file_concurrency_limit = common.data_file_concurrency_limit as usize;
 
                 let iceberg_scan = IcebergScanExec::new(
                     metadata_location,
                     required_schema,
                     catalog_properties,
                     tasks,
+                    data_file_concurrency_limit,
                 )?;
 
                 Ok((
diff --git a/native/proto/src/proto/operator.proto b/native/proto/src/proto/operator.proto
index 93872b462..bf2752bdd 100644
--- a/native/proto/src/proto/operator.proto
+++ b/native/proto/src/proto/operator.proto
@@ -175,6 +175,9 @@ message IcebergScanCommon {
   repeated PartitionData partition_data_pool = 9;
   repeated DeleteFileList delete_files_pool = 10;
   repeated spark.spark_expression.Expr residual_pool = 11;
+
+  // Number of data files to read concurrently within a single task
+  uint32 data_file_concurrency_limit = 12;
 }
 
 message IcebergScan {
diff --git a/spark/src/main/scala/org/apache/comet/serde/operator/CometIcebergNativeScan.scala b/spark/src/main/scala/org/apache/comet/serde/operator/CometIcebergNativeScan.scala
index c86b2a51b..9f1a01599 100644
--- a/spark/src/main/scala/org/apache/comet/serde/operator/CometIcebergNativeScan.scala
+++ b/spark/src/main/scala/org/apache/comet/serde/operator/CometIcebergNativeScan.scala
@@ -31,7 +31,7 @@ import org.apache.spark.sql.comet.{CometBatchScanExec, CometNativeExec}
 import org.apache.spark.sql.execution.datasources.v2.{BatchScanExec, DataSourceRDD, DataSourceRDDPartition}
 import org.apache.spark.sql.types._
 
-import org.apache.comet.ConfigEntry
+import org.apache.comet.{CometConf, ConfigEntry}
 import org.apache.comet.iceberg.{CometIcebergNativeScanMetadata, IcebergReflection}
 import org.apache.comet.serde.{CometOperatorSerde, OperatorOuterClass}
 import org.apache.comet.serde.ExprOuterClass.Expr
@@ -757,6 +757,8 @@ object CometIcebergNativeScan extends CometOperatorSerde[CometBatchScanExec] wit
     var totalTasks = 0
 
     commonBuilder.setMetadataLocation(metadata.metadataLocation)
+    commonBuilder.setDataFileConcurrencyLimit(
+      CometConf.COMET_ICEBERG_DATA_FILE_CONCURRENCY_LIMIT.get())
     metadata.catalogProperties.foreach { case (key, value) =>
       commonBuilder.putCatalogProperties(key, value)
     }


---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]

Reply via email to