This is an automated email from the ASF dual-hosted git repository.
mbutrovich pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/datafusion-comet.git
The following commit(s) were added to refs/heads/main by this push:
new 5e9a1ca07 perf: reduce nativeIcebergScanMetadata serialization points (#3243)
5e9a1ca07 is described below
commit 5e9a1ca070ed02b83f7edf89157e16a70f42c00d
Author: Matt Butrovich <[email protected]>
AuthorDate: Thu Jan 22 16:50:15 2026 -0500
perf: reduce nativeIcebergScanMetadata serialization points (#3243)
---
.../org/apache/spark/sql/comet/CometBatchScanExec.scala | 15 ++++++---------
.../spark/sql/comet/CometIcebergNativeScanExec.scala | 6 ++----
2 files changed, 8 insertions(+), 13 deletions(-)
diff --git a/spark/src/main/scala/org/apache/spark/sql/comet/CometBatchScanExec.scala b/spark/src/main/scala/org/apache/spark/sql/comet/CometBatchScanExec.scala
index e4ccb4c63..5bb0bd5a8 100644
--- a/spark/src/main/scala/org/apache/spark/sql/comet/CometBatchScanExec.scala
+++ b/spark/src/main/scala/org/apache/spark/sql/comet/CometBatchScanExec.scala
@@ -41,7 +41,7 @@ import org.apache.comet.iceberg.CometIcebergNativeScanMetadata
case class CometBatchScanExec(
wrapped: BatchScanExec,
runtimeFilters: Seq[Expression],
- nativeIcebergScanMetadata: Option[CometIcebergNativeScanMetadata] = None)
+ @transient nativeIcebergScanMetadata: Option[CometIcebergNativeScanMetadata] = None)
extends DataSourceV2ScanExecBase
with CometPlan {
def ordering: Option[Seq[SortOrder]] = wrapped.ordering
@@ -99,18 +99,14 @@ case class CometBatchScanExec(
override def equals(other: Any): Boolean = other match {
case other: CometBatchScanExec =>
// `wrapped` in `this` and `other` could reference to the same `BatchScanExec` object,
- // check `runtimeFilters` and `nativeIcebergScanMetadata` equality too.
- this.wrappedScan == other.wrappedScan && this.runtimeFilters == other.runtimeFilters &&
- this.nativeIcebergScanMetadata == other.nativeIcebergScanMetadata
+ // check `runtimeFilters` equality too.
+ this.wrappedScan == other.wrappedScan && this.runtimeFilters == other.runtimeFilters
case _ =>
false
}
override def hashCode(): Int = {
- Objects.hashCode(
- wrappedScan,
- runtimeFilters,
- Integer.valueOf(nativeIcebergScanMetadata.map(_.hashCode()).getOrElse(0)))
+ Objects.hashCode(wrappedScan, runtimeFilters)
}
override def doCanonicalize(): CometBatchScanExec = {
@@ -118,7 +114,8 @@ case class CometBatchScanExec(
wrapped = wrappedScan.doCanonicalize(),
runtimeFilters = QueryPlan.normalizePredicates(
runtimeFilters.filterNot(_ == DynamicPruningExpression(Literal.TrueLiteral)),
- output))
+ output),
+ nativeIcebergScanMetadata = None)
}
override def nodeName: String = {
diff --git a/spark/src/main/scala/org/apache/spark/sql/comet/CometIcebergNativeScanExec.scala b/spark/src/main/scala/org/apache/spark/sql/comet/CometIcebergNativeScanExec.scala
index 89b23cb71..223ae4fbb 100644
--- a/spark/src/main/scala/org/apache/spark/sql/comet/CometIcebergNativeScanExec.scala
+++ b/spark/src/main/scala/org/apache/spark/sql/comet/CometIcebergNativeScanExec.scala
@@ -166,8 +166,7 @@ case class CometIcebergNativeScanExec(
this.metadataLocation == other.metadataLocation &&
this.output == other.output &&
this.serializedPlanOpt == other.serializedPlanOpt &&
- this.numPartitions == other.numPartitions &&
- this.nativeIcebergScanMetadata == other.nativeIcebergScanMetadata
+ this.numPartitions == other.numPartitions
case _ =>
false
}
@@ -178,8 +177,7 @@ case class CometIcebergNativeScanExec(
metadataLocation,
output.asJava,
serializedPlanOpt,
- numPartitions: java.lang.Integer,
- nativeIcebergScanMetadata)
+ numPartitions: java.lang.Integer)
}
object CometIcebergNativeScanExec {
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]