This is an automated email from the ASF dual-hosted git repository.

zuston pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/auron.git


The following commit(s) were added to refs/heads/master by this push:
     new a592889a [AURON #1431] Add EmptyNativeRDD extending NativeRDD (#1430)
a592889a is described below

commit a592889a4ce731617d20bd92cb8f9322c145fa3b
Author: guixiaowen <[email protected]>
AuthorDate: Tue Oct 14 14:04:56 2025 +0800

    [AURON #1431] Add EmptyNativeRDD extending NativeRDD (#1430)
    
    # Which issue does this PR close?
    
    Closes #1431
    
    # Rationale for this change
    
    Add an `EmptyNativeRDD` extending `NativeRDD`.
    
    The characteristics of EmptyNativeRDD are:
    
    1. It extends NativeRDD;
    2. rddPartitions is empty;
    3. Calling the compute method will throw an exception.
    
    The purpose of EmptyNativeRDD is:
    
    1. Flexible placeholder, simplifying DAG construction, reducing code 
complexity, and keeping the DAG structure complete;
    2. Reduces redundant implementations, achieving a clearer inheritance 
structure.
    
    # What changes are included in this PR?
    
    Change the two scenarios to use `EmptyNativeRDD`:
    1. When `NativeParquetScan` generates a `NativeRDD`, if `filePartitions` is 
0, create an `EmptyNativeRDD`; if greater than 0, create a `NativeRDD`;
    2. When `NativeOrcScan` generates a `NativeRDD`, if `filePartitions` is 0, 
create an `EmptyNativeRDD`; if greater than 0, create a `NativeRDD`.
    
    # Are there any user-facing changes?
    
    No
    
    # How was this patch tested?
    
    Unit tests
    
    ---------
    
    Co-authored-by: guihuawen <[email protected]>
---
 .../spark/sql/auron/AuronEmptyNativeRddSuite.scala |  82 ++++++++++++++++
 .../spark/sql/auron/EmptyNativeRddSuite.scala      |  38 +++++++
 .../org/apache/spark/sql/auron/NativeRDD.scala     |  21 ++++
 .../execution/auron/plan/NativeOrcScanBase.scala   | 109 +++++++++++----------
 .../auron/plan/NativeParquetScanBase.scala         | 109 +++++++++++----------
 5 files changed, 253 insertions(+), 106 deletions(-)

diff --git 
a/spark-extension-shims-spark3/src/test/scala/org/apache/spark/sql/auron/AuronEmptyNativeRddSuite.scala
 
b/spark-extension-shims-spark3/src/test/scala/org/apache/spark/sql/auron/AuronEmptyNativeRddSuite.scala
new file mode 100644
index 00000000..6de32bd7
--- /dev/null
+++ 
b/spark-extension-shims-spark3/src/test/scala/org/apache/spark/sql/auron/AuronEmptyNativeRddSuite.scala
@@ -0,0 +1,82 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.spark.sql.auron
+
+import org.apache.spark.sql.execution.adaptive.AdaptiveSparkPlanExec
+import org.apache.spark.sql.execution.auron.plan.{NativeOrcScanExec, 
NativeParquetScanExec}
+
+class AuronEmptyNativeRddSuite
+    extends org.apache.spark.sql.QueryTest
+    with BaseAuronSQLSuite
+    with AuronSQLTestHelper {
+
+  test("test parquet table generate EmptyNativeRDD scenarios") {
+    withTable("t1") {
+      sql(
+        "create table t1 using parquet PARTITIONED BY (part) as select 1 as 
c1, 2 as c2, 'test test' as part")
+      val emptySparkPlan = sql("select * from t1 where part = '123'")
+      val emptyExecutePlan = emptySparkPlan.queryExecution.executedPlan
+        .asInstanceOf[AdaptiveSparkPlanExec]
+      val emptyNativeParquetScanExec =
+        
AuronConverters.convertSparkPlan(emptyExecutePlan.executedPlan).collectFirst {
+          case nativeParquetScanExec: NativeParquetScanExec =>
+            nativeParquetScanExec
+        }
+      val emptyNativeRDD = emptyNativeParquetScanExec.get.doExecuteNative()
+      assert(emptyNativeRDD.isInstanceOf[EmptyNativeRDD])
+
+      val sparkPlan = sql("select * from t1")
+      val executePlan = 
sparkPlan.queryExecution.executedPlan.asInstanceOf[AdaptiveSparkPlanExec]
+      val nativeParquetScanExec =
+        
AuronConverters.convertSparkPlan(executePlan.executedPlan).collectFirst {
+          case nativeParquetScanExec: NativeParquetScanExec =>
+            nativeParquetScanExec
+        }
+      val nativeRDD = nativeParquetScanExec.get.doExecuteNative()
+      assert(nativeRDD.isInstanceOf[NativeRDD])
+    }
+  }
+
+  test("test orc table generate EmptyNativeRDD scenarios") {
+    withTable("t1") {
+      sql(
+        "create table t1 using orc PARTITIONED BY (part) as select 1 as c1, 2 
as c2, 'test test' as part")
+      val emptySparkPlan =
+        sql("select * from t1 where part = '123'")
+      val emptyExecutePlan = emptySparkPlan.queryExecution.executedPlan
+        .asInstanceOf[AdaptiveSparkPlanExec]
+      val emptyNativeOrcScanExec =
+        
AuronConverters.convertSparkPlan(emptyExecutePlan.executedPlan).collectFirst {
+          case nativeOrcScanExec: NativeOrcScanExec =>
+            nativeOrcScanExec
+        }
+      val emptyNativeRDD = emptyNativeOrcScanExec.get.doExecuteNative()
+      assert(emptyNativeRDD.isInstanceOf[EmptyNativeRDD])
+
+      val sparkPlan = sql("select * from t1")
+      val executePlan =
+        
sparkPlan.queryExecution.executedPlan.asInstanceOf[AdaptiveSparkPlanExec]
+      val nativeOrcScanExec =
+        
AuronConverters.convertSparkPlan(executePlan.executedPlan).collectFirst {
+          case nativeOrcScanExec: NativeOrcScanExec =>
+            nativeOrcScanExec
+        }
+      val nativeRDD = nativeOrcScanExec.get.doExecuteNative()
+      assert(nativeRDD.isInstanceOf[NativeRDD])
+    }
+  }
+}
diff --git 
a/spark-extension-shims-spark3/src/test/scala/org/apache/spark/sql/auron/EmptyNativeRddSuite.scala
 
b/spark-extension-shims-spark3/src/test/scala/org/apache/spark/sql/auron/EmptyNativeRddSuite.scala
new file mode 100644
index 00000000..2ef2e371
--- /dev/null
+++ 
b/spark-extension-shims-spark3/src/test/scala/org/apache/spark/sql/auron/EmptyNativeRddSuite.scala
@@ -0,0 +1,38 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.spark.sql.auron
+
+class EmptyNativeRddSuite
+    extends org.apache.spark.sql.QueryTest
+    with BaseAuronSQLSuite
+    with AuronSQLTestHelper {
+
+  test("test empty native rdd") {
+    val sc = spark.sparkContext
+    val empty = new EmptyNativeRDD(sc)
+    assert(empty.count === 0)
+    assert(empty.collect().size === 0)
+
+    val thrown = intercept[UnsupportedOperationException] {
+      empty.reduce((row1, row2) => {
+        row1
+      })
+    }
+    assert(thrown.getMessage.contains("empty"))
+  }
+
+}
diff --git 
a/spark-extension/src/main/scala/org/apache/spark/sql/auron/NativeRDD.scala 
b/spark-extension/src/main/scala/org/apache/spark/sql/auron/NativeRDD.scala
index e0641b52..5da12a44 100644
--- a/spark-extension/src/main/scala/org/apache/spark/sql/auron/NativeRDD.scala
+++ b/spark-extension/src/main/scala/org/apache/spark/sql/auron/NativeRDD.scala
@@ -68,6 +68,27 @@ class NativeRDD(
   }
 }
 
+class EmptyNativeRDD(@transient private val rddSparkContext: SparkContext)
+    extends NativeRDD(
+      rddSparkContext = rddSparkContext,
+      metrics = MetricNode(Map.empty, Seq(), None),
+      rddPartitions = Array.empty,
+      rddPartitioner = None,
+      rddDependencies = Seq.empty,
+      rddShuffleReadFull = false,
+      nativePlan = (_, _) => null,
+      friendlyName = "EmptyNativeRDD")
+    with Logging
+    with Serializable {
+
+  override protected def getPartitions: Array[Partition] = Array.empty
+
+  override def compute(split: Partition, context: TaskContext): 
Iterator[InternalRow] = {
+    throw new UnsupportedOperationException("empty RDD")
+  }
+
+}
+
 class NativePlanWrapper(var p: (Partition, TaskContext) => PhysicalPlanNode)
     extends Serializable {
   def plan(split: Partition, context: TaskContext): PhysicalPlanNode = {
diff --git 
a/spark-extension/src/main/scala/org/apache/spark/sql/execution/auron/plan/NativeOrcScanBase.scala
 
b/spark-extension/src/main/scala/org/apache/spark/sql/execution/auron/plan/NativeOrcScanBase.scala
index ea54d5d6..b1f5e8c4 100644
--- 
a/spark-extension/src/main/scala/org/apache/spark/sql/execution/auron/plan/NativeOrcScanBase.scala
+++ 
b/spark-extension/src/main/scala/org/apache/spark/sql/execution/auron/plan/NativeOrcScanBase.scala
@@ -22,8 +22,7 @@ import scala.collection.JavaConverters._
 
 import org.apache.spark.Partition
 import org.apache.spark.TaskContext
-import org.apache.spark.sql.auron.MetricNode
-import org.apache.spark.sql.auron.NativeRDD
+import org.apache.spark.sql.auron.{EmptyNativeRDD, MetricNode, NativeRDD}
 import org.apache.spark.sql.execution.FileSourceScanExec
 import org.apache.spark.sql.execution.datasources.FilePartition
 
@@ -34,61 +33,65 @@ abstract class NativeOrcScanBase(basedFileScan: 
FileSourceScanExec)
 
   override def doExecuteNative(): NativeRDD = {
     val partitions = inputFileScanRDD.filePartitions.toArray
-    val nativeMetrics = MetricNode(
-      metrics,
-      Nil,
-      Some({
-        case ("bytes_scanned", v) =>
-          val inputMetric = TaskContext.get.taskMetrics().inputMetrics
-          inputMetric.incBytesRead(v)
-        case ("output_rows", v) =>
-          val inputMetric = TaskContext.get.taskMetrics().inputMetrics
-          inputMetric.incRecordsRead(v)
-        case _ =>
-      }))
-    val nativePruningPredicateFilters = this.nativePruningPredicateFilters
-    val nativeFileSchema = this.nativeFileSchema
-    val nativeFileGroups = this.nativeFileGroups
-    val nativePartitionSchema = this.nativePartitionSchema
-    val projection = schema.map(field => 
basedFileScan.relation.schema.fieldIndex(field.name))
-    val broadcastedHadoopConf = this.broadcastedHadoopConf
-    val numPartitions = partitions.length
+    if (partitions.length > 0) {
+      val nativeMetrics = MetricNode(
+        metrics,
+        Nil,
+        Some({
+          case ("bytes_scanned", v) =>
+            val inputMetric = TaskContext.get.taskMetrics().inputMetrics
+            inputMetric.incBytesRead(v)
+          case ("output_rows", v) =>
+            val inputMetric = TaskContext.get.taskMetrics().inputMetrics
+            inputMetric.incRecordsRead(v)
+          case _ =>
+        }))
+      val nativePruningPredicateFilters = this.nativePruningPredicateFilters
+      val nativeFileSchema = this.nativeFileSchema
+      val nativeFileGroups = this.nativeFileGroups
+      val nativePartitionSchema = this.nativePartitionSchema
+      val projection = schema.map(field => 
basedFileScan.relation.schema.fieldIndex(field.name))
+      val broadcastedHadoopConf = this.broadcastedHadoopConf
+      val numPartitions = partitions.length
 
-    new NativeRDD(
-      sparkContext,
-      nativeMetrics,
-      partitions.asInstanceOf[Array[Partition]],
-      None,
-      Nil,
-      rddShuffleReadFull = true,
-      (partition, _) => {
-        val resourceId = s"NativeOrcScanExec:${UUID.randomUUID().toString}"
-        putJniBridgeResource(resourceId, broadcastedHadoopConf)
+      new NativeRDD(
+        sparkContext,
+        nativeMetrics,
+        partitions.asInstanceOf[Array[Partition]],
+        None,
+        Nil,
+        rddShuffleReadFull = true,
+        (partition, _) => {
+          val resourceId = s"NativeOrcScanExec:${UUID.randomUUID().toString}"
+          putJniBridgeResource(resourceId, broadcastedHadoopConf)
 
-        val nativeFileGroup = 
nativeFileGroups(partition.asInstanceOf[FilePartition])
-        val nativeFileScanExecConf = pb.FileScanExecConf
-          .newBuilder()
-          .setNumPartitions(numPartitions)
-          .setPartitionIndex(partition.index)
-          .setStatistics(pb.Statistics.getDefaultInstance)
-          .setSchema(nativeFileSchema)
-          .setFileGroup(nativeFileGroup)
-          .addAllProjection(projection.map(Integer.valueOf).asJava)
-          .setPartitionSchema(nativePartitionSchema)
-          .build()
+          val nativeFileGroup = 
nativeFileGroups(partition.asInstanceOf[FilePartition])
+          val nativeFileScanExecConf = pb.FileScanExecConf
+            .newBuilder()
+            .setNumPartitions(numPartitions)
+            .setPartitionIndex(partition.index)
+            .setStatistics(pb.Statistics.getDefaultInstance)
+            .setSchema(nativeFileSchema)
+            .setFileGroup(nativeFileGroup)
+            .addAllProjection(projection.map(Integer.valueOf).asJava)
+            .setPartitionSchema(nativePartitionSchema)
+            .build()
 
-        val nativeOrcScanExecBuilder = pb.OrcScanExecNode
-          .newBuilder()
-          .setBaseConf(nativeFileScanExecConf)
-          .setFsResourceId(resourceId)
-          .addAllPruningPredicates(nativePruningPredicateFilters.asJava)
+          val nativeOrcScanExecBuilder = pb.OrcScanExecNode
+            .newBuilder()
+            .setBaseConf(nativeFileScanExecConf)
+            .setFsResourceId(resourceId)
+            .addAllPruningPredicates(nativePruningPredicateFilters.asJava)
 
-        pb.PhysicalPlanNode
-          .newBuilder()
-          .setOrcScan(nativeOrcScanExecBuilder.build())
-          .build()
-      },
-      friendlyName = "NativeRDD.OrcScan")
+          pb.PhysicalPlanNode
+            .newBuilder()
+            .setOrcScan(nativeOrcScanExecBuilder.build())
+            .build()
+        },
+        friendlyName = "NativeRDD.OrcScan")
+    } else {
+      new EmptyNativeRDD(sparkContext)
+    }
   }
 
   override val nodeName: String =
diff --git 
a/spark-extension/src/main/scala/org/apache/spark/sql/execution/auron/plan/NativeParquetScanBase.scala
 
b/spark-extension/src/main/scala/org/apache/spark/sql/execution/auron/plan/NativeParquetScanBase.scala
index 7ee9d1eb..f4129d0e 100644
--- 
a/spark-extension/src/main/scala/org/apache/spark/sql/execution/auron/plan/NativeParquetScanBase.scala
+++ 
b/spark-extension/src/main/scala/org/apache/spark/sql/execution/auron/plan/NativeParquetScanBase.scala
@@ -22,8 +22,7 @@ import scala.collection.JavaConverters._
 
 import org.apache.spark.Partition
 import org.apache.spark.TaskContext
-import org.apache.spark.sql.auron.MetricNode
-import org.apache.spark.sql.auron.NativeRDD
+import org.apache.spark.sql.auron.{EmptyNativeRDD, MetricNode, NativeRDD}
 import org.apache.spark.sql.execution.FileSourceScanExec
 import org.apache.spark.sql.execution.datasources.FilePartition
 
@@ -34,62 +33,66 @@ abstract class NativeParquetScanBase(basedFileScan: 
FileSourceScanExec)
 
   override def doExecuteNative(): NativeRDD = {
     val partitions = inputFileScanRDD.filePartitions.toArray
-    val nativeMetrics = MetricNode(
-      metrics,
-      Nil,
-      Some({
-        case ("bytes_scanned", v) =>
-          val inputMetric = TaskContext.get.taskMetrics().inputMetrics
-          inputMetric.incBytesRead(v)
-        case ("output_rows", v) =>
-          val inputMetric = TaskContext.get.taskMetrics().inputMetrics
-          inputMetric.incRecordsRead(v)
-        case _ =>
-      }))
-    val nativePruningPredicateFilters = this.nativePruningPredicateFilters
-    val nativeFileSchema = this.nativeFileSchema
-    val nativeFileGroups = this.nativeFileGroups
-    val nativePartitionSchema = this.nativePartitionSchema
+    if (partitions.length > 0) {
+      val nativeMetrics = MetricNode(
+        metrics,
+        Nil,
+        Some({
+          case ("bytes_scanned", v) =>
+            val inputMetric = TaskContext.get.taskMetrics().inputMetrics
+            inputMetric.incBytesRead(v)
+          case ("output_rows", v) =>
+            val inputMetric = TaskContext.get.taskMetrics().inputMetrics
+            inputMetric.incRecordsRead(v)
+          case _ =>
+        }))
+      val nativePruningPredicateFilters = this.nativePruningPredicateFilters
+      val nativeFileSchema = this.nativeFileSchema
+      val nativeFileGroups = this.nativeFileGroups
+      val nativePartitionSchema = this.nativePartitionSchema
 
-    val projection = schema.map(field => 
basedFileScan.relation.schema.fieldIndex(field.name))
-    val broadcastedHadoopConf = this.broadcastedHadoopConf
-    val numPartitions = partitions.length
+      val projection = schema.map(field => 
basedFileScan.relation.schema.fieldIndex(field.name))
+      val broadcastedHadoopConf = this.broadcastedHadoopConf
+      val numPartitions = partitions.length
 
-    new NativeRDD(
-      sparkContext,
-      nativeMetrics,
-      partitions.asInstanceOf[Array[Partition]],
-      None,
-      Nil,
-      rddShuffleReadFull = true,
-      (partition, _context) => {
-        val resourceId = s"NativeParquetScanExec:${UUID.randomUUID().toString}"
-        putJniBridgeResource(resourceId, broadcastedHadoopConf)
+      new NativeRDD(
+        sparkContext,
+        nativeMetrics,
+        partitions.asInstanceOf[Array[Partition]],
+        None,
+        Nil,
+        rddShuffleReadFull = true,
+        (partition, _context) => {
+          val resourceId = 
s"NativeParquetScanExec:${UUID.randomUUID().toString}"
+          putJniBridgeResource(resourceId, broadcastedHadoopConf)
 
-        val nativeFileGroup = 
nativeFileGroups(partition.asInstanceOf[FilePartition])
-        val nativeParquetScanConf = pb.FileScanExecConf
-          .newBuilder()
-          .setNumPartitions(numPartitions)
-          .setPartitionIndex(partition.index)
-          .setStatistics(pb.Statistics.getDefaultInstance)
-          .setSchema(nativeFileSchema)
-          .setFileGroup(nativeFileGroup)
-          .addAllProjection(projection.map(Integer.valueOf).asJava)
-          .setPartitionSchema(nativePartitionSchema)
-          .build()
+          val nativeFileGroup = 
nativeFileGroups(partition.asInstanceOf[FilePartition])
+          val nativeParquetScanConf = pb.FileScanExecConf
+            .newBuilder()
+            .setNumPartitions(numPartitions)
+            .setPartitionIndex(partition.index)
+            .setStatistics(pb.Statistics.getDefaultInstance)
+            .setSchema(nativeFileSchema)
+            .setFileGroup(nativeFileGroup)
+            .addAllProjection(projection.map(Integer.valueOf).asJava)
+            .setPartitionSchema(nativePartitionSchema)
+            .build()
 
-        val nativeParquetScanExecBuilder = pb.ParquetScanExecNode
-          .newBuilder()
-          .setBaseConf(nativeParquetScanConf)
-          .setFsResourceId(resourceId)
-          .addAllPruningPredicates(nativePruningPredicateFilters.asJava)
+          val nativeParquetScanExecBuilder = pb.ParquetScanExecNode
+            .newBuilder()
+            .setBaseConf(nativeParquetScanConf)
+            .setFsResourceId(resourceId)
+            .addAllPruningPredicates(nativePruningPredicateFilters.asJava)
 
-        pb.PhysicalPlanNode
-          .newBuilder()
-          .setParquetScan(nativeParquetScanExecBuilder.build())
-          .build()
-      },
-      friendlyName = "NativeRDD.ParquetScan")
+          pb.PhysicalPlanNode
+            .newBuilder()
+            .setParquetScan(nativeParquetScanExecBuilder.build())
+            .build()
+        },
+        friendlyName = "NativeRDD.ParquetScan")
+    } else {
+      new EmptyNativeRDD(sparkContext)
+    }
   }
 
   override val nodeName: String =

Reply via email to