Zouxxyy commented on code in PR #10900:
URL:
https://github.com/apache/incubator-gluten/pull/10900#discussion_r2435929668
##########
backends-velox/src-iceberg/main/scala/org/apache/gluten/extension/OffloadIcebergWrite.scala:
##########
@@ -18,48 +18,56 @@ package org.apache.gluten.extension
import org.apache.gluten.config.GlutenConfig
import org.apache.gluten.execution.{VeloxIcebergAppendDataExec,
VeloxIcebergOverwriteByExpressionExec,
VeloxIcebergOverwritePartitionsDynamicExec, VeloxIcebergReplaceDataExec}
+import org.apache.gluten.extension.OffloadIcebergWrite._
import org.apache.gluten.extension.columnar.enumerated.RasOffload
import org.apache.gluten.extension.columnar.heuristic.HeuristicTransform
import org.apache.gluten.extension.columnar.offload.OffloadSingleNode
import org.apache.gluten.extension.columnar.validator.Validators
import org.apache.gluten.extension.injector.Injector
+import org.apache.spark.sql.connector.write.Write
import org.apache.spark.sql.execution.SparkPlan
import org.apache.spark.sql.execution.datasources.v2.{AppendDataExec,
OverwriteByExpressionExec, OverwritePartitionsDynamicExec, ReplaceDataExec}
+import org.apache.spark.util.SparkReflectionUtil
case class OffloadIcebergAppend() extends OffloadSingleNode {
override def offload(plan: SparkPlan): SparkPlan = plan match {
- case a: AppendDataExec =>
+ case a: AppendDataExec if supportsWrite(a.write) =>
VeloxIcebergAppendDataExec(a)
case other => other
}
}
case class OffloadIcebergReplaceData() extends OffloadSingleNode {
override def offload(plan: SparkPlan): SparkPlan = plan match {
- case r: ReplaceDataExec =>
+ case r: ReplaceDataExec if supportsWrite(r.write) =>
VeloxIcebergReplaceDataExec(r)
case other => other
}
}
case class OffloadIcebergOverwrite() extends OffloadSingleNode {
override def offload(plan: SparkPlan): SparkPlan = plan match {
- case r: OverwriteByExpressionExec =>
+ case r: OverwriteByExpressionExec if supportsWrite(r.write) =>
VeloxIcebergOverwriteByExpressionExec(r)
case other => other
}
}
case class OffloadIcebergOverwritePartitionsDynamic() extends
OffloadSingleNode {
override def offload(plan: SparkPlan): SparkPlan = plan match {
- case r: OverwritePartitionsDynamicExec =>
+ case r: OverwritePartitionsDynamicExec if supportsWrite(r.write) =>
VeloxIcebergOverwritePartitionsDynamicExec(r)
case other => other
}
}
object OffloadIcebergWrite {
+
+ def supportsWrite(write: Write): Boolean = {
Review Comment:
I added reflection here so that this rule still executes correctly when the
Iceberg dependency is not on the classpath. A similar approach is used in
offloadBatchScan.
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]