This is an automated email from the ASF dual-hosted git repository.
philo pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/incubator-gluten.git
The following commit(s) were added to refs/heads/main by this push:
new dcc15adacc [GLUTEN-8852][VL] Fix compilation issues in existing tests
for Spark 4.0.0 (#10434)
dcc15adacc is described below
commit dcc15adacca7c82141cb8fa1f0333e4f723905e1
Author: Terry Wang <[email protected]>
AuthorDate: Sun Aug 17 13:35:59 2025 +0800
[GLUTEN-8852][VL] Fix compilation issues in existing tests for Spark 4.0.0
(#10434)
---
.../execution/VeloxStringFunctionsSuite.scala | 7 +++--
.../execution/benchmark/VeloxRasBenchmark.scala | 5 ++--
.../execution/joins/GlutenExistenceJoinSuite.scala | 9 ++++---
pom.xml | 19 +++++++++++++
.../org/apache/gluten/sql/shims/SparkShims.scala | 11 +++++++-
.../gluten/sql/shims/spark32/Spark32Shims.scala | 14 +++++++++-
.../scala/org/apache/spark/sql/SparkSqlUtils.scala | 31 ++++++++++++++++++++++
.../gluten/sql/shims/spark33/Spark33Shims.scala | 14 +++++++++-
.../scala/org/apache/spark/sql/SparkSqlUtils.scala | 31 ++++++++++++++++++++++
.../gluten/sql/shims/spark34/Spark34Shims.scala | 14 +++++++++-
.../scala/org/apache/spark/sql/SparkSqlUtils.scala | 31 ++++++++++++++++++++++
.../gluten/sql/shims/spark35/Spark35Shims.scala | 14 +++++++++-
.../scala/org/apache/spark/sql/SparkSqlUtils.scala | 31 ++++++++++++++++++++++
.../gluten/sql/shims/spark40/Spark40Shims.scala | 13 ++++++++-
.../scala/org/apache/spark/sql/SparkSqlUtils.scala | 31 ++++++++++++++++++++++
15 files changed, 259 insertions(+), 16 deletions(-)
diff --git
a/backends-velox/src/test/scala/org/apache/gluten/execution/VeloxStringFunctionsSuite.scala
b/backends-velox/src/test/scala/org/apache/gluten/execution/VeloxStringFunctionsSuite.scala
index 24fb04f4c7..c1dd956176 100644
---
a/backends-velox/src/test/scala/org/apache/gluten/execution/VeloxStringFunctionsSuite.scala
+++
b/backends-velox/src/test/scala/org/apache/gluten/execution/VeloxStringFunctionsSuite.scala
@@ -17,10 +17,8 @@
package org.apache.gluten.execution
import org.apache.spark.SparkConf
-import org.apache.spark.sql.Column
-import org.apache.spark.sql.catalyst.expressions.{Alias, Literal}
import org.apache.spark.sql.catalyst.optimizer.{ConstantFolding,
NullPropagation}
-import org.apache.spark.sql.functions.col
+import org.apache.spark.sql.functions.{col, lit}
import org.apache.spark.sql.types.StringType
class VeloxStringFunctionsSuite extends VeloxWholeStageTransformerSuite {
@@ -35,10 +33,11 @@ class VeloxStringFunctionsSuite extends
VeloxWholeStageTransformerSuite {
override def beforeAll(): Unit = {
super.beforeAll()
+ val nullColumn = lit(null).cast(StringType).alias(NULL_STR_COL)
createTPCHNotNullTables()
spark
.table("lineitem")
- .select(col("*"), new Column(Alias(Literal(null, StringType),
NULL_STR_COL)()))
+ .select(col("*"), nullColumn)
.createOrReplaceTempView(LINEITEM_TABLE)
}
diff --git
a/backends-velox/src/test/scala/org/apache/spark/sql/execution/benchmark/VeloxRasBenchmark.scala
b/backends-velox/src/test/scala/org/apache/spark/sql/execution/benchmark/VeloxRasBenchmark.scala
index 9a9ae6407a..0c48d75cf4 100644
---
a/backends-velox/src/test/scala/org/apache/spark/sql/execution/benchmark/VeloxRasBenchmark.scala
+++
b/backends-velox/src/test/scala/org/apache/spark/sql/execution/benchmark/VeloxRasBenchmark.scala
@@ -18,6 +18,7 @@ package org.apache.spark.sql.execution.benchmark
import org.apache.gluten.config.GlutenConfig
import org.apache.gluten.execution.Table
+import org.apache.gluten.sql.shims.SparkShimLoader
import org.apache.gluten.utils.Arm
import org.apache.spark.benchmark.Benchmark
@@ -71,14 +72,14 @@ object VeloxRasBenchmark extends SqlBasedBenchmark {
}
private def createLegacySession(): SparkSession = {
- SparkSession.cleanupAnyExistingSession()
+ SparkShimLoader.getSparkShims.cleanupAnyExistingSession()
sessionBuilder()
.config(GlutenConfig.RAS_ENABLED.key, false)
.getOrCreate()
}
private def createRasSession(): SparkSession = {
- SparkSession.cleanupAnyExistingSession()
+ SparkShimLoader.getSparkShims.cleanupAnyExistingSession()
sessionBuilder()
.config(GlutenConfig.RAS_ENABLED.key, true)
.getOrCreate()
diff --git
a/backends-velox/src/test/scala/org/apache/spark/sql/execution/joins/GlutenExistenceJoinSuite.scala
b/backends-velox/src/test/scala/org/apache/spark/sql/execution/joins/GlutenExistenceJoinSuite.scala
index 22063596f4..9540c6af0c 100644
---
a/backends-velox/src/test/scala/org/apache/spark/sql/execution/joins/GlutenExistenceJoinSuite.scala
+++
b/backends-velox/src/test/scala/org/apache/spark/sql/execution/joins/GlutenExistenceJoinSuite.scala
@@ -17,8 +17,9 @@
package org.apache.spark.sql.execution.joins
import
org.apache.gluten.execution.{VeloxBroadcastNestedLoopJoinExecTransformer,
VeloxWholeStageTransformerSuite}
+import org.apache.gluten.sql.shims.SparkShimLoader
-import org.apache.spark.sql.{DataFrame, Dataset, Row}
+import org.apache.spark.sql.{DataFrame, Row}
import org.apache.spark.sql.catalyst.expressions._
import org.apache.spark.sql.catalyst.plans.ExistenceJoin
import org.apache.spark.sql.catalyst.plans.logical._
@@ -54,8 +55,8 @@ class GlutenExistenceJoinSuite extends
VeloxWholeStageTransformerSuite with SQLT
new StructType().add("id", IntegerType).add("val", StringType)
)
- val leftPlan = left.logicalPlan
- val rightPlan = right.logicalPlan
+ val leftPlan =
SparkShimLoader.getSparkShims.getLogicalPlanFromDataFrame(left)
+ val rightPlan =
SparkShimLoader.getSparkShims.getLogicalPlanFromDataFrame(right)
val existsAttr = AttributeReference("exists", BooleanType, nullable =
false)()
@@ -74,7 +75,7 @@ class GlutenExistenceJoinSuite extends
VeloxWholeStageTransformerSuite with SQLT
child = existenceJoin
)
- val df = Dataset.ofRows(spark, project)
+ val df = SparkShimLoader.getSparkShims.dataSetOfRows(spark, project)
assert(existenceJoin.joinType == ExistenceJoin(existsAttr))
assert(existenceJoin.condition.contains(joinCondition))
diff --git a/pom.xml b/pom.xml
index 7bccae1427..b4334842dc 100644
--- a/pom.xml
+++ b/pom.xml
@@ -486,6 +486,25 @@
</execution>
</executions>
</plugin>
+ <plugin>
+ <groupId>net.alchim31.maven</groupId>
+ <artifactId>scala-maven-plugin</artifactId>
+ <executions>
+ <execution>
+ <id>scala-test-compile-first</id>
+ <phase>process-test-resources</phase>
+ <goals>
+ <goal>testCompile</goal>
+ </goals>
+ <configuration>
+ <excludes>
+ <exclude>**/GlutenHiveUDFSuite.scala</exclude>
+ <exclude>**/VeloxParquetWriteForHiveSuite.scala</exclude>
+ </excludes>
+ </configuration>
+ </execution>
+ </executions>
+ </plugin>
</plugins>
</build>
</profile>
diff --git
a/shims/common/src/main/scala/org/apache/gluten/sql/shims/SparkShims.scala
b/shims/common/src/main/scala/org/apache/gluten/sql/shims/SparkShims.scala
index 570e3cfef5..6fd93b807d 100644
--- a/shims/common/src/main/scala/org/apache/gluten/sql/shims/SparkShims.scala
+++ b/shims/common/src/main/scala/org/apache/gluten/sql/shims/SparkShims.scala
@@ -24,7 +24,7 @@ import org.apache.spark.broadcast.Broadcast
import org.apache.spark.internal.io.FileCommitProtocol
import org.apache.spark.scheduler.TaskInfo
import org.apache.spark.shuffle.ShuffleHandle
-import org.apache.spark.sql.{AnalysisException, SparkSession}
+import org.apache.spark.sql.{AnalysisException, DataFrame, SparkSession}
import org.apache.spark.sql.catalyst.InternalRow
import org.apache.spark.sql.catalyst.catalog.BucketSpec
import org.apache.spark.sql.catalyst.csv.CSVOptions
@@ -318,4 +318,13 @@ trait SparkShims {
def unBase64FunctionFailsOnError(unBase64: UnBase64): Boolean = false
def widerDecimalType(d1: DecimalType, d2: DecimalType): DecimalType
+
+ /** For test use only */
+ def cleanupAnyExistingSession(): Unit
+
+ /** For test use only */
+ def getLogicalPlanFromDataFrame(df: DataFrame): LogicalPlan
+
+ /** For test use only */
+ def dataSetOfRows(spark: SparkSession, logicalPlan: LogicalPlan): DataFrame
}
diff --git
a/shims/spark32/src/main/scala/org/apache/gluten/sql/shims/spark32/Spark32Shims.scala
b/shims/spark32/src/main/scala/org/apache/gluten/sql/shims/spark32/Spark32Shims.scala
index baf7868f32..84c1cdba86 100644
---
a/shims/spark32/src/main/scala/org/apache/gluten/sql/shims/spark32/Spark32Shims.scala
+++
b/shims/spark32/src/main/scala/org/apache/gluten/sql/shims/spark32/Spark32Shims.scala
@@ -24,7 +24,7 @@ import org.apache.gluten.utils.ExceptionUtils
import org.apache.spark.{ShuffleUtils, SparkContext}
import org.apache.spark.scheduler.TaskInfo
import org.apache.spark.shuffle.ShuffleHandle
-import org.apache.spark.sql.{AnalysisException, SparkSession}
+import org.apache.spark.sql.{AnalysisException, DataFrame, SparkSession,
SparkSqlUtils}
import org.apache.spark.sql.catalyst.InternalRow
import org.apache.spark.sql.catalyst.analysis.DecimalPrecision
import org.apache.spark.sql.catalyst.catalog.BucketSpec
@@ -315,4 +315,16 @@ class Spark32Shims extends SparkShims {
DecimalPrecision.widerDecimalType(d1, d2)
}
+ override def cleanupAnyExistingSession(): Unit = {
+ SparkSqlUtils.cleanupAnyExistingSession()
+ }
+
+ override def getLogicalPlanFromDataFrame(df: DataFrame): LogicalPlan = {
+ SparkSqlUtils.getLogicalPlanFromDataFrame(df)
+ }
+
+ override def dataSetOfRows(spark: SparkSession, logicalPlan: LogicalPlan):
DataFrame = {
+ SparkSqlUtils.dataSetOfRows(spark, logicalPlan)
+ }
+
}
diff --git
a/shims/spark32/src/main/scala/org/apache/spark/sql/SparkSqlUtils.scala
b/shims/spark32/src/main/scala/org/apache/spark/sql/SparkSqlUtils.scala
new file mode 100644
index 0000000000..78a1542b47
--- /dev/null
+++ b/shims/spark32/src/main/scala/org/apache/spark/sql/SparkSqlUtils.scala
@@ -0,0 +1,31 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.spark.sql
+
+import org.apache.spark.sql.catalyst.plans.logical.LogicalPlan
+
+object SparkSqlUtils {
+ def cleanupAnyExistingSession(): Unit = {
+ SparkSession.cleanupAnyExistingSession()
+ }
+ def getLogicalPlanFromDataFrame(df: DataFrame): LogicalPlan = {
+ df.logicalPlan
+ }
+ def dataSetOfRows(spark: SparkSession, logicalPlan: LogicalPlan): DataFrame
= {
+ Dataset.ofRows(spark, logicalPlan)
+ }
+}
diff --git
a/shims/spark33/src/main/scala/org/apache/gluten/sql/shims/spark33/Spark33Shims.scala
b/shims/spark33/src/main/scala/org/apache/gluten/sql/shims/spark33/Spark33Shims.scala
index 0ad1d94e79..4edd715a1e 100644
---
a/shims/spark33/src/main/scala/org/apache/gluten/sql/shims/spark33/Spark33Shims.scala
+++
b/shims/spark33/src/main/scala/org/apache/gluten/sql/shims/spark33/Spark33Shims.scala
@@ -25,7 +25,7 @@ import org.apache.gluten.utils.ExceptionUtils
import org.apache.spark._
import org.apache.spark.scheduler.TaskInfo
import org.apache.spark.shuffle.ShuffleHandle
-import org.apache.spark.sql.{AnalysisException, SparkSession}
+import org.apache.spark.sql.{AnalysisException, DataFrame, SparkSession,
SparkSqlUtils}
import org.apache.spark.sql.catalyst.InternalRow
import org.apache.spark.sql.catalyst.analysis.DecimalPrecision
import org.apache.spark.sql.catalyst.catalog.BucketSpec
@@ -409,4 +409,16 @@ class Spark33Shims extends SparkShims {
DecimalPrecision.widerDecimalType(d1, d2)
}
+ override def cleanupAnyExistingSession(): Unit = {
+ SparkSqlUtils.cleanupAnyExistingSession()
+ }
+
+ override def getLogicalPlanFromDataFrame(df: DataFrame): LogicalPlan = {
+ SparkSqlUtils.getLogicalPlanFromDataFrame(df)
+ }
+
+ override def dataSetOfRows(spark: SparkSession, logicalPlan: LogicalPlan):
DataFrame = {
+ SparkSqlUtils.dataSetOfRows(spark, logicalPlan)
+ }
+
}
diff --git
a/shims/spark33/src/main/scala/org/apache/spark/sql/SparkSqlUtils.scala
b/shims/spark33/src/main/scala/org/apache/spark/sql/SparkSqlUtils.scala
new file mode 100644
index 0000000000..78a1542b47
--- /dev/null
+++ b/shims/spark33/src/main/scala/org/apache/spark/sql/SparkSqlUtils.scala
@@ -0,0 +1,31 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.spark.sql
+
+import org.apache.spark.sql.catalyst.plans.logical.LogicalPlan
+
+object SparkSqlUtils {
+ def cleanupAnyExistingSession(): Unit = {
+ SparkSession.cleanupAnyExistingSession()
+ }
+ def getLogicalPlanFromDataFrame(df: DataFrame): LogicalPlan = {
+ df.logicalPlan
+ }
+ def dataSetOfRows(spark: SparkSession, logicalPlan: LogicalPlan): DataFrame
= {
+ Dataset.ofRows(spark, logicalPlan)
+ }
+}
diff --git
a/shims/spark34/src/main/scala/org/apache/gluten/sql/shims/spark34/Spark34Shims.scala
b/shims/spark34/src/main/scala/org/apache/gluten/sql/shims/spark34/Spark34Shims.scala
index 16affb2ad8..efbfbf46cc 100644
---
a/shims/spark34/src/main/scala/org/apache/gluten/sql/shims/spark34/Spark34Shims.scala
+++
b/shims/spark34/src/main/scala/org/apache/gluten/sql/shims/spark34/Spark34Shims.scala
@@ -27,7 +27,7 @@ import org.apache.spark.internal.io.FileCommitProtocol
import org.apache.spark.paths.SparkPath
import org.apache.spark.scheduler.TaskInfo
import org.apache.spark.shuffle.ShuffleHandle
-import org.apache.spark.sql.{AnalysisException, SparkSession}
+import org.apache.spark.sql.{AnalysisException, DataFrame, SparkSession,
SparkSqlUtils}
import org.apache.spark.sql.catalyst.InternalRow
import org.apache.spark.sql.catalyst.analysis.DecimalPrecision
import org.apache.spark.sql.catalyst.catalog.BucketSpec
@@ -647,4 +647,16 @@ class Spark34Shims extends SparkShims {
DecimalPrecision.widerDecimalType(d1, d2)
}
+ override def cleanupAnyExistingSession(): Unit = {
+ SparkSqlUtils.cleanupAnyExistingSession()
+ }
+
+ override def getLogicalPlanFromDataFrame(df: DataFrame): LogicalPlan = {
+ SparkSqlUtils.getLogicalPlanFromDataFrame(df)
+ }
+
+ override def dataSetOfRows(spark: SparkSession, logicalPlan: LogicalPlan):
DataFrame = {
+ SparkSqlUtils.dataSetOfRows(spark, logicalPlan)
+ }
+
}
diff --git
a/shims/spark34/src/main/scala/org/apache/spark/sql/SparkSqlUtils.scala
b/shims/spark34/src/main/scala/org/apache/spark/sql/SparkSqlUtils.scala
new file mode 100644
index 0000000000..78a1542b47
--- /dev/null
+++ b/shims/spark34/src/main/scala/org/apache/spark/sql/SparkSqlUtils.scala
@@ -0,0 +1,31 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.spark.sql
+
+import org.apache.spark.sql.catalyst.plans.logical.LogicalPlan
+
+object SparkSqlUtils {
+ def cleanupAnyExistingSession(): Unit = {
+ SparkSession.cleanupAnyExistingSession()
+ }
+ def getLogicalPlanFromDataFrame(df: DataFrame): LogicalPlan = {
+ df.logicalPlan
+ }
+ def dataSetOfRows(spark: SparkSession, logicalPlan: LogicalPlan): DataFrame
= {
+ Dataset.ofRows(spark, logicalPlan)
+ }
+}
diff --git
a/shims/spark35/src/main/scala/org/apache/gluten/sql/shims/spark35/Spark35Shims.scala
b/shims/spark35/src/main/scala/org/apache/gluten/sql/shims/spark35/Spark35Shims.scala
index e48a401722..4b47f0b52e 100644
---
a/shims/spark35/src/main/scala/org/apache/gluten/sql/shims/spark35/Spark35Shims.scala
+++
b/shims/spark35/src/main/scala/org/apache/gluten/sql/shims/spark35/Spark35Shims.scala
@@ -27,7 +27,7 @@ import org.apache.spark.internal.io.FileCommitProtocol
import org.apache.spark.paths.SparkPath
import org.apache.spark.scheduler.TaskInfo
import org.apache.spark.shuffle.ShuffleHandle
-import org.apache.spark.sql.{AnalysisException, SparkSession}
+import org.apache.spark.sql.{AnalysisException, DataFrame, SparkSession,
SparkSqlUtils}
import org.apache.spark.sql.catalyst.{ExtendedAnalysisException, InternalRow}
import org.apache.spark.sql.catalyst.analysis.DecimalPrecision
import org.apache.spark.sql.catalyst.catalog.BucketSpec
@@ -707,4 +707,16 @@ class Spark35Shims extends SparkShims {
DecimalPrecision.widerDecimalType(d1, d2)
}
+ override def cleanupAnyExistingSession(): Unit = {
+ SparkSqlUtils.cleanupAnyExistingSession()
+ }
+
+ override def getLogicalPlanFromDataFrame(df: DataFrame): LogicalPlan = {
+ SparkSqlUtils.getLogicalPlanFromDataFrame(df)
+ }
+
+ override def dataSetOfRows(spark: SparkSession, logicalPlan: LogicalPlan):
DataFrame = {
+ SparkSqlUtils.dataSetOfRows(spark, logicalPlan)
+ }
+
}
diff --git
a/shims/spark35/src/main/scala/org/apache/spark/sql/SparkSqlUtils.scala
b/shims/spark35/src/main/scala/org/apache/spark/sql/SparkSqlUtils.scala
new file mode 100644
index 0000000000..78a1542b47
--- /dev/null
+++ b/shims/spark35/src/main/scala/org/apache/spark/sql/SparkSqlUtils.scala
@@ -0,0 +1,31 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.spark.sql
+
+import org.apache.spark.sql.catalyst.plans.logical.LogicalPlan
+
+object SparkSqlUtils {
+ def cleanupAnyExistingSession(): Unit = {
+ SparkSession.cleanupAnyExistingSession()
+ }
+ def getLogicalPlanFromDataFrame(df: DataFrame): LogicalPlan = {
+ df.logicalPlan
+ }
+ def dataSetOfRows(spark: SparkSession, logicalPlan: LogicalPlan): DataFrame
= {
+ Dataset.ofRows(spark, logicalPlan)
+ }
+}
diff --git
a/shims/spark40/src/main/scala/org/apache/gluten/sql/shims/spark40/Spark40Shims.scala
b/shims/spark40/src/main/scala/org/apache/gluten/sql/shims/spark40/Spark40Shims.scala
index f24d489929..2b20e44b27 100644
---
a/shims/spark40/src/main/scala/org/apache/gluten/sql/shims/spark40/Spark40Shims.scala
+++
b/shims/spark40/src/main/scala/org/apache/gluten/sql/shims/spark40/Spark40Shims.scala
@@ -27,7 +27,7 @@ import org.apache.spark.internal.io.FileCommitProtocol
import org.apache.spark.paths.SparkPath
import org.apache.spark.scheduler.TaskInfo
import org.apache.spark.shuffle.ShuffleHandle
-import org.apache.spark.sql.{AnalysisException, SparkSession}
+import org.apache.spark.sql.{AnalysisException, DataFrame, SparkSession,
SparkSqlUtils}
import org.apache.spark.sql.catalyst.{ExtendedAnalysisException, InternalRow}
import org.apache.spark.sql.catalyst.analysis.DecimalPrecisionTypeCoercion
import org.apache.spark.sql.catalyst.catalog.BucketSpec
@@ -696,4 +696,15 @@ class Spark40Shims extends SparkShims {
DecimalPrecisionTypeCoercion.widerDecimalType(d1, d2)
}
+ override def cleanupAnyExistingSession(): Unit = {
+ SparkSqlUtils.cleanupAnyExistingSession()
+ }
+
+ override def getLogicalPlanFromDataFrame(df: DataFrame): LogicalPlan = {
+ SparkSqlUtils.getLogicalPlanFromDataFrame(df)
+ }
+
+ def dataSetOfRows(spark: SparkSession, logicalPlan: LogicalPlan): DataFrame
= {
+ SparkSqlUtils.dataSetOfRows(spark, logicalPlan)
+ }
}
diff --git
a/shims/spark40/src/main/scala/org/apache/spark/sql/SparkSqlUtils.scala
b/shims/spark40/src/main/scala/org/apache/spark/sql/SparkSqlUtils.scala
new file mode 100644
index 0000000000..dc64f905b4
--- /dev/null
+++ b/shims/spark40/src/main/scala/org/apache/spark/sql/SparkSqlUtils.scala
@@ -0,0 +1,31 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.spark.sql
+
+import org.apache.spark.sql.catalyst.plans.logical.LogicalPlan
+
+object SparkSqlUtils {
+ def cleanupAnyExistingSession(): Unit = {
+ classic.SparkSession.cleanupAnyExistingSession()
+ }
+ def getLogicalPlanFromDataFrame(df: DataFrame): LogicalPlan = {
+ df.asInstanceOf[classic.Dataset[Row]].logicalPlan
+ }
+ def dataSetOfRows(spark: SparkSession, logicalPlan: LogicalPlan): DataFrame
= {
+ classic.Dataset.ofRows(spark.asInstanceOf[classic.SparkSession],
logicalPlan)
+ }
+}
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]