mbutrovich commented on code in PR #3349:
URL: https://github.com/apache/datafusion-comet/pull/3349#discussion_r2748021565
##########
spark/src/main/scala/org/apache/comet/serde/operator/CometIcebergNativeScan.scala:
##########
@@ -671,318 +671,293 @@ object CometIcebergNativeScan extends CometOperatorSerde[CometBatchScanExec] wit
}
/**
- * Serializes a CometBatchScanExec wrapping an Iceberg SparkBatchQueryScan to protobuf.
+ * Converts a CometBatchScanExec to a minimal placeholder IcebergScan operator.
*
- * Uses pre-extracted metadata from CometScanRule to avoid redundant reflection operations. All
- * reflection and validation was done during planning, so serialization failures here would
- * indicate a programming error rather than an expected fallback condition.
+ * Returns a placeholder operator with only metadata_location for matching during partition
+ * injection. All other fields (catalog properties, required schema, pools, partition data) are
+ * set by serializePartitions() at execution time after DPP resolves.
*/
override def convert(
scan: CometBatchScanExec,
builder: Operator.Builder,
childOp: Operator*): Option[OperatorOuterClass.Operator] = {
+
+ val metadata = scan.nativeIcebergScanMetadata.getOrElse {
+ throw new IllegalStateException(
+ "Programming error: CometBatchScanExec.nativeIcebergScanMetadata is None. " +
+ "Metadata should have been extracted in CometScanRule.")
+ }
+
val icebergScanBuilder = OperatorOuterClass.IcebergScan.newBuilder()
+ val commonBuilder = OperatorOuterClass.IcebergScanCommon.newBuilder()
+
+ // Only set metadata_location - used for matching in IcebergPartitionInjector.
+ // All other fields (catalog_properties, required_schema, pools) are set by
+ // serializePartitions() at execution time, so setting them here would be wasted work.
+ commonBuilder.setMetadataLocation(metadata.metadataLocation)
+
+ icebergScanBuilder.setCommon(commonBuilder.build())
+ // partition field intentionally empty - will be populated at execution time
+
+ builder.clearChildren()
+ Some(builder.setIcebergScan(icebergScanBuilder).build())
+ }
+
+ /**
+ * Serializes partitions from inputRDD at execution time.
+ *
+ * Called after doPrepare() has resolved DPP subqueries. Builds pools and per-partition data in
+ * one pass from the DPP-filtered partitions.
+ *
+ * @param wrappedScan
+ * The BatchScanExec whose inputRDD contains the DPP-filtered partitions
+ * @param output
+ * The output attributes for the scan
+ * @param metadata
+ * Pre-extracted Iceberg metadata from CometScanRule
+ * @return
+ * Tuple of (commonBytes, perPartitionBytes) for native execution
+ */
+ def serializePartitions(
+ wrappedScan: org.apache.spark.sql.execution.datasources.v2.BatchScanExec,
+ output: Seq[Attribute],
+ metadata: org.apache.comet.iceberg.CometIcebergNativeScanMetadata)
+ : (Array[Byte], Array[Array[Byte]]) = {
+
+ val commonBuilder = OperatorOuterClass.IcebergScanCommon.newBuilder()
// Deduplication structures - map unique values to pool indices
val schemaToPoolIndex = mutable.HashMap[AnyRef, Int]()
val partitionTypeToPoolIndex = mutable.HashMap[String, Int]()
val partitionSpecToPoolIndex = mutable.HashMap[String, Int]()
val nameMappingToPoolIndex = mutable.HashMap[String, Int]()
val projectFieldIdsToPoolIndex = mutable.HashMap[Seq[Int], Int]()
- val partitionDataToPoolIndex = mutable.HashMap[String, Int]() // Base64 bytes -> pool index
+ val partitionDataToPoolIndex = mutable.HashMap[String, Int]()
val deleteFilesToPoolIndex =
mutable.HashMap[Seq[OperatorOuterClass.IcebergDeleteFile], Int]()
val residualToPoolIndex = mutable.HashMap[Option[Expr], Int]()
- var totalTasks = 0
+ val perPartitionBuilders = mutable.ArrayBuffer[OperatorOuterClass.IcebergFilePartition]()
- // Get pre-extracted metadata from planning phase
- // If metadata is None, this is a programming error - metadata should have been extracted
- // in CometScanRule before creating CometBatchScanExec
- val metadata = scan.nativeIcebergScanMetadata.getOrElse {
- throw new IllegalStateException(
- "Programming error: CometBatchScanExec.nativeIcebergScanMetadata is None. " +
- "Metadata should have been extracted in CometScanRule.")
- }
-
- // Use pre-extracted metadata (no reflection needed)
- icebergScanBuilder.setMetadataLocation(metadata.metadataLocation)
+ var totalTasks = 0
+ // Set metadata location and catalog properties
+ commonBuilder.setMetadataLocation(metadata.metadataLocation)
metadata.catalogProperties.foreach { case (key, value) =>
- icebergScanBuilder.putCatalogProperties(key, value)
+ commonBuilder.putCatalogProperties(key, value)
}
// Set required_schema from output
- scan.output.foreach { attr =>
+ output.foreach { attr =>
val field = SparkStructField
.newBuilder()
.setName(attr.name)
.setNullable(attr.nullable)
serializeDataType(attr.dataType).foreach(field.setDataType)
- icebergScanBuilder.addRequiredSchema(field.build())
+ commonBuilder.addRequiredSchema(field.build())
}
- // Extract FileScanTasks from the InputPartitions in the RDD
- try {
- scan.wrapped.inputRDD match {
- case rdd: org.apache.spark.sql.execution.datasources.v2.DataSourceRDD =>
- val partitions = rdd.partitions
- partitions.foreach { partition =>
- val partitionBuilder = OperatorOuterClass.IcebergFilePartition.newBuilder()
+ // Load Iceberg classes once (avoid repeated class loading in loop)
Review Comment:
Not fully; I just tried to follow some best practices and hoist some calls out of the loop. I didn't create a dedicated class or anything yet. I suspect we'll need to adapt #3298 to this, assuming this merges.
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the URL above to go to the specific comment.