liujiayi771 commented on code in PR #11153:
URL: https://github.com/apache/incubator-gluten/pull/11153#discussion_r2563511315
##########
gluten-kafka/src/main/scala/org/apache/gluten/execution/MicroBatchScanExecTransformer.scala:
##########
@@ -64,7 +64,13 @@ case class MicroBatchScanExecTransformer(
@transient override lazy val inputPartitionsShim: Seq[InputPartition] =
stream.planInputPartitions(start, end)
- override def filterExprs(): Seq[Expression] = Seq.empty
+ override def scanFilters: Seq[Expression] = Seq.empty
+
+ def pushDownFilters: Option[Seq[Expression]] = Some(Seq.empty)
+
+ def withNewPushdownFilters(filters: Seq[Expression]): BatchScanExecTransformerBase = {
+ throw new UnsupportedOperationException()
Review Comment:
Please add an exception message.
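For example, a minimal sketch (the exact message wording is only a suggestion):
```scala
  def withNewPushdownFilters(filters: Seq[Expression]): BatchScanExecTransformerBase = {
    // Name the operator in the message so the failure is easier to trace.
    throw new UnsupportedOperationException(
      s"${getClass.getSimpleName} does not support withNewPushdownFilters")
  }
```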
##########
gluten-substrait/src/main/scala/org/apache/gluten/execution/BatchScanExecTransformer.scala:
##########
@@ -198,6 +188,15 @@ abstract class BatchScanExecTransformerBase(
@transient override lazy val fileFormat: ReadFileFormat =
BackendsApiManager.getSettings.getSubstraitReadFileFormatV2(scan)
+ override def equals(other: Any): Boolean = other match {
+ case other: BatchScanExecTransformerBase =>
+ this.pushDownFilters == other.pushDownFilters && super.equals(other)
+ case _ =>
+ false
+ }
+
+ override def hashCode(): Int = Objects.hashCode(batch, runtimeFilters, pushDownFilters)
+
Review Comment:
What is the purpose of these overrides here?
##########
gluten-ut/spark34/src/test/scala/org/apache/spark/sql/execution/datasources/orc/GlutenOrcV2SchemaPruningSuite.scala:
##########
@@ -35,7 +35,8 @@ class GlutenOrcV2SchemaPruningSuite extends OrcV2SchemaPruningSuite with GlutenS
val fileSourceScanSchemata =
collect(df.queryExecution.executedPlan) {
case BatchScanExec(_, scan: OrcScan, _, _, _, _, _, _, _) => scan.readDataSchema
- case BatchScanExecTransformer(_, scan: OrcScan, _, _, _, _, _, _, _) => scan.readDataSchema
+ case BatchScanExecTransformer(_, scan: OrcScan, _, _, _, _, _, _, _, _) =>
Review Comment:
ditto
##########
gluten-ut/spark35/src/test/scala/org/apache/spark/sql/execution/datasources/orc/GlutenOrcV2SchemaPruningSuite.scala:
##########
@@ -35,7 +35,8 @@ class GlutenOrcV2SchemaPruningSuite extends OrcV2SchemaPruningSuite with GlutenS
val fileSourceScanSchemata =
collect(df.queryExecution.executedPlan) {
case BatchScanExec(_, scan: OrcScan, _, _, _, _) => scan.readDataSchema
- case BatchScanExecTransformer(_, scan: OrcScan, _, _, _, _, _, _, _) => scan.readDataSchema
+ case BatchScanExecTransformer(_, scan: OrcScan, _, _, _, _, _, _, _, _) =>
Review Comment:
ditto
##########
gluten-ut/spark33/src/test/scala/org/apache/spark/sql/execution/datasources/orc/GlutenOrcV2SchemaPruningSuite.scala:
##########
@@ -35,7 +35,8 @@ class GlutenOrcV2SchemaPruningSuite extends OrcV2SchemaPruningSuite with GlutenS
val fileSourceScanSchemata =
collect(df.queryExecution.executedPlan) {
case BatchScanExec(_, scan: OrcScan, _, _) => scan.readDataSchema
- case BatchScanExecTransformer(_, scan: OrcScan, _, _, _, _, _, _, _) => scan.readDataSchema
+ case BatchScanExecTransformer(_, scan: OrcScan, _, _, _, _, _, _, _, _) =>
Review Comment:
ditto
##########
gluten-ut/spark32/src/test/scala/org/apache/spark/sql/execution/datasources/orc/GlutenOrcV2SchemaPruningSuite.scala:
##########
@@ -30,7 +30,8 @@ class GlutenOrcV2SchemaPruningSuite extends OrcV2SchemaPruningSuite with GlutenS
val fileSourceScanSchemata =
collect(df.queryExecution.executedPlan) {
case BatchScanExec(_, scan: OrcScan, _) => scan.readDataSchema
- case BatchScanExecTransformer(_, scan: OrcScan, _, _, _, _, _, _, _) => scan.readDataSchema
Review Comment:
I think using this approach is better, as it's less likely to cause
compatibility issues:
```scala
case b: BatchScanExecTransformer if b.scan.isInstanceOf[OrcScan] =>
```
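Applied to the collect block, that could look like this (just a sketch, assuming the transformer still exposes `scan`):
```scala
val fileSourceScanSchemata =
  collect(df.queryExecution.executedPlan) {
    case BatchScanExec(_, scan: OrcScan, _) => scan.readDataSchema
    case b: BatchScanExecTransformer if b.scan.isInstanceOf[OrcScan] =>
      b.scan.asInstanceOf[OrcScan].readDataSchema
  }
```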
##########
gluten-substrait/src/main/scala/org/apache/spark/sql/hive/HiveTableScanExecTransformer.scala:
##########
@@ -63,7 +63,12 @@ case class HiveTableScanExecTransformer(
hiveQlTable.getOutputFormatClass,
hiveQlTable.getMetadata)
- override def filterExprs(): Seq[Expression] = Seq.empty
+ override def scanFilters: Seq[Expression] = Seq.empty
+
+ override def pushDownFilters: Option[Seq[Expression]] = Some(Seq.empty)
+
+ override def withNewPushdownFilters(filters: Seq[Expression]): BasicScanExecTransformer =
+ throw new UnsupportedOperationException()
Review Comment:
ditto
##########
gluten-ut/spark40/src/test/scala/org/apache/spark/sql/execution/datasources/orc/GlutenOrcV2SchemaPruningSuite.scala:
##########
@@ -35,7 +35,8 @@ class GlutenOrcV2SchemaPruningSuite extends OrcV2SchemaPruningSuite with GlutenS
val fileSourceScanSchemata =
collect(df.queryExecution.executedPlan) {
case BatchScanExec(_, scan: OrcScan, _, _, _, _) => scan.readDataSchema
- case BatchScanExecTransformer(_, scan: OrcScan, _, _, _, _, _, _, _) => scan.readDataSchema
+ case BatchScanExecTransformer(_, scan: OrcScan, _, _, _, _, _, _, _, _) =>
Review Comment:
ditto