andygrove commented on code in PR #3349:
URL: https://github.com/apache/datafusion-comet/pull/3349#discussion_r2747943817


##########
spark/src/main/scala/org/apache/comet/serde/operator/CometIcebergNativeScan.scala:
##########
@@ -671,318 +671,293 @@ object CometIcebergNativeScan extends 
CometOperatorSerde[CometBatchScanExec] wit
   }
 
   /**
-   * Serializes a CometBatchScanExec wrapping an Iceberg SparkBatchQueryScan 
to protobuf.
+   * Converts a CometBatchScanExec to a minimal placeholder IcebergScan 
operator.
    *
-   * Uses pre-extracted metadata from CometScanRule to avoid redundant 
reflection operations. All
-   * reflection and validation was done during planning, so serialization 
failures here would
-   * indicate a programming error rather than an expected fallback condition.
+   * Returns a placeholder operator with only metadata_location for matching 
during partition
+   * injection. All other fields (catalog properties, required schema, pools, 
partition data) are
+   * set by serializePartitions() at execution time after DPP resolves.
    */
   override def convert(
       scan: CometBatchScanExec,
       builder: Operator.Builder,
       childOp: Operator*): Option[OperatorOuterClass.Operator] = {
+
+    val metadata = scan.nativeIcebergScanMetadata.getOrElse {
+      throw new IllegalStateException(
+        "Programming error: CometBatchScanExec.nativeIcebergScanMetadata is 
None. " +
+          "Metadata should have been extracted in CometScanRule.")
+    }
+
     val icebergScanBuilder = OperatorOuterClass.IcebergScan.newBuilder()
+    val commonBuilder = OperatorOuterClass.IcebergScanCommon.newBuilder()
+
+    // Only set metadata_location - used for matching in 
IcebergPartitionInjector.
+    // All other fields (catalog_properties, required_schema, pools) are set by
+    // serializePartitions() at execution time, so setting them here would be 
wasted work.
+    commonBuilder.setMetadataLocation(metadata.metadataLocation)
+
+    icebergScanBuilder.setCommon(commonBuilder.build())
+    // partition field intentionally empty - will be populated at execution 
time
+
+    builder.clearChildren()
+    Some(builder.setIcebergScan(icebergScanBuilder).build())
+  }
+
+  /**
+   * Serializes partitions from inputRDD at execution time.
+   *
+   * Called after doPrepare() has resolved DPP subqueries. Builds pools and 
per-partition data in
+   * one pass from the DPP-filtered partitions.
+   *
+   * @param wrappedScan
+   *   The BatchScanExec whose inputRDD contains the DPP-filtered partitions
+   * @param output
+   *   The output attributes for the scan
+   * @param metadata
+   *   Pre-extracted Iceberg metadata from CometScanRule
+   * @return
+   *   Tuple of (commonBytes, perPartitionBytes) for native execution
+   */
+  def serializePartitions(
+      wrappedScan: org.apache.spark.sql.execution.datasources.v2.BatchScanExec,
+      output: Seq[Attribute],
+      metadata: org.apache.comet.iceberg.CometIcebergNativeScanMetadata)
+      : (Array[Byte], Array[Array[Byte]]) = {
+
+    val commonBuilder = OperatorOuterClass.IcebergScanCommon.newBuilder()
 
     // Deduplication structures - map unique values to pool indices
     val schemaToPoolIndex = mutable.HashMap[AnyRef, Int]()
     val partitionTypeToPoolIndex = mutable.HashMap[String, Int]()
     val partitionSpecToPoolIndex = mutable.HashMap[String, Int]()
     val nameMappingToPoolIndex = mutable.HashMap[String, Int]()
     val projectFieldIdsToPoolIndex = mutable.HashMap[Seq[Int], Int]()
-    val partitionDataToPoolIndex = mutable.HashMap[String, Int]() // Base64 
bytes -> pool index
+    val partitionDataToPoolIndex = mutable.HashMap[String, Int]()
     val deleteFilesToPoolIndex =
       mutable.HashMap[Seq[OperatorOuterClass.IcebergDeleteFile], Int]()
     val residualToPoolIndex = mutable.HashMap[Option[Expr], Int]()
 
-    var totalTasks = 0
+    val perPartitionBuilders = 
mutable.ArrayBuffer[OperatorOuterClass.IcebergFilePartition]()
 
-    // Get pre-extracted metadata from planning phase
-    // If metadata is None, this is a programming error - metadata should have 
been extracted
-    // in CometScanRule before creating CometBatchScanExec
-    val metadata = scan.nativeIcebergScanMetadata.getOrElse {
-      throw new IllegalStateException(
-        "Programming error: CometBatchScanExec.nativeIcebergScanMetadata is 
None. " +
-          "Metadata should have been extracted in CometScanRule.")
-    }
-
-    // Use pre-extracted metadata (no reflection needed)
-    icebergScanBuilder.setMetadataLocation(metadata.metadataLocation)
+    var totalTasks = 0
 
+    // Set metadata location and catalog properties
+    commonBuilder.setMetadataLocation(metadata.metadataLocation)
     metadata.catalogProperties.foreach { case (key, value) =>
-      icebergScanBuilder.putCatalogProperties(key, value)
+      commonBuilder.putCatalogProperties(key, value)
     }
 
     // Set required_schema from output
-    scan.output.foreach { attr =>
+    output.foreach { attr =>
       val field = SparkStructField
         .newBuilder()
         .setName(attr.name)
         .setNullable(attr.nullable)
       serializeDataType(attr.dataType).foreach(field.setDataType)
-      icebergScanBuilder.addRequiredSchema(field.build())
+      commonBuilder.addRequiredSchema(field.build())
     }
 
-    // Extract FileScanTasks from the InputPartitions in the RDD
-    try {
-      scan.wrapped.inputRDD match {
-        case rdd: org.apache.spark.sql.execution.datasources.v2.DataSourceRDD 
=>
-          val partitions = rdd.partitions
-          partitions.foreach { partition =>
-            val partitionBuilder = 
OperatorOuterClass.IcebergFilePartition.newBuilder()
+    // Load Iceberg classes once (avoid repeated class loading in loop)

Review Comment:
   Is this pulling in some of the changes from
https://github.com/apache/datafusion-comet/pull/3298?



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]


---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]

Reply via email to