This is an automated email from the ASF dual-hosted git repository.
zuston pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/auron.git
The following commit(s) were added to refs/heads/master by this push:
new a592889a [AURON #1431] Add EmptyNativeRDD extending NativeRDD (#1430)
a592889a is described below
commit a592889a4ce731617d20bd92cb8f9322c145fa3b
Author: guixiaowen <[email protected]>
AuthorDate: Tue Oct 14 14:04:56 2025 +0800
[AURON #1431] Add EmptyNativeRDD extending NativeRDD (#1430)
# Which issue does this PR close?
Closes #1431
# Rationale for this change
Add an `EmptyNativeRDD` extending `NativeRDD`.
The characteristics of EmptyNativeRDD are:
1. It extends NativeRDD;
2. rddPartitions is empty;
3. Calling the compute method will throw an exception.
The purpose of EmptyNativeRDD is:
1. Flexible placeholder, simplifying DAG construction, reducing code
complexity, and keeping the DAG structure complete;
2. Reduces redundant implementations, achieving a clearer inheritance
structure.
# What changes are included in this PR?
Change the two scenarios to use `EmptyNativeRDD`:
1. When `NativeParquetScan` generates a `NativeRDD`, if `filePartitions` is
0, create an `EmptyNativeRDD`; if greater than 0, create a `NativeRDD`;
2. When `NativeOrcScan` generates a `NativeRDD`, if `filePartitions` is 0,
create an `EmptyNativeRDD`; if greater than 0, create a `NativeRDD`.
# Are there any user-facing changes?
No
# How was this patch tested?
Unit tests
---------
Co-authored-by: guihuawen <[email protected]>
---
.../spark/sql/auron/AuronEmptyNativeRddSuite.scala | 82 ++++++++++++++++
.../spark/sql/auron/EmptyNativeRddSuite.scala | 38 +++++++
.../org/apache/spark/sql/auron/NativeRDD.scala | 21 ++++
.../execution/auron/plan/NativeOrcScanBase.scala | 109 +++++++++++----------
.../auron/plan/NativeParquetScanBase.scala | 109 +++++++++++----------
5 files changed, 253 insertions(+), 106 deletions(-)
diff --git
a/spark-extension-shims-spark3/src/test/scala/org/apache/spark/sql/auron/AuronEmptyNativeRddSuite.scala
b/spark-extension-shims-spark3/src/test/scala/org/apache/spark/sql/auron/AuronEmptyNativeRddSuite.scala
new file mode 100644
index 00000000..6de32bd7
--- /dev/null
+++
b/spark-extension-shims-spark3/src/test/scala/org/apache/spark/sql/auron/AuronEmptyNativeRddSuite.scala
@@ -0,0 +1,82 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.spark.sql.auron
+
+import org.apache.spark.sql.execution.adaptive.AdaptiveSparkPlanExec
+import org.apache.spark.sql.execution.auron.plan.{NativeOrcScanExec,
NativeParquetScanExec}
+
+class AuronEmptyNativeRddSuite
+ extends org.apache.spark.sql.QueryTest
+ with BaseAuronSQLSuite
+ with AuronSQLTestHelper {
+
+ test("test parquet table generate EmptyNativeRDD scenarios") {
+ withTable("t1") {
+ sql(
+ "create table t1 using parquet PARTITIONED BY (part) as select 1 as
c1, 2 as c2, 'test test' as part")
+ val emptySparkPlan = sql("select * from t1 where part = '123'")
+ val emptyExecutePlan = emptySparkPlan.queryExecution.executedPlan
+ .asInstanceOf[AdaptiveSparkPlanExec]
+ val emptyNativeParquetScanExec =
+
AuronConverters.convertSparkPlan(emptyExecutePlan.executedPlan).collectFirst {
+ case nativeParquetScanExec: NativeParquetScanExec =>
+ nativeParquetScanExec
+ }
+ val emptyNativeRDD = emptyNativeParquetScanExec.get.doExecuteNative()
+ assert(emptyNativeRDD.isInstanceOf[EmptyNativeRDD])
+
+ val sparkPlan = sql("select * from t1")
+ val executePlan =
sparkPlan.queryExecution.executedPlan.asInstanceOf[AdaptiveSparkPlanExec]
+ val nativeParquetScanExec =
+
AuronConverters.convertSparkPlan(executePlan.executedPlan).collectFirst {
+ case nativeParquetScanExec: NativeParquetScanExec =>
+ nativeParquetScanExec
+ }
+ val nativeRDD = nativeParquetScanExec.get.doExecuteNative()
+ assert(nativeRDD.isInstanceOf[NativeRDD])
+ }
+ }
+
+ test("test orc table generate EmptyNativeRDD scenarios") {
+ withTable("t1") {
+ sql(
+ "create table t1 using orc PARTITIONED BY (part) as select 1 as c1, 2
as c2, 'test test' as part")
+ val emptySparkPlan =
+ sql("select * from t1 where part = '123'")
+ val emptyExecutePlan = emptySparkPlan.queryExecution.executedPlan
+ .asInstanceOf[AdaptiveSparkPlanExec]
+ val emptyNativeOrcScanExec =
+
AuronConverters.convertSparkPlan(emptyExecutePlan.executedPlan).collectFirst {
+ case nativeOrcScanExec: NativeOrcScanExec =>
+ nativeOrcScanExec
+ }
+ val emptyNativeRDD = emptyNativeOrcScanExec.get.doExecuteNative()
+ assert(emptyNativeRDD.isInstanceOf[EmptyNativeRDD])
+
+ val sparkPlan = sql("select * from t1")
+ val executePlan =
+
sparkPlan.queryExecution.executedPlan.asInstanceOf[AdaptiveSparkPlanExec]
+ val nativeOrcScanExec =
+
AuronConverters.convertSparkPlan(executePlan.executedPlan).collectFirst {
+ case nativeOrcScanExec: NativeOrcScanExec =>
+ nativeOrcScanExec
+ }
+ val nativeRDD = nativeOrcScanExec.get.doExecuteNative()
+ assert(nativeRDD.isInstanceOf[NativeRDD])
+ }
+ }
+}
diff --git
a/spark-extension-shims-spark3/src/test/scala/org/apache/spark/sql/auron/EmptyNativeRddSuite.scala
b/spark-extension-shims-spark3/src/test/scala/org/apache/spark/sql/auron/EmptyNativeRddSuite.scala
new file mode 100644
index 00000000..2ef2e371
--- /dev/null
+++
b/spark-extension-shims-spark3/src/test/scala/org/apache/spark/sql/auron/EmptyNativeRddSuite.scala
@@ -0,0 +1,38 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.spark.sql.auron
+
+class EmptyNativeRddSuite
+ extends org.apache.spark.sql.QueryTest
+ with BaseAuronSQLSuite
+ with AuronSQLTestHelper {
+
+ test("test empty native rdd") {
+ val sc = spark.sparkContext
+ val empty = new EmptyNativeRDD(sc)
+ assert(empty.count === 0)
+ assert(empty.collect().size === 0)
+
+ val thrown = intercept[UnsupportedOperationException] {
+ empty.reduce((row1, row2) => {
+ row1
+ })
+ }
+ assert(thrown.getMessage.contains("empty"))
+ }
+
+}
diff --git
a/spark-extension/src/main/scala/org/apache/spark/sql/auron/NativeRDD.scala
b/spark-extension/src/main/scala/org/apache/spark/sql/auron/NativeRDD.scala
index e0641b52..5da12a44 100644
--- a/spark-extension/src/main/scala/org/apache/spark/sql/auron/NativeRDD.scala
+++ b/spark-extension/src/main/scala/org/apache/spark/sql/auron/NativeRDD.scala
@@ -68,6 +68,27 @@ class NativeRDD(
}
}
+class EmptyNativeRDD(@transient private val rddSparkContext: SparkContext)
+ extends NativeRDD(
+ rddSparkContext = rddSparkContext,
+ metrics = MetricNode(Map.empty, Seq(), None),
+ rddPartitions = Array.empty,
+ rddPartitioner = None,
+ rddDependencies = Seq.empty,
+ rddShuffleReadFull = false,
+ nativePlan = (_, _) => null,
+ friendlyName = "EmptyNativeRDD")
+ with Logging
+ with Serializable {
+
+ override protected def getPartitions: Array[Partition] = Array.empty
+
+ override def compute(split: Partition, context: TaskContext):
Iterator[InternalRow] = {
+ throw new UnsupportedOperationException("empty RDD")
+ }
+
+}
+
class NativePlanWrapper(var p: (Partition, TaskContext) => PhysicalPlanNode)
extends Serializable {
def plan(split: Partition, context: TaskContext): PhysicalPlanNode = {
diff --git
a/spark-extension/src/main/scala/org/apache/spark/sql/execution/auron/plan/NativeOrcScanBase.scala
b/spark-extension/src/main/scala/org/apache/spark/sql/execution/auron/plan/NativeOrcScanBase.scala
index ea54d5d6..b1f5e8c4 100644
---
a/spark-extension/src/main/scala/org/apache/spark/sql/execution/auron/plan/NativeOrcScanBase.scala
+++
b/spark-extension/src/main/scala/org/apache/spark/sql/execution/auron/plan/NativeOrcScanBase.scala
@@ -22,8 +22,7 @@ import scala.collection.JavaConverters._
import org.apache.spark.Partition
import org.apache.spark.TaskContext
-import org.apache.spark.sql.auron.MetricNode
-import org.apache.spark.sql.auron.NativeRDD
+import org.apache.spark.sql.auron.{EmptyNativeRDD, MetricNode, NativeRDD}
import org.apache.spark.sql.execution.FileSourceScanExec
import org.apache.spark.sql.execution.datasources.FilePartition
@@ -34,61 +33,65 @@ abstract class NativeOrcScanBase(basedFileScan:
FileSourceScanExec)
override def doExecuteNative(): NativeRDD = {
val partitions = inputFileScanRDD.filePartitions.toArray
- val nativeMetrics = MetricNode(
- metrics,
- Nil,
- Some({
- case ("bytes_scanned", v) =>
- val inputMetric = TaskContext.get.taskMetrics().inputMetrics
- inputMetric.incBytesRead(v)
- case ("output_rows", v) =>
- val inputMetric = TaskContext.get.taskMetrics().inputMetrics
- inputMetric.incRecordsRead(v)
- case _ =>
- }))
- val nativePruningPredicateFilters = this.nativePruningPredicateFilters
- val nativeFileSchema = this.nativeFileSchema
- val nativeFileGroups = this.nativeFileGroups
- val nativePartitionSchema = this.nativePartitionSchema
- val projection = schema.map(field =>
basedFileScan.relation.schema.fieldIndex(field.name))
- val broadcastedHadoopConf = this.broadcastedHadoopConf
- val numPartitions = partitions.length
+ if (partitions.length > 0) {
+ val nativeMetrics = MetricNode(
+ metrics,
+ Nil,
+ Some({
+ case ("bytes_scanned", v) =>
+ val inputMetric = TaskContext.get.taskMetrics().inputMetrics
+ inputMetric.incBytesRead(v)
+ case ("output_rows", v) =>
+ val inputMetric = TaskContext.get.taskMetrics().inputMetrics
+ inputMetric.incRecordsRead(v)
+ case _ =>
+ }))
+ val nativePruningPredicateFilters = this.nativePruningPredicateFilters
+ val nativeFileSchema = this.nativeFileSchema
+ val nativeFileGroups = this.nativeFileGroups
+ val nativePartitionSchema = this.nativePartitionSchema
+ val projection = schema.map(field =>
basedFileScan.relation.schema.fieldIndex(field.name))
+ val broadcastedHadoopConf = this.broadcastedHadoopConf
+ val numPartitions = partitions.length
- new NativeRDD(
- sparkContext,
- nativeMetrics,
- partitions.asInstanceOf[Array[Partition]],
- None,
- Nil,
- rddShuffleReadFull = true,
- (partition, _) => {
- val resourceId = s"NativeOrcScanExec:${UUID.randomUUID().toString}"
- putJniBridgeResource(resourceId, broadcastedHadoopConf)
+ new NativeRDD(
+ sparkContext,
+ nativeMetrics,
+ partitions.asInstanceOf[Array[Partition]],
+ None,
+ Nil,
+ rddShuffleReadFull = true,
+ (partition, _) => {
+ val resourceId = s"NativeOrcScanExec:${UUID.randomUUID().toString}"
+ putJniBridgeResource(resourceId, broadcastedHadoopConf)
- val nativeFileGroup =
nativeFileGroups(partition.asInstanceOf[FilePartition])
- val nativeFileScanExecConf = pb.FileScanExecConf
- .newBuilder()
- .setNumPartitions(numPartitions)
- .setPartitionIndex(partition.index)
- .setStatistics(pb.Statistics.getDefaultInstance)
- .setSchema(nativeFileSchema)
- .setFileGroup(nativeFileGroup)
- .addAllProjection(projection.map(Integer.valueOf).asJava)
- .setPartitionSchema(nativePartitionSchema)
- .build()
+ val nativeFileGroup =
nativeFileGroups(partition.asInstanceOf[FilePartition])
+ val nativeFileScanExecConf = pb.FileScanExecConf
+ .newBuilder()
+ .setNumPartitions(numPartitions)
+ .setPartitionIndex(partition.index)
+ .setStatistics(pb.Statistics.getDefaultInstance)
+ .setSchema(nativeFileSchema)
+ .setFileGroup(nativeFileGroup)
+ .addAllProjection(projection.map(Integer.valueOf).asJava)
+ .setPartitionSchema(nativePartitionSchema)
+ .build()
- val nativeOrcScanExecBuilder = pb.OrcScanExecNode
- .newBuilder()
- .setBaseConf(nativeFileScanExecConf)
- .setFsResourceId(resourceId)
- .addAllPruningPredicates(nativePruningPredicateFilters.asJava)
+ val nativeOrcScanExecBuilder = pb.OrcScanExecNode
+ .newBuilder()
+ .setBaseConf(nativeFileScanExecConf)
+ .setFsResourceId(resourceId)
+ .addAllPruningPredicates(nativePruningPredicateFilters.asJava)
- pb.PhysicalPlanNode
- .newBuilder()
- .setOrcScan(nativeOrcScanExecBuilder.build())
- .build()
- },
- friendlyName = "NativeRDD.OrcScan")
+ pb.PhysicalPlanNode
+ .newBuilder()
+ .setOrcScan(nativeOrcScanExecBuilder.build())
+ .build()
+ },
+ friendlyName = "NativeRDD.OrcScan")
+ } else {
+ new EmptyNativeRDD(sparkContext)
+ }
}
override val nodeName: String =
diff --git
a/spark-extension/src/main/scala/org/apache/spark/sql/execution/auron/plan/NativeParquetScanBase.scala
b/spark-extension/src/main/scala/org/apache/spark/sql/execution/auron/plan/NativeParquetScanBase.scala
index 7ee9d1eb..f4129d0e 100644
---
a/spark-extension/src/main/scala/org/apache/spark/sql/execution/auron/plan/NativeParquetScanBase.scala
+++
b/spark-extension/src/main/scala/org/apache/spark/sql/execution/auron/plan/NativeParquetScanBase.scala
@@ -22,8 +22,7 @@ import scala.collection.JavaConverters._
import org.apache.spark.Partition
import org.apache.spark.TaskContext
-import org.apache.spark.sql.auron.MetricNode
-import org.apache.spark.sql.auron.NativeRDD
+import org.apache.spark.sql.auron.{EmptyNativeRDD, MetricNode, NativeRDD}
import org.apache.spark.sql.execution.FileSourceScanExec
import org.apache.spark.sql.execution.datasources.FilePartition
@@ -34,62 +33,66 @@ abstract class NativeParquetScanBase(basedFileScan:
FileSourceScanExec)
override def doExecuteNative(): NativeRDD = {
val partitions = inputFileScanRDD.filePartitions.toArray
- val nativeMetrics = MetricNode(
- metrics,
- Nil,
- Some({
- case ("bytes_scanned", v) =>
- val inputMetric = TaskContext.get.taskMetrics().inputMetrics
- inputMetric.incBytesRead(v)
- case ("output_rows", v) =>
- val inputMetric = TaskContext.get.taskMetrics().inputMetrics
- inputMetric.incRecordsRead(v)
- case _ =>
- }))
- val nativePruningPredicateFilters = this.nativePruningPredicateFilters
- val nativeFileSchema = this.nativeFileSchema
- val nativeFileGroups = this.nativeFileGroups
- val nativePartitionSchema = this.nativePartitionSchema
+ if (partitions.length > 0) {
+ val nativeMetrics = MetricNode(
+ metrics,
+ Nil,
+ Some({
+ case ("bytes_scanned", v) =>
+ val inputMetric = TaskContext.get.taskMetrics().inputMetrics
+ inputMetric.incBytesRead(v)
+ case ("output_rows", v) =>
+ val inputMetric = TaskContext.get.taskMetrics().inputMetrics
+ inputMetric.incRecordsRead(v)
+ case _ =>
+ }))
+ val nativePruningPredicateFilters = this.nativePruningPredicateFilters
+ val nativeFileSchema = this.nativeFileSchema
+ val nativeFileGroups = this.nativeFileGroups
+ val nativePartitionSchema = this.nativePartitionSchema
- val projection = schema.map(field =>
basedFileScan.relation.schema.fieldIndex(field.name))
- val broadcastedHadoopConf = this.broadcastedHadoopConf
- val numPartitions = partitions.length
+ val projection = schema.map(field =>
basedFileScan.relation.schema.fieldIndex(field.name))
+ val broadcastedHadoopConf = this.broadcastedHadoopConf
+ val numPartitions = partitions.length
- new NativeRDD(
- sparkContext,
- nativeMetrics,
- partitions.asInstanceOf[Array[Partition]],
- None,
- Nil,
- rddShuffleReadFull = true,
- (partition, _context) => {
- val resourceId = s"NativeParquetScanExec:${UUID.randomUUID().toString}"
- putJniBridgeResource(resourceId, broadcastedHadoopConf)
+ new NativeRDD(
+ sparkContext,
+ nativeMetrics,
+ partitions.asInstanceOf[Array[Partition]],
+ None,
+ Nil,
+ rddShuffleReadFull = true,
+ (partition, _context) => {
+ val resourceId =
s"NativeParquetScanExec:${UUID.randomUUID().toString}"
+ putJniBridgeResource(resourceId, broadcastedHadoopConf)
- val nativeFileGroup =
nativeFileGroups(partition.asInstanceOf[FilePartition])
- val nativeParquetScanConf = pb.FileScanExecConf
- .newBuilder()
- .setNumPartitions(numPartitions)
- .setPartitionIndex(partition.index)
- .setStatistics(pb.Statistics.getDefaultInstance)
- .setSchema(nativeFileSchema)
- .setFileGroup(nativeFileGroup)
- .addAllProjection(projection.map(Integer.valueOf).asJava)
- .setPartitionSchema(nativePartitionSchema)
- .build()
+ val nativeFileGroup =
nativeFileGroups(partition.asInstanceOf[FilePartition])
+ val nativeParquetScanConf = pb.FileScanExecConf
+ .newBuilder()
+ .setNumPartitions(numPartitions)
+ .setPartitionIndex(partition.index)
+ .setStatistics(pb.Statistics.getDefaultInstance)
+ .setSchema(nativeFileSchema)
+ .setFileGroup(nativeFileGroup)
+ .addAllProjection(projection.map(Integer.valueOf).asJava)
+ .setPartitionSchema(nativePartitionSchema)
+ .build()
- val nativeParquetScanExecBuilder = pb.ParquetScanExecNode
- .newBuilder()
- .setBaseConf(nativeParquetScanConf)
- .setFsResourceId(resourceId)
- .addAllPruningPredicates(nativePruningPredicateFilters.asJava)
+ val nativeParquetScanExecBuilder = pb.ParquetScanExecNode
+ .newBuilder()
+ .setBaseConf(nativeParquetScanConf)
+ .setFsResourceId(resourceId)
+ .addAllPruningPredicates(nativePruningPredicateFilters.asJava)
- pb.PhysicalPlanNode
- .newBuilder()
- .setParquetScan(nativeParquetScanExecBuilder.build())
- .build()
- },
- friendlyName = "NativeRDD.ParquetScan")
+ pb.PhysicalPlanNode
+ .newBuilder()
+ .setParquetScan(nativeParquetScanExecBuilder.build())
+ .build()
+ },
+ friendlyName = "NativeRDD.ParquetScan")
+ } else {
+ new EmptyNativeRDD(sparkContext)
+ }
}
override val nodeName: String =