This is an automated email from the ASF dual-hosted git repository.
changchen pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/incubator-gluten.git
The following commit(s) were added to refs/heads/main by this push:
new d3f67d59a8 [MINOR] Code cleanup: Remove deprecated files and fix typos (#11370)
d3f67d59a8 is described below
commit d3f67d59a894bb2f91e1d0095d75fb835aaf6b15
Author: Chang chen <[email protected]>
AuthorDate: Wed Jan 7 21:37:05 2026 +0800
[MINOR] Code cleanup: Remove deprecated files and fix typos (#11370)
* Remove deprecated `BatchScanExec` shims for Spark versions 3.2 to 4.0.
* Rename `InternalRowUtl` to `InternalRowUtil` across shims for Spark
versions 3.2 to 4.0 (a minimal sketch of the renamed helper follows this list).
* Remove the SLF4J and Log4J version properties from the Spark UT module
and rely on the Spark profile instead.
* Fix the build after rebase.
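
For context, a minimal sketch of the renamed helper. The package, imports, and
the first two lines of the method body are taken from the Spark 3.5+ shim diff
below (the Spark 3.2-3.4 shims use RowEncoder instead of ExpressionEncoder);
the final join step, and the omission of the (type, rows, start, length)
overload used by ColumnarBatches, are simplifications rather than the project's
exact code.

package org.apache.gluten.utils

import org.apache.spark.sql.catalyst.InternalRow
import org.apache.spark.sql.catalyst.encoders.ExpressionEncoder
import org.apache.spark.sql.types.StructType

object InternalRowUtil {
  def toString(struct: StructType, rows: Iterator[InternalRow]): String = {
    // Resolve and bind an encoder for the schema, then deserialize each
    // InternalRow back to an external Row for printing.
    val encoder = ExpressionEncoder(struct).resolveAndBind()
    val deserializer = encoder.createDeserializer()
    // Assumed join step for illustration; the actual file may format rows differently.
    rows.map(deserializer).mkString(System.lineSeparator())
  }
}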
---
.../gluten/columnarbatch/ColumnarBatches.java | 4 +-
gluten-ut/spark34/pom.xml | 4 -
gluten-ut/spark35/pom.xml | 4 -
gluten-ut/spark40/pom.xml | 4 -
...{InternalRowUtl.scala => InternalRowUtil.scala} | 2 +-
.../datasources/v2/BatchScanExec.scala.deprecated | 124 -----------
...{InternalRowUtl.scala => InternalRowUtil.scala} | 2 +-
.../datasources/v2/BatchScanExec.scala.deprecated | 142 ------------
...{InternalRowUtl.scala => InternalRowUtil.scala} | 2 +-
.../datasources/v2/BatchScanExec.scala.deprecated | 243 ---------------------
.../org/apache/gluten/utils/InternalRowUtil.scala} | 2 +-
.../datasources/v2/BatchScanExec.scala.deprecated | 243 ---------------------
.../org/apache/gluten/utils/InternalRowUtil.scala} | 2 +-
.../datasources/v2/BatchScanExec.scala.deprecated | 243 ---------------------
.../org/apache/gluten/utils/InternalRowUtil.scala} | 2 +-
.../datasources/v2/BatchScanExec.scala.deprecated | 243 ---------------------
16 files changed, 8 insertions(+), 1258 deletions(-)
diff --git a/gluten-arrow/src/main/java/org/apache/gluten/columnarbatch/ColumnarBatches.java b/gluten-arrow/src/main/java/org/apache/gluten/columnarbatch/ColumnarBatches.java
index 78cbc98ec4..40b931d92a 100644
--- a/gluten-arrow/src/main/java/org/apache/gluten/columnarbatch/ColumnarBatches.java
+++ b/gluten-arrow/src/main/java/org/apache/gluten/columnarbatch/ColumnarBatches.java
@@ -21,7 +21,7 @@ import org.apache.gluten.runtime.Runtime;
import org.apache.gluten.runtime.Runtimes;
import org.apache.gluten.utils.ArrowAbiUtil;
import org.apache.gluten.utils.ArrowUtil;
-import org.apache.gluten.utils.InternalRowUtl;
+import org.apache.gluten.utils.InternalRowUtil;
import org.apache.gluten.vectorized.ArrowColumnarBatch;
import org.apache.gluten.vectorized.ArrowWritableColumnVector;
@@ -412,7 +412,7 @@ public final class ColumnarBatches {
public static String toString(ColumnarBatch batch, int start, int length) {
ColumnarBatch loadedBatch = ensureLoaded(ArrowBufferAllocators.contextInstance(), batch);
StructType type = SparkArrowUtil.fromArrowSchema(ArrowUtil.toSchema(loadedBatch));
- return InternalRowUtl.toString(
+ return InternalRowUtil.toString(
type,
JavaConverters.<InternalRow>asScalaIterator(loadedBatch.rowIterator()),
start,
diff --git a/gluten-ut/spark34/pom.xml b/gluten-ut/spark34/pom.xml
index 6d34eb6384..32ca19e99f 100644
--- a/gluten-ut/spark34/pom.xml
+++ b/gluten-ut/spark34/pom.xml
@@ -130,10 +130,6 @@
<activation>
<activeByDefault>false</activeByDefault>
</activation>
- <properties>
- <slf4j.version>2.0.6</slf4j.version>
- <log4j.version>2.19.0</log4j.version>
- </properties>
<dependencies>
<dependency>
<groupId>org.apache.gluten</groupId>
diff --git a/gluten-ut/spark35/pom.xml b/gluten-ut/spark35/pom.xml
index 0fbd50d1ec..e82d8d9086 100644
--- a/gluten-ut/spark35/pom.xml
+++ b/gluten-ut/spark35/pom.xml
@@ -191,10 +191,6 @@
<activation>
<activeByDefault>false</activeByDefault>
</activation>
- <properties>
- <slf4j.version>2.0.6</slf4j.version>
- <log4j.version>2.19.0</log4j.version>
- </properties>
<dependencies>
<dependency>
<groupId>org.apache.gluten</groupId>
diff --git a/gluten-ut/spark40/pom.xml b/gluten-ut/spark40/pom.xml
index 9d5633274a..d169abb888 100644
--- a/gluten-ut/spark40/pom.xml
+++ b/gluten-ut/spark40/pom.xml
@@ -236,10 +236,6 @@
<activation>
<activeByDefault>false</activeByDefault>
</activation>
- <properties>
- <slf4j.version>2.0.16</slf4j.version>
- <log4j.version>2.24.3</log4j.version>
- </properties>
<dependencies>
<dependency>
<groupId>org.junit.jupiter</groupId>
diff --git a/shims/spark32/src/main/scala/org/apache/gluten/utils/InternalRowUtl.scala b/shims/spark32/src/main/scala/org/apache/gluten/utils/InternalRowUtil.scala
similarity index 98%
rename from shims/spark32/src/main/scala/org/apache/gluten/utils/InternalRowUtl.scala
rename to shims/spark32/src/main/scala/org/apache/gluten/utils/InternalRowUtil.scala
index 32d6943713..7c9ffc70ca 100644
--- a/shims/spark32/src/main/scala/org/apache/gluten/utils/InternalRowUtl.scala
+++ b/shims/spark32/src/main/scala/org/apache/gluten/utils/InternalRowUtil.scala
@@ -20,7 +20,7 @@ import org.apache.spark.sql.catalyst.InternalRow
import org.apache.spark.sql.catalyst.encoders.RowEncoder
import org.apache.spark.sql.types.StructType
-object InternalRowUtl {
+object InternalRowUtil {
def toString(struct: StructType, rows: Iterator[InternalRow]): String = {
val encoder = RowEncoder(struct).resolveAndBind()
val deserializer = encoder.createDeserializer()
diff --git a/shims/spark32/src/main/scala/org/apache/spark/sql/execution/datasources/v2/BatchScanExec.scala.deprecated b/shims/spark32/src/main/scala/org/apache/spark/sql/execution/datasources/v2/BatchScanExec.scala.deprecated
deleted file mode 100644
index 4d4dba7b44..0000000000
--- a/shims/spark32/src/main/scala/org/apache/spark/sql/execution/datasources/v2/BatchScanExec.scala.deprecated
+++ /dev/null
@@ -1,124 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.spark.sql.execution.datasources.v2
-
-import org.apache.spark.SparkException
-import org.apache.spark.rdd.RDD
-import org.apache.spark.sql.catalyst.InternalRow
-import org.apache.spark.sql.catalyst.expressions._
-import org.apache.spark.sql.catalyst.plans.QueryPlan
-import org.apache.spark.sql.catalyst.plans.physical.SinglePartition
-import org.apache.spark.sql.catalyst.util.truncatedString
-import org.apache.spark.sql.connector.read.{InputPartition,
PartitionReaderFactory, Scan, SupportsRuntimeFiltering}
-import org.apache.spark.sql.execution.datasources.DataSourceStrategy
-
-import com.google.common.base.Objects
-
-// This file is copied from Spark with a little change to solve below issue:
-// The BatchScanExec can support columnar output, which is incompatible with
-// Arrow's columnar format. But there is no config to disable the columnar
output.
-// In this file, the supportsColumnar was set as false to prevent Spark's
columnar
-// output.
-
-/** Physical plan node for scanning a batch of data from a data source v2. */
-case class BatchScanExec(
- output: Seq[AttributeReference],
- @transient scan: Scan,
- runtimeFilters: Seq[Expression])
- extends DataSourceV2ScanExecBase {
-
- @transient lazy val batch = scan.toBatch
-
- // TODO: unify the equal/hashCode implementation for all data source v2
query plans.
- override def equals(other: Any): Boolean = other match {
- case other: BatchScanExec =>
- this.batch == other.batch && this.runtimeFilters == other.runtimeFilters
- case _ =>
- false
- }
-
- override def hashCode(): Int = Objects.hashCode(batch, runtimeFilters)
-
- @transient override lazy val partitions: Seq[InputPartition] =
batch.planInputPartitions()
-
- @transient private lazy val filteredPartitions: Seq[InputPartition] = {
- val dataSourceFilters = runtimeFilters.flatMap {
- case DynamicPruningExpression(e) =>
DataSourceStrategy.translateRuntimeFilter(e)
- case _ => None
- }
-
- if (dataSourceFilters.nonEmpty) {
- val originalPartitioning = outputPartitioning
-
- // the cast is safe as runtime filters are only assigned if the scan can
be filtered
- val filterableScan = scan.asInstanceOf[SupportsRuntimeFiltering]
- filterableScan.filter(dataSourceFilters.toArray)
-
- // call toBatch again to get filtered partitions
- val newPartitions = scan.toBatch.planInputPartitions()
-
- originalPartitioning match {
- case p: DataSourcePartitioning if p.numPartitions !=
newPartitions.size =>
- throw new SparkException(
- "Data source must have preserved the original partitioning during
runtime filtering; " +
- s"reported num partitions: ${p.numPartitions}, " +
- s"num partitions after runtime filtering: ${newPartitions.size}")
- case _ =>
- // no validation is needed as the data source did not report any
specific partitioning
- }
-
- newPartitions
- } else {
- partitions
- }
- }
-
- override lazy val readerFactory: PartitionReaderFactory =
batch.createReaderFactory()
-
- override lazy val inputRDD: RDD[InternalRow] = {
- if (filteredPartitions.isEmpty && outputPartitioning == SinglePartition) {
- // return an empty RDD with 1 partition if dynamic filtering removed the
only split
- sparkContext.parallelize(Array.empty[InternalRow], 1)
- } else {
- new DataSourceRDD(
- sparkContext,
- filteredPartitions,
- readerFactory,
- supportsColumnar,
- customMetrics)
- }
- }
-
- override def doCanonicalize(): BatchScanExec = {
- this.copy(
- output = output.map(QueryPlan.normalizeExpressions(_, output)),
- runtimeFilters = QueryPlan.normalizePredicates(
- runtimeFilters.filterNot(_ ==
DynamicPruningExpression(Literal.TrueLiteral)),
- output)
- )
- }
-
- override def simpleString(maxFields: Int): String = {
- val truncatedOutputString = truncatedString(output, "[", ", ", "]",
maxFields)
- val runtimeFiltersString = s"RuntimeFilters:
${runtimeFilters.mkString("[", ",", "]")}"
- val result = s"$nodeName$truncatedOutputString ${scan.description()}
$runtimeFiltersString"
- redact(result)
- }
-
- // Set to false to disable BatchScan's columnar output.
- override def supportsColumnar: Boolean = false
-}
diff --git a/shims/spark33/src/main/scala/org/apache/gluten/utils/InternalRowUtl.scala b/shims/spark33/src/main/scala/org/apache/gluten/utils/InternalRowUtil.scala
similarity index 98%
rename from shims/spark33/src/main/scala/org/apache/gluten/utils/InternalRowUtl.scala
rename to shims/spark33/src/main/scala/org/apache/gluten/utils/InternalRowUtil.scala
index 32d6943713..7c9ffc70ca 100644
--- a/shims/spark33/src/main/scala/org/apache/gluten/utils/InternalRowUtl.scala
+++ b/shims/spark33/src/main/scala/org/apache/gluten/utils/InternalRowUtil.scala
@@ -20,7 +20,7 @@ import org.apache.spark.sql.catalyst.InternalRow
import org.apache.spark.sql.catalyst.encoders.RowEncoder
import org.apache.spark.sql.types.StructType
-object InternalRowUtl {
+object InternalRowUtil {
def toString(struct: StructType, rows: Iterator[InternalRow]): String = {
val encoder = RowEncoder(struct).resolveAndBind()
val deserializer = encoder.createDeserializer()
diff --git a/shims/spark33/src/main/scala/org/apache/spark/sql/execution/datasources/v2/BatchScanExec.scala.deprecated b/shims/spark33/src/main/scala/org/apache/spark/sql/execution/datasources/v2/BatchScanExec.scala.deprecated
deleted file mode 100644
index aaae179971..0000000000
--- a/shims/spark33/src/main/scala/org/apache/spark/sql/execution/datasources/v2/BatchScanExec.scala.deprecated
+++ /dev/null
@@ -1,142 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.spark.sql.execution.datasources.v2
-
-import org.apache.spark.SparkException
-import org.apache.spark.rdd.RDD
-import org.apache.spark.sql.catalyst.InternalRow
-import org.apache.spark.sql.catalyst.expressions._
-import org.apache.spark.sql.catalyst.plans.QueryPlan
-import org.apache.spark.sql.catalyst.plans.physical.{KeyGroupedPartitioning,
SinglePartition}
-import org.apache.spark.sql.catalyst.util.{truncatedString, InternalRowSet}
-import org.apache.spark.sql.connector.read._
-import org.apache.spark.sql.execution.datasources.DataSourceStrategy
-
-import java.util.Objects
-
-/** Physical plan node for scanning a batch of data from a data source v2. */
-case class BatchScanExec(
- output: Seq[AttributeReference],
- @transient scan: Scan,
- runtimeFilters: Seq[Expression],
- keyGroupedPartitioning: Option[Seq[Expression]] = None)
- extends DataSourceV2ScanExecBase {
-
- @transient lazy val batch = scan.toBatch
-
- // TODO: unify the equal/hashCode implementation for all data source v2
query plans.
- override def equals(other: Any): Boolean = other match {
- case other: BatchScanExec =>
- this.batch == other.batch && this.runtimeFilters == other.runtimeFilters
- case _ =>
- false
- }
-
- override def hashCode(): Int = Objects.hashCode(batch, runtimeFilters)
-
- @transient override lazy val inputPartitions: Seq[InputPartition] =
batch.planInputPartitions()
-
- @transient private lazy val filteredPartitions: Seq[Seq[InputPartition]] = {
- val dataSourceFilters = runtimeFilters.flatMap {
- case DynamicPruningExpression(e) =>
DataSourceStrategy.translateRuntimeFilter(e)
- case _ => None
- }
-
- if (dataSourceFilters.nonEmpty) {
- val originalPartitioning = outputPartitioning
-
- // the cast is safe as runtime filters are only assigned if the scan can
be filtered
- val filterableScan = scan.asInstanceOf[SupportsRuntimeFiltering]
- filterableScan.filter(dataSourceFilters.toArray)
-
- // call toBatch again to get filtered partitions
- val newPartitions = scan.toBatch.planInputPartitions()
-
- originalPartitioning match {
- case p: KeyGroupedPartitioning =>
- if (newPartitions.exists(!_.isInstanceOf[HasPartitionKey])) {
- throw new SparkException(
- "Data source must have preserved the original partitioning " +
- "during runtime filtering: not all partitions implement
HasPartitionKey after " +
- "filtering")
- }
-
- val newRows = new InternalRowSet(p.expressions.map(_.dataType))
- newRows ++=
newPartitions.map(_.asInstanceOf[HasPartitionKey].partitionKey())
- val oldRows = p.partitionValuesOpt.get
-
- if (oldRows.size != newRows.size) {
- throw new SparkException(
- "Data source must have preserved the original partitioning " +
- "during runtime filtering: the number of unique partition
values obtained " +
- s"through HasPartitionKey changed: before ${oldRows.size},
after ${newRows.size}")
- }
-
- if (!oldRows.forall(newRows.contains)) {
- throw new SparkException(
- "Data source must have preserved the original partitioning " +
- "during runtime filtering: the number of unique partition
values obtained " +
- s"through HasPartitionKey remain the same but do not exactly
match")
- }
-
- groupPartitions(newPartitions).get.map(_._2)
-
- case _ =>
- // no validation is needed as the data source did not report any
specific partitioning
- newPartitions.map(Seq(_))
- }
-
- } else {
- partitions
- }
- }
-
- override lazy val readerFactory: PartitionReaderFactory =
batch.createReaderFactory()
-
- override lazy val inputRDD: RDD[InternalRow] = {
- if (filteredPartitions.isEmpty && outputPartitioning == SinglePartition) {
- // return an empty RDD with 1 partition if dynamic filtering removed the
only split
- sparkContext.parallelize(Array.empty[InternalRow], 1)
- } else {
- new DataSourceRDD(
- sparkContext,
- filteredPartitions,
- readerFactory,
- supportsColumnar,
- customMetrics)
- }
- }
-
- override def doCanonicalize(): BatchScanExec = {
- this.copy(
- output = output.map(QueryPlan.normalizeExpressions(_, output)),
- runtimeFilters = QueryPlan.normalizePredicates(
- runtimeFilters.filterNot(_ ==
DynamicPruningExpression(Literal.TrueLiteral)),
- output)
- )
- }
-
- override def simpleString(maxFields: Int): String = {
- val truncatedOutputString = truncatedString(output, "[", ", ", "]",
maxFields)
- val runtimeFiltersString = s"RuntimeFilters:
${runtimeFilters.mkString("[", ",", "]")}"
- val result = s"$nodeName$truncatedOutputString ${scan.description()}
$runtimeFiltersString"
- redact(result)
- }
-
- // Set to false to disable BatchScan's columnar output.
- override def supportsColumnar: Boolean = false
-}
diff --git a/shims/spark34/src/main/scala/org/apache/gluten/utils/InternalRowUtl.scala b/shims/spark34/src/main/scala/org/apache/gluten/utils/InternalRowUtil.scala
similarity index 98%
rename from shims/spark34/src/main/scala/org/apache/gluten/utils/InternalRowUtl.scala
rename to shims/spark34/src/main/scala/org/apache/gluten/utils/InternalRowUtil.scala
index 32d6943713..7c9ffc70ca 100644
--- a/shims/spark34/src/main/scala/org/apache/gluten/utils/InternalRowUtl.scala
+++ b/shims/spark34/src/main/scala/org/apache/gluten/utils/InternalRowUtil.scala
@@ -20,7 +20,7 @@ import org.apache.spark.sql.catalyst.InternalRow
import org.apache.spark.sql.catalyst.encoders.RowEncoder
import org.apache.spark.sql.types.StructType
-object InternalRowUtl {
+object InternalRowUtil {
def toString(struct: StructType, rows: Iterator[InternalRow]): String = {
val encoder = RowEncoder(struct).resolveAndBind()
val deserializer = encoder.createDeserializer()
diff --git a/shims/spark34/src/main/scala/org/apache/spark/sql/execution/datasources/v2/BatchScanExec.scala.deprecated b/shims/spark34/src/main/scala/org/apache/spark/sql/execution/datasources/v2/BatchScanExec.scala.deprecated
deleted file mode 100644
index d43331d57c..0000000000
--- a/shims/spark34/src/main/scala/org/apache/spark/sql/execution/datasources/v2/BatchScanExec.scala.deprecated
+++ /dev/null
@@ -1,243 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.spark.sql.execution.datasources.v2
-
-import com.google.common.base.Objects
-
-import org.apache.spark.SparkException
-import org.apache.spark.rdd.RDD
-import org.apache.spark.sql.catalyst.InternalRow
-import org.apache.spark.sql.catalyst.expressions._
-import org.apache.spark.sql.catalyst.plans.QueryPlan
-import org.apache.spark.sql.catalyst.plans.physical.{KeyGroupedPartitioning,
Partitioning, SinglePartition}
-import org.apache.spark.sql.catalyst.util.{truncatedString,
InternalRowComparableWrapper}
-import org.apache.spark.sql.connector.catalog.Table
-import org.apache.spark.sql.connector.read._
-import org.apache.spark.sql.internal.SQLConf
-
-/**
- * Physical plan node for scanning a batch of data from a data source v2.
- */
-case class BatchScanExec(
- output: Seq[AttributeReference],
- @transient scan: Scan,
- runtimeFilters: Seq[Expression],
- keyGroupedPartitioning: Option[Seq[Expression]] = None,
- ordering: Option[Seq[SortOrder]] = None,
- @transient table: Table,
- commonPartitionValues: Option[Seq[(InternalRow, Int)]] = None,
- applyPartialClustering: Boolean = false,
- replicatePartitions: Boolean = false) extends DataSourceV2ScanExecBase {
-
- @transient lazy val batch = if (scan == null) null else scan.toBatch
-
- // TODO: unify the equal/hashCode implementation for all data source v2
query plans.
- override def equals(other: Any): Boolean = other match {
- case other: BatchScanExec =>
- this.batch != null && this.batch == other.batch &&
- this.runtimeFilters == other.runtimeFilters &&
- this.commonPartitionValues == other.commonPartitionValues &&
- this.replicatePartitions == other.replicatePartitions &&
- this.applyPartialClustering == other.applyPartialClustering
- case _ =>
- false
- }
-
- override def hashCode(): Int = Objects.hashCode(batch, runtimeFilters)
-
- @transient override lazy val inputPartitions: Seq[InputPartition] =
batch.planInputPartitions()
-
- @transient private lazy val filteredPartitions: Seq[Seq[InputPartition]] = {
- val dataSourceFilters = runtimeFilters.flatMap {
- case DynamicPruningExpression(e) =>
DataSourceV2Strategy.translateRuntimeFilterV2(e)
- case _ => None
- }
-
- if (dataSourceFilters.nonEmpty) {
- val originalPartitioning = outputPartitioning
-
- // the cast is safe as runtime filters are only assigned if the scan can
be filtered
- val filterableScan = scan.asInstanceOf[SupportsRuntimeV2Filtering]
- filterableScan.filter(dataSourceFilters.toArray)
-
- // call toBatch again to get filtered partitions
- val newPartitions = scan.toBatch.planInputPartitions()
-
- originalPartitioning match {
- case p: KeyGroupedPartitioning =>
- if (newPartitions.exists(!_.isInstanceOf[HasPartitionKey])) {
- throw new SparkException("Data source must have preserved the
original partitioning " +
- "during runtime filtering: not all partitions implement
HasPartitionKey after " +
- "filtering")
- }
- val newPartitionValues = newPartitions.map(partition =>
-
InternalRowComparableWrapper(partition.asInstanceOf[HasPartitionKey],
p.expressions))
- .toSet
- val oldPartitionValues = p.partitionValues
- .map(partition => InternalRowComparableWrapper(partition,
p.expressions)).toSet
- // We require the new number of partition values to be equal or less
than the old number
- // of partition values here. In the case of less than, empty
partitions will be added for
- // those missing values that are not present in the new input
partitions.
- if (oldPartitionValues.size < newPartitionValues.size) {
- throw new SparkException("During runtime filtering, data source
must either report " +
- "the same number of partition values, or a subset of partition
values from the " +
- s"original. Before: ${oldPartitionValues.size} partition
values. " +
- s"After: ${newPartitionValues.size} partition values")
- }
-
- if (!newPartitionValues.forall(oldPartitionValues.contains)) {
- throw new SparkException("During runtime filtering, data source
must not report new " +
- "partition values that are not present in the original
partitioning.")
- }
-
- groupPartitions(newPartitions).get.map(_._2)
-
- case _ =>
- // no validation is needed as the data source did not report any
specific partitioning
- newPartitions.map(Seq(_))
- }
-
- } else {
- partitions
- }
- }
-
- override def outputPartitioning: Partitioning = {
- super.outputPartitioning match {
- case k: KeyGroupedPartitioning if commonPartitionValues.isDefined =>
- // We allow duplicated partition values if
- //
`spark.sql.sources.v2.bucketing.partiallyClusteredDistribution.enabled` is true
- val newPartValues = commonPartitionValues.get.flatMap { case
(partValue, numSplits) =>
- Seq.fill(numSplits)(partValue)
- }
- k.copy(numPartitions = newPartValues.length, partitionValues =
newPartValues)
- case p => p
- }
- }
-
- override lazy val readerFactory: PartitionReaderFactory =
batch.createReaderFactory()
-
- override lazy val inputRDD: RDD[InternalRow] = {
- val rdd = if (filteredPartitions.isEmpty && outputPartitioning ==
SinglePartition) {
- // return an empty RDD with 1 partition if dynamic filtering removed the
only split
- sparkContext.parallelize(Array.empty[InternalRow], 1)
- } else {
- var finalPartitions = filteredPartitions
-
- outputPartitioning match {
- case p: KeyGroupedPartitioning =>
- if (conf.v2BucketingPushPartValuesEnabled &&
- conf.v2BucketingPartiallyClusteredDistributionEnabled) {
- assert(filteredPartitions.forall(_.size == 1),
- "Expect partitions to be not grouped when " +
-
s"${SQLConf.V2_BUCKETING_PARTIALLY_CLUSTERED_DISTRIBUTION_ENABLED.key} " +
- "is enabled")
-
- val groupedPartitions =
groupPartitions(finalPartitions.map(_.head), true).get
-
- // This means the input partitions are not grouped by partition
values. We'll need to
- // check `groupByPartitionValues` and decide whether to group and
replicate splits
- // within a partition.
- if (commonPartitionValues.isDefined && applyPartialClustering) {
- // A mapping from the common partition values to how many splits
the partition
- // should contain. Note this no longer maintain the partition
key ordering.
- val commonPartValuesMap = commonPartitionValues
- .get
- .map(t => (InternalRowComparableWrapper(t._1, p.expressions),
t._2))
- .toMap
- val nestGroupedPartitions = groupedPartitions.map {
- case (partValue, splits) =>
- // `commonPartValuesMap` should contain the part value since
it's the super set.
- val numSplits = commonPartValuesMap
- .get(InternalRowComparableWrapper(partValue,
p.expressions))
- assert(numSplits.isDefined, s"Partition value $partValue
does not exist in " +
- "common partition values from Spark plan")
-
- val newSplits = if (replicatePartitions) {
- // We need to also replicate partitions according to the
other side of join
- Seq.fill(numSplits.get)(splits)
- } else {
- // Not grouping by partition values: this could be the
side with partially
- // clustered distribution. Because of dynamic filtering,
we'll need to check if
- // the final number of splits of a partition is smaller
than the original
- // number, and fill with empty splits if so. This is
necessary so that both
- // sides of a join will have the same number of partitions
& splits.
- splits.map(Seq(_)).padTo(numSplits.get, Seq.empty)
- }
- (InternalRowComparableWrapper(partValue, p.expressions),
newSplits)
- }
-
- // Now fill missing partition keys with empty partitions
- val partitionMapping = nestGroupedPartitions.toMap
- finalPartitions = commonPartitionValues.get.flatMap { case
(partValue, numSplits) =>
- // Use empty partition for those partition values that are not
present.
- partitionMapping.getOrElse(
- InternalRowComparableWrapper(partValue, p.expressions),
- Seq.fill(numSplits)(Seq.empty))
- }
- } else {
- val partitionMapping = groupedPartitions.map { case (row, parts)
=>
- InternalRowComparableWrapper(row, p.expressions) -> parts
- }.toMap
- finalPartitions = p.partitionValues.map { partValue =>
- // Use empty partition for those partition values that are not
present
- partitionMapping.getOrElse(
- InternalRowComparableWrapper(partValue, p.expressions),
Seq.empty)
- }
- }
- } else {
- val partitionMapping = finalPartitions.map { parts =>
- val row = parts.head.asInstanceOf[HasPartitionKey].partitionKey()
- InternalRowComparableWrapper(row, p.expressions) -> parts
- }.toMap
- finalPartitions = p.partitionValues.map { partValue =>
- // Use empty partition for those partition values that are not
present
- partitionMapping.getOrElse(
- InternalRowComparableWrapper(partValue, p.expressions),
Seq.empty)
- }
- }
-
- case _ =>
- }
-
- new DataSourceRDD(
- sparkContext, finalPartitions, readerFactory, supportsColumnar,
customMetrics)
- }
- postDriverMetrics()
- rdd
- }
-
- override def doCanonicalize(): BatchScanExec = {
- this.copy(
- output = output.map(QueryPlan.normalizeExpressions(_, output)),
- runtimeFilters = QueryPlan.normalizePredicates(
- runtimeFilters.filterNot(_ ==
DynamicPruningExpression(Literal.TrueLiteral)),
- output))
- }
-
- override def simpleString(maxFields: Int): String = {
- val truncatedOutputString = truncatedString(output, "[", ", ", "]",
maxFields)
- val runtimeFiltersString = s"RuntimeFilters:
${runtimeFilters.mkString("[", ",", "]")}"
- val result = s"$nodeName$truncatedOutputString ${scan.description()}
$runtimeFiltersString"
- redact(result)
- }
-
- override def nodeName: String = {
- s"BatchScan ${table.name()}".trim
- }
-}
diff --git a/shims/spark40/src/main/scala/org/apache/gluten/utils/InternalRowUtl.scala b/shims/spark35/src/main/scala/org/apache/gluten/utils/InternalRowUtil.scala
similarity index 98%
rename from shims/spark40/src/main/scala/org/apache/gluten/utils/InternalRowUtl.scala
rename to shims/spark35/src/main/scala/org/apache/gluten/utils/InternalRowUtil.scala
index 654e43cbd0..0f7c13448f 100644
--- a/shims/spark40/src/main/scala/org/apache/gluten/utils/InternalRowUtl.scala
+++ b/shims/spark35/src/main/scala/org/apache/gluten/utils/InternalRowUtil.scala
@@ -20,7 +20,7 @@ import org.apache.spark.sql.catalyst.InternalRow
import org.apache.spark.sql.catalyst.encoders.ExpressionEncoder
import org.apache.spark.sql.types.StructType
-object InternalRowUtl {
+object InternalRowUtil {
def toString(struct: StructType, rows: Iterator[InternalRow]): String = {
val encoder = ExpressionEncoder(struct).resolveAndBind()
val deserializer = encoder.createDeserializer()
diff --git a/shims/spark35/src/main/scala/org/apache/spark/sql/execution/datasources/v2/BatchScanExec.scala.deprecated b/shims/spark35/src/main/scala/org/apache/spark/sql/execution/datasources/v2/BatchScanExec.scala.deprecated
deleted file mode 100644
index d43331d57c..0000000000
--- a/shims/spark35/src/main/scala/org/apache/spark/sql/execution/datasources/v2/BatchScanExec.scala.deprecated
+++ /dev/null
@@ -1,243 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.spark.sql.execution.datasources.v2
-
-import com.google.common.base.Objects
-
-import org.apache.spark.SparkException
-import org.apache.spark.rdd.RDD
-import org.apache.spark.sql.catalyst.InternalRow
-import org.apache.spark.sql.catalyst.expressions._
-import org.apache.spark.sql.catalyst.plans.QueryPlan
-import org.apache.spark.sql.catalyst.plans.physical.{KeyGroupedPartitioning,
Partitioning, SinglePartition}
-import org.apache.spark.sql.catalyst.util.{truncatedString,
InternalRowComparableWrapper}
-import org.apache.spark.sql.connector.catalog.Table
-import org.apache.spark.sql.connector.read._
-import org.apache.spark.sql.internal.SQLConf
-
-/**
- * Physical plan node for scanning a batch of data from a data source v2.
- */
-case class BatchScanExec(
- output: Seq[AttributeReference],
- @transient scan: Scan,
- runtimeFilters: Seq[Expression],
- keyGroupedPartitioning: Option[Seq[Expression]] = None,
- ordering: Option[Seq[SortOrder]] = None,
- @transient table: Table,
- commonPartitionValues: Option[Seq[(InternalRow, Int)]] = None,
- applyPartialClustering: Boolean = false,
- replicatePartitions: Boolean = false) extends DataSourceV2ScanExecBase {
-
- @transient lazy val batch = if (scan == null) null else scan.toBatch
-
- // TODO: unify the equal/hashCode implementation for all data source v2
query plans.
- override def equals(other: Any): Boolean = other match {
- case other: BatchScanExec =>
- this.batch != null && this.batch == other.batch &&
- this.runtimeFilters == other.runtimeFilters &&
- this.commonPartitionValues == other.commonPartitionValues &&
- this.replicatePartitions == other.replicatePartitions &&
- this.applyPartialClustering == other.applyPartialClustering
- case _ =>
- false
- }
-
- override def hashCode(): Int = Objects.hashCode(batch, runtimeFilters)
-
- @transient override lazy val inputPartitions: Seq[InputPartition] =
batch.planInputPartitions()
-
- @transient private lazy val filteredPartitions: Seq[Seq[InputPartition]] = {
- val dataSourceFilters = runtimeFilters.flatMap {
- case DynamicPruningExpression(e) =>
DataSourceV2Strategy.translateRuntimeFilterV2(e)
- case _ => None
- }
-
- if (dataSourceFilters.nonEmpty) {
- val originalPartitioning = outputPartitioning
-
- // the cast is safe as runtime filters are only assigned if the scan can
be filtered
- val filterableScan = scan.asInstanceOf[SupportsRuntimeV2Filtering]
- filterableScan.filter(dataSourceFilters.toArray)
-
- // call toBatch again to get filtered partitions
- val newPartitions = scan.toBatch.planInputPartitions()
-
- originalPartitioning match {
- case p: KeyGroupedPartitioning =>
- if (newPartitions.exists(!_.isInstanceOf[HasPartitionKey])) {
- throw new SparkException("Data source must have preserved the
original partitioning " +
- "during runtime filtering: not all partitions implement
HasPartitionKey after " +
- "filtering")
- }
- val newPartitionValues = newPartitions.map(partition =>
-
InternalRowComparableWrapper(partition.asInstanceOf[HasPartitionKey],
p.expressions))
- .toSet
- val oldPartitionValues = p.partitionValues
- .map(partition => InternalRowComparableWrapper(partition,
p.expressions)).toSet
- // We require the new number of partition values to be equal or less
than the old number
- // of partition values here. In the case of less than, empty
partitions will be added for
- // those missing values that are not present in the new input
partitions.
- if (oldPartitionValues.size < newPartitionValues.size) {
- throw new SparkException("During runtime filtering, data source
must either report " +
- "the same number of partition values, or a subset of partition
values from the " +
- s"original. Before: ${oldPartitionValues.size} partition
values. " +
- s"After: ${newPartitionValues.size} partition values")
- }
-
- if (!newPartitionValues.forall(oldPartitionValues.contains)) {
- throw new SparkException("During runtime filtering, data source
must not report new " +
- "partition values that are not present in the original
partitioning.")
- }
-
- groupPartitions(newPartitions).get.map(_._2)
-
- case _ =>
- // no validation is needed as the data source did not report any
specific partitioning
- newPartitions.map(Seq(_))
- }
-
- } else {
- partitions
- }
- }
-
- override def outputPartitioning: Partitioning = {
- super.outputPartitioning match {
- case k: KeyGroupedPartitioning if commonPartitionValues.isDefined =>
- // We allow duplicated partition values if
- //
`spark.sql.sources.v2.bucketing.partiallyClusteredDistribution.enabled` is true
- val newPartValues = commonPartitionValues.get.flatMap { case
(partValue, numSplits) =>
- Seq.fill(numSplits)(partValue)
- }
- k.copy(numPartitions = newPartValues.length, partitionValues =
newPartValues)
- case p => p
- }
- }
-
- override lazy val readerFactory: PartitionReaderFactory =
batch.createReaderFactory()
-
- override lazy val inputRDD: RDD[InternalRow] = {
- val rdd = if (filteredPartitions.isEmpty && outputPartitioning ==
SinglePartition) {
- // return an empty RDD with 1 partition if dynamic filtering removed the
only split
- sparkContext.parallelize(Array.empty[InternalRow], 1)
- } else {
- var finalPartitions = filteredPartitions
-
- outputPartitioning match {
- case p: KeyGroupedPartitioning =>
- if (conf.v2BucketingPushPartValuesEnabled &&
- conf.v2BucketingPartiallyClusteredDistributionEnabled) {
- assert(filteredPartitions.forall(_.size == 1),
- "Expect partitions to be not grouped when " +
-
s"${SQLConf.V2_BUCKETING_PARTIALLY_CLUSTERED_DISTRIBUTION_ENABLED.key} " +
- "is enabled")
-
- val groupedPartitions =
groupPartitions(finalPartitions.map(_.head), true).get
-
- // This means the input partitions are not grouped by partition
values. We'll need to
- // check `groupByPartitionValues` and decide whether to group and
replicate splits
- // within a partition.
- if (commonPartitionValues.isDefined && applyPartialClustering) {
- // A mapping from the common partition values to how many splits
the partition
- // should contain. Note this no longer maintain the partition
key ordering.
- val commonPartValuesMap = commonPartitionValues
- .get
- .map(t => (InternalRowComparableWrapper(t._1, p.expressions),
t._2))
- .toMap
- val nestGroupedPartitions = groupedPartitions.map {
- case (partValue, splits) =>
- // `commonPartValuesMap` should contain the part value since
it's the super set.
- val numSplits = commonPartValuesMap
- .get(InternalRowComparableWrapper(partValue,
p.expressions))
- assert(numSplits.isDefined, s"Partition value $partValue
does not exist in " +
- "common partition values from Spark plan")
-
- val newSplits = if (replicatePartitions) {
- // We need to also replicate partitions according to the
other side of join
- Seq.fill(numSplits.get)(splits)
- } else {
- // Not grouping by partition values: this could be the
side with partially
- // clustered distribution. Because of dynamic filtering,
we'll need to check if
- // the final number of splits of a partition is smaller
than the original
- // number, and fill with empty splits if so. This is
necessary so that both
- // sides of a join will have the same number of partitions
& splits.
- splits.map(Seq(_)).padTo(numSplits.get, Seq.empty)
- }
- (InternalRowComparableWrapper(partValue, p.expressions),
newSplits)
- }
-
- // Now fill missing partition keys with empty partitions
- val partitionMapping = nestGroupedPartitions.toMap
- finalPartitions = commonPartitionValues.get.flatMap { case
(partValue, numSplits) =>
- // Use empty partition for those partition values that are not
present.
- partitionMapping.getOrElse(
- InternalRowComparableWrapper(partValue, p.expressions),
- Seq.fill(numSplits)(Seq.empty))
- }
- } else {
- val partitionMapping = groupedPartitions.map { case (row, parts)
=>
- InternalRowComparableWrapper(row, p.expressions) -> parts
- }.toMap
- finalPartitions = p.partitionValues.map { partValue =>
- // Use empty partition for those partition values that are not
present
- partitionMapping.getOrElse(
- InternalRowComparableWrapper(partValue, p.expressions),
Seq.empty)
- }
- }
- } else {
- val partitionMapping = finalPartitions.map { parts =>
- val row = parts.head.asInstanceOf[HasPartitionKey].partitionKey()
- InternalRowComparableWrapper(row, p.expressions) -> parts
- }.toMap
- finalPartitions = p.partitionValues.map { partValue =>
- // Use empty partition for those partition values that are not
present
- partitionMapping.getOrElse(
- InternalRowComparableWrapper(partValue, p.expressions),
Seq.empty)
- }
- }
-
- case _ =>
- }
-
- new DataSourceRDD(
- sparkContext, finalPartitions, readerFactory, supportsColumnar,
customMetrics)
- }
- postDriverMetrics()
- rdd
- }
-
- override def doCanonicalize(): BatchScanExec = {
- this.copy(
- output = output.map(QueryPlan.normalizeExpressions(_, output)),
- runtimeFilters = QueryPlan.normalizePredicates(
- runtimeFilters.filterNot(_ ==
DynamicPruningExpression(Literal.TrueLiteral)),
- output))
- }
-
- override def simpleString(maxFields: Int): String = {
- val truncatedOutputString = truncatedString(output, "[", ", ", "]",
maxFields)
- val runtimeFiltersString = s"RuntimeFilters:
${runtimeFilters.mkString("[", ",", "]")}"
- val result = s"$nodeName$truncatedOutputString ${scan.description()}
$runtimeFiltersString"
- redact(result)
- }
-
- override def nodeName: String = {
- s"BatchScan ${table.name()}".trim
- }
-}
diff --git a/shims/spark41/src/main/scala/org/apache/gluten/utils/InternalRowUtl.scala b/shims/spark40/src/main/scala/org/apache/gluten/utils/InternalRowUtil.scala
similarity index 98%
rename from shims/spark41/src/main/scala/org/apache/gluten/utils/InternalRowUtl.scala
rename to shims/spark40/src/main/scala/org/apache/gluten/utils/InternalRowUtil.scala
index 654e43cbd0..0f7c13448f 100644
--- a/shims/spark41/src/main/scala/org/apache/gluten/utils/InternalRowUtl.scala
+++ b/shims/spark40/src/main/scala/org/apache/gluten/utils/InternalRowUtil.scala
@@ -20,7 +20,7 @@ import org.apache.spark.sql.catalyst.InternalRow
import org.apache.spark.sql.catalyst.encoders.ExpressionEncoder
import org.apache.spark.sql.types.StructType
-object InternalRowUtl {
+object InternalRowUtil {
def toString(struct: StructType, rows: Iterator[InternalRow]): String = {
val encoder = ExpressionEncoder(struct).resolveAndBind()
val deserializer = encoder.createDeserializer()
diff --git a/shims/spark40/src/main/scala/org/apache/spark/sql/execution/datasources/v2/BatchScanExec.scala.deprecated b/shims/spark40/src/main/scala/org/apache/spark/sql/execution/datasources/v2/BatchScanExec.scala.deprecated
deleted file mode 100644
index d43331d57c..0000000000
--- a/shims/spark40/src/main/scala/org/apache/spark/sql/execution/datasources/v2/BatchScanExec.scala.deprecated
+++ /dev/null
@@ -1,243 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.spark.sql.execution.datasources.v2
-
-import com.google.common.base.Objects
-
-import org.apache.spark.SparkException
-import org.apache.spark.rdd.RDD
-import org.apache.spark.sql.catalyst.InternalRow
-import org.apache.spark.sql.catalyst.expressions._
-import org.apache.spark.sql.catalyst.plans.QueryPlan
-import org.apache.spark.sql.catalyst.plans.physical.{KeyGroupedPartitioning,
Partitioning, SinglePartition}
-import org.apache.spark.sql.catalyst.util.{truncatedString,
InternalRowComparableWrapper}
-import org.apache.spark.sql.connector.catalog.Table
-import org.apache.spark.sql.connector.read._
-import org.apache.spark.sql.internal.SQLConf
-
-/**
- * Physical plan node for scanning a batch of data from a data source v2.
- */
-case class BatchScanExec(
- output: Seq[AttributeReference],
- @transient scan: Scan,
- runtimeFilters: Seq[Expression],
- keyGroupedPartitioning: Option[Seq[Expression]] = None,
- ordering: Option[Seq[SortOrder]] = None,
- @transient table: Table,
- commonPartitionValues: Option[Seq[(InternalRow, Int)]] = None,
- applyPartialClustering: Boolean = false,
- replicatePartitions: Boolean = false) extends DataSourceV2ScanExecBase {
-
- @transient lazy val batch = if (scan == null) null else scan.toBatch
-
- // TODO: unify the equal/hashCode implementation for all data source v2
query plans.
- override def equals(other: Any): Boolean = other match {
- case other: BatchScanExec =>
- this.batch != null && this.batch == other.batch &&
- this.runtimeFilters == other.runtimeFilters &&
- this.commonPartitionValues == other.commonPartitionValues &&
- this.replicatePartitions == other.replicatePartitions &&
- this.applyPartialClustering == other.applyPartialClustering
- case _ =>
- false
- }
-
- override def hashCode(): Int = Objects.hashCode(batch, runtimeFilters)
-
- @transient override lazy val inputPartitions: Seq[InputPartition] =
batch.planInputPartitions()
-
- @transient private lazy val filteredPartitions: Seq[Seq[InputPartition]] = {
- val dataSourceFilters = runtimeFilters.flatMap {
- case DynamicPruningExpression(e) =>
DataSourceV2Strategy.translateRuntimeFilterV2(e)
- case _ => None
- }
-
- if (dataSourceFilters.nonEmpty) {
- val originalPartitioning = outputPartitioning
-
- // the cast is safe as runtime filters are only assigned if the scan can
be filtered
- val filterableScan = scan.asInstanceOf[SupportsRuntimeV2Filtering]
- filterableScan.filter(dataSourceFilters.toArray)
-
- // call toBatch again to get filtered partitions
- val newPartitions = scan.toBatch.planInputPartitions()
-
- originalPartitioning match {
- case p: KeyGroupedPartitioning =>
- if (newPartitions.exists(!_.isInstanceOf[HasPartitionKey])) {
- throw new SparkException("Data source must have preserved the
original partitioning " +
- "during runtime filtering: not all partitions implement
HasPartitionKey after " +
- "filtering")
- }
- val newPartitionValues = newPartitions.map(partition =>
-
InternalRowComparableWrapper(partition.asInstanceOf[HasPartitionKey],
p.expressions))
- .toSet
- val oldPartitionValues = p.partitionValues
- .map(partition => InternalRowComparableWrapper(partition,
p.expressions)).toSet
- // We require the new number of partition values to be equal or less
than the old number
- // of partition values here. In the case of less than, empty
partitions will be added for
- // those missing values that are not present in the new input
partitions.
- if (oldPartitionValues.size < newPartitionValues.size) {
- throw new SparkException("During runtime filtering, data source
must either report " +
- "the same number of partition values, or a subset of partition
values from the " +
- s"original. Before: ${oldPartitionValues.size} partition
values. " +
- s"After: ${newPartitionValues.size} partition values")
- }
-
- if (!newPartitionValues.forall(oldPartitionValues.contains)) {
- throw new SparkException("During runtime filtering, data source
must not report new " +
- "partition values that are not present in the original
partitioning.")
- }
-
- groupPartitions(newPartitions).get.map(_._2)
-
- case _ =>
- // no validation is needed as the data source did not report any
specific partitioning
- newPartitions.map(Seq(_))
- }
-
- } else {
- partitions
- }
- }
-
- override def outputPartitioning: Partitioning = {
- super.outputPartitioning match {
- case k: KeyGroupedPartitioning if commonPartitionValues.isDefined =>
- // We allow duplicated partition values if
- //
`spark.sql.sources.v2.bucketing.partiallyClusteredDistribution.enabled` is true
- val newPartValues = commonPartitionValues.get.flatMap { case
(partValue, numSplits) =>
- Seq.fill(numSplits)(partValue)
- }
- k.copy(numPartitions = newPartValues.length, partitionValues =
newPartValues)
- case p => p
- }
- }
-
- override lazy val readerFactory: PartitionReaderFactory =
batch.createReaderFactory()
-
- override lazy val inputRDD: RDD[InternalRow] = {
- val rdd = if (filteredPartitions.isEmpty && outputPartitioning ==
SinglePartition) {
- // return an empty RDD with 1 partition if dynamic filtering removed the
only split
- sparkContext.parallelize(Array.empty[InternalRow], 1)
- } else {
- var finalPartitions = filteredPartitions
-
- outputPartitioning match {
- case p: KeyGroupedPartitioning =>
- if (conf.v2BucketingPushPartValuesEnabled &&
- conf.v2BucketingPartiallyClusteredDistributionEnabled) {
- assert(filteredPartitions.forall(_.size == 1),
- "Expect partitions to be not grouped when " +
-
s"${SQLConf.V2_BUCKETING_PARTIALLY_CLUSTERED_DISTRIBUTION_ENABLED.key} " +
- "is enabled")
-
- val groupedPartitions =
groupPartitions(finalPartitions.map(_.head), true).get
-
- // This means the input partitions are not grouped by partition
values. We'll need to
- // check `groupByPartitionValues` and decide whether to group and
replicate splits
- // within a partition.
- if (commonPartitionValues.isDefined && applyPartialClustering) {
- // A mapping from the common partition values to how many splits
the partition
- // should contain. Note this no longer maintain the partition
key ordering.
- val commonPartValuesMap = commonPartitionValues
- .get
- .map(t => (InternalRowComparableWrapper(t._1, p.expressions),
t._2))
- .toMap
- val nestGroupedPartitions = groupedPartitions.map {
- case (partValue, splits) =>
- // `commonPartValuesMap` should contain the part value since
it's the super set.
- val numSplits = commonPartValuesMap
- .get(InternalRowComparableWrapper(partValue,
p.expressions))
- assert(numSplits.isDefined, s"Partition value $partValue
does not exist in " +
- "common partition values from Spark plan")
-
- val newSplits = if (replicatePartitions) {
- // We need to also replicate partitions according to the
other side of join
- Seq.fill(numSplits.get)(splits)
- } else {
- // Not grouping by partition values: this could be the
side with partially
- // clustered distribution. Because of dynamic filtering,
we'll need to check if
- // the final number of splits of a partition is smaller
than the original
- // number, and fill with empty splits if so. This is
necessary so that both
- // sides of a join will have the same number of partitions
& splits.
- splits.map(Seq(_)).padTo(numSplits.get, Seq.empty)
- }
- (InternalRowComparableWrapper(partValue, p.expressions),
newSplits)
- }
-
- // Now fill missing partition keys with empty partitions
- val partitionMapping = nestGroupedPartitions.toMap
- finalPartitions = commonPartitionValues.get.flatMap { case
(partValue, numSplits) =>
- // Use empty partition for those partition values that are not
present.
- partitionMapping.getOrElse(
- InternalRowComparableWrapper(partValue, p.expressions),
- Seq.fill(numSplits)(Seq.empty))
- }
- } else {
- val partitionMapping = groupedPartitions.map { case (row, parts)
=>
- InternalRowComparableWrapper(row, p.expressions) -> parts
- }.toMap
- finalPartitions = p.partitionValues.map { partValue =>
- // Use empty partition for those partition values that are not
present
- partitionMapping.getOrElse(
- InternalRowComparableWrapper(partValue, p.expressions),
Seq.empty)
- }
- }
- } else {
- val partitionMapping = finalPartitions.map { parts =>
- val row = parts.head.asInstanceOf[HasPartitionKey].partitionKey()
- InternalRowComparableWrapper(row, p.expressions) -> parts
- }.toMap
- finalPartitions = p.partitionValues.map { partValue =>
- // Use empty partition for those partition values that are not
present
- partitionMapping.getOrElse(
- InternalRowComparableWrapper(partValue, p.expressions),
Seq.empty)
- }
- }
-
- case _ =>
- }
-
- new DataSourceRDD(
- sparkContext, finalPartitions, readerFactory, supportsColumnar,
customMetrics)
- }
- postDriverMetrics()
- rdd
- }
-
- override def doCanonicalize(): BatchScanExec = {
- this.copy(
- output = output.map(QueryPlan.normalizeExpressions(_, output)),
- runtimeFilters = QueryPlan.normalizePredicates(
- runtimeFilters.filterNot(_ ==
DynamicPruningExpression(Literal.TrueLiteral)),
- output))
- }
-
- override def simpleString(maxFields: Int): String = {
- val truncatedOutputString = truncatedString(output, "[", ", ", "]",
maxFields)
- val runtimeFiltersString = s"RuntimeFilters:
${runtimeFilters.mkString("[", ",", "]")}"
- val result = s"$nodeName$truncatedOutputString ${scan.description()}
$runtimeFiltersString"
- redact(result)
- }
-
- override def nodeName: String = {
- s"BatchScan ${table.name()}".trim
- }
-}
diff --git a/shims/spark35/src/main/scala/org/apache/gluten/utils/InternalRowUtl.scala b/shims/spark41/src/main/scala/org/apache/gluten/utils/InternalRowUtil.scala
similarity index 98%
rename from shims/spark35/src/main/scala/org/apache/gluten/utils/InternalRowUtl.scala
rename to shims/spark41/src/main/scala/org/apache/gluten/utils/InternalRowUtil.scala
index 654e43cbd0..0f7c13448f 100644
--- a/shims/spark35/src/main/scala/org/apache/gluten/utils/InternalRowUtl.scala
+++ b/shims/spark41/src/main/scala/org/apache/gluten/utils/InternalRowUtil.scala
@@ -20,7 +20,7 @@ import org.apache.spark.sql.catalyst.InternalRow
import org.apache.spark.sql.catalyst.encoders.ExpressionEncoder
import org.apache.spark.sql.types.StructType
-object InternalRowUtl {
+object InternalRowUtil {
def toString(struct: StructType, rows: Iterator[InternalRow]): String = {
val encoder = ExpressionEncoder(struct).resolveAndBind()
val deserializer = encoder.createDeserializer()
diff --git a/shims/spark41/src/main/scala/org/apache/spark/sql/execution/datasources/v2/BatchScanExec.scala.deprecated b/shims/spark41/src/main/scala/org/apache/spark/sql/execution/datasources/v2/BatchScanExec.scala.deprecated
deleted file mode 100644
index d43331d57c..0000000000
--- a/shims/spark41/src/main/scala/org/apache/spark/sql/execution/datasources/v2/BatchScanExec.scala.deprecated
+++ /dev/null
@@ -1,243 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.spark.sql.execution.datasources.v2
-
-import com.google.common.base.Objects
-
-import org.apache.spark.SparkException
-import org.apache.spark.rdd.RDD
-import org.apache.spark.sql.catalyst.InternalRow
-import org.apache.spark.sql.catalyst.expressions._
-import org.apache.spark.sql.catalyst.plans.QueryPlan
-import org.apache.spark.sql.catalyst.plans.physical.{KeyGroupedPartitioning,
Partitioning, SinglePartition}
-import org.apache.spark.sql.catalyst.util.{truncatedString,
InternalRowComparableWrapper}
-import org.apache.spark.sql.connector.catalog.Table
-import org.apache.spark.sql.connector.read._
-import org.apache.spark.sql.internal.SQLConf
-
-/**
- * Physical plan node for scanning a batch of data from a data source v2.
- */
-case class BatchScanExec(
- output: Seq[AttributeReference],
- @transient scan: Scan,
- runtimeFilters: Seq[Expression],
- keyGroupedPartitioning: Option[Seq[Expression]] = None,
- ordering: Option[Seq[SortOrder]] = None,
- @transient table: Table,
- commonPartitionValues: Option[Seq[(InternalRow, Int)]] = None,
- applyPartialClustering: Boolean = false,
- replicatePartitions: Boolean = false) extends DataSourceV2ScanExecBase {
-
- @transient lazy val batch = if (scan == null) null else scan.toBatch
-
- // TODO: unify the equal/hashCode implementation for all data source v2
query plans.
- override def equals(other: Any): Boolean = other match {
- case other: BatchScanExec =>
- this.batch != null && this.batch == other.batch &&
- this.runtimeFilters == other.runtimeFilters &&
- this.commonPartitionValues == other.commonPartitionValues &&
- this.replicatePartitions == other.replicatePartitions &&
- this.applyPartialClustering == other.applyPartialClustering
- case _ =>
- false
- }
-
- override def hashCode(): Int = Objects.hashCode(batch, runtimeFilters)
-
- @transient override lazy val inputPartitions: Seq[InputPartition] =
batch.planInputPartitions()
-
- @transient private lazy val filteredPartitions: Seq[Seq[InputPartition]] = {
- val dataSourceFilters = runtimeFilters.flatMap {
- case DynamicPruningExpression(e) =>
DataSourceV2Strategy.translateRuntimeFilterV2(e)
- case _ => None
- }
-
- if (dataSourceFilters.nonEmpty) {
- val originalPartitioning = outputPartitioning
-
- // the cast is safe as runtime filters are only assigned if the scan can
be filtered
- val filterableScan = scan.asInstanceOf[SupportsRuntimeV2Filtering]
- filterableScan.filter(dataSourceFilters.toArray)
-
- // call toBatch again to get filtered partitions
- val newPartitions = scan.toBatch.planInputPartitions()
-
- originalPartitioning match {
- case p: KeyGroupedPartitioning =>
- if (newPartitions.exists(!_.isInstanceOf[HasPartitionKey])) {
- throw new SparkException("Data source must have preserved the
original partitioning " +
- "during runtime filtering: not all partitions implement
HasPartitionKey after " +
- "filtering")
- }
- val newPartitionValues = newPartitions.map(partition =>
-
InternalRowComparableWrapper(partition.asInstanceOf[HasPartitionKey],
p.expressions))
- .toSet
- val oldPartitionValues = p.partitionValues
- .map(partition => InternalRowComparableWrapper(partition,
p.expressions)).toSet
- // We require the new number of partition values to be equal or less
than the old number
- // of partition values here. In the case of less than, empty
partitions will be added for
- // those missing values that are not present in the new input
partitions.
- if (oldPartitionValues.size < newPartitionValues.size) {
- throw new SparkException("During runtime filtering, data source
must either report " +
- "the same number of partition values, or a subset of partition
values from the " +
- s"original. Before: ${oldPartitionValues.size} partition
values. " +
- s"After: ${newPartitionValues.size} partition values")
- }
-
- if (!newPartitionValues.forall(oldPartitionValues.contains)) {
- throw new SparkException("During runtime filtering, data source
must not report new " +
- "partition values that are not present in the original
partitioning.")
- }
-
- groupPartitions(newPartitions).get.map(_._2)
-
- case _ =>
- // no validation is needed as the data source did not report any
specific partitioning
- newPartitions.map(Seq(_))
- }
-
- } else {
- partitions
- }
- }
-
- override def outputPartitioning: Partitioning = {
- super.outputPartitioning match {
- case k: KeyGroupedPartitioning if commonPartitionValues.isDefined =>
- // We allow duplicated partition values if
- //
`spark.sql.sources.v2.bucketing.partiallyClusteredDistribution.enabled` is true
- val newPartValues = commonPartitionValues.get.flatMap { case
(partValue, numSplits) =>
- Seq.fill(numSplits)(partValue)
- }
- k.copy(numPartitions = newPartValues.length, partitionValues =
newPartValues)
- case p => p
- }
- }
-
- override lazy val readerFactory: PartitionReaderFactory =
batch.createReaderFactory()
-
- override lazy val inputRDD: RDD[InternalRow] = {
- val rdd = if (filteredPartitions.isEmpty && outputPartitioning ==
SinglePartition) {
- // return an empty RDD with 1 partition if dynamic filtering removed the
only split
- sparkContext.parallelize(Array.empty[InternalRow], 1)
- } else {
- var finalPartitions = filteredPartitions
-
- outputPartitioning match {
- case p: KeyGroupedPartitioning =>
- if (conf.v2BucketingPushPartValuesEnabled &&
- conf.v2BucketingPartiallyClusteredDistributionEnabled) {
- assert(filteredPartitions.forall(_.size == 1),
- "Expect partitions to be not grouped when " +
-
s"${SQLConf.V2_BUCKETING_PARTIALLY_CLUSTERED_DISTRIBUTION_ENABLED.key} " +
- "is enabled")
-
- val groupedPartitions =
groupPartitions(finalPartitions.map(_.head), true).get
-
- // This means the input partitions are not grouped by partition
values. We'll need to
- // check `groupByPartitionValues` and decide whether to group and
replicate splits
- // within a partition.
- if (commonPartitionValues.isDefined && applyPartialClustering) {
- // A mapping from the common partition values to how many splits
the partition
- // should contain. Note this no longer maintain the partition
key ordering.
- val commonPartValuesMap = commonPartitionValues
- .get
- .map(t => (InternalRowComparableWrapper(t._1, p.expressions),
t._2))
- .toMap
- val nestGroupedPartitions = groupedPartitions.map {
- case (partValue, splits) =>
- // `commonPartValuesMap` should contain the part value since
it's the super set.
- val numSplits = commonPartValuesMap
- .get(InternalRowComparableWrapper(partValue,
p.expressions))
- assert(numSplits.isDefined, s"Partition value $partValue
does not exist in " +
- "common partition values from Spark plan")
-
- val newSplits = if (replicatePartitions) {
- // We need to also replicate partitions according to the
other side of join
- Seq.fill(numSplits.get)(splits)
- } else {
- // Not grouping by partition values: this could be the
side with partially
- // clustered distribution. Because of dynamic filtering,
we'll need to check if
- // the final number of splits of a partition is smaller
than the original
- // number, and fill with empty splits if so. This is
necessary so that both
- // sides of a join will have the same number of partitions
& splits.
- splits.map(Seq(_)).padTo(numSplits.get, Seq.empty)
- }
- (InternalRowComparableWrapper(partValue, p.expressions),
newSplits)
- }
-
- // Now fill missing partition keys with empty partitions
- val partitionMapping = nestGroupedPartitions.toMap
- finalPartitions = commonPartitionValues.get.flatMap { case
(partValue, numSplits) =>
- // Use empty partition for those partition values that are not
present.
- partitionMapping.getOrElse(
- InternalRowComparableWrapper(partValue, p.expressions),
- Seq.fill(numSplits)(Seq.empty))
- }
- } else {
- val partitionMapping = groupedPartitions.map { case (row, parts)
=>
- InternalRowComparableWrapper(row, p.expressions) -> parts
- }.toMap
- finalPartitions = p.partitionValues.map { partValue =>
- // Use empty partition for those partition values that are not
present
- partitionMapping.getOrElse(
- InternalRowComparableWrapper(partValue, p.expressions),
Seq.empty)
- }
- }
- } else {
- val partitionMapping = finalPartitions.map { parts =>
- val row = parts.head.asInstanceOf[HasPartitionKey].partitionKey()
- InternalRowComparableWrapper(row, p.expressions) -> parts
- }.toMap
- finalPartitions = p.partitionValues.map { partValue =>
- // Use empty partition for those partition values that are not
present
- partitionMapping.getOrElse(
- InternalRowComparableWrapper(partValue, p.expressions),
Seq.empty)
- }
- }
-
- case _ =>
- }
-
- new DataSourceRDD(
- sparkContext, finalPartitions, readerFactory, supportsColumnar,
customMetrics)
- }
- postDriverMetrics()
- rdd
- }
-
- override def doCanonicalize(): BatchScanExec = {
- this.copy(
- output = output.map(QueryPlan.normalizeExpressions(_, output)),
- runtimeFilters = QueryPlan.normalizePredicates(
- runtimeFilters.filterNot(_ ==
DynamicPruningExpression(Literal.TrueLiteral)),
- output))
- }
-
- override def simpleString(maxFields: Int): String = {
- val truncatedOutputString = truncatedString(output, "[", ", ", "]",
maxFields)
- val runtimeFiltersString = s"RuntimeFilters:
${runtimeFilters.mkString("[", ",", "]")}"
- val result = s"$nodeName$truncatedOutputString ${scan.description()}
$runtimeFiltersString"
- redact(result)
- }
-
- override def nodeName: String = {
- s"BatchScan ${table.name()}".trim
- }
-}
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]