This is an automated email from the ASF dual-hosted git repository.

changchen pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/incubator-gluten.git

commit 9026029bc1903f5e88849c0479abd738295e898c
Author: Chang chen <[email protected]>
AuthorDate: Tue Jan 6 16:52:34 2026 +0800

    Spark: Initial support for 4.1.0
    
    | Cause | Type | Category | Description | Affected Files |
    |-------|------|----------|-------------|----------------|
    | - | Feat | Feature | Introduce Spark41Shims and update build configuration to support Spark 4.1. | pom.xml<br>shims/pom.xml<br>shims/spark41/pom.xml<br>shims/spark41/.../META-INF/services/org.apache.gluten.sql.shims.SparkShimProvider<br>shims/spark41/.../spark41/Spark41Shims.scala<br>shims/spark41/.../spark41/SparkShimProvider.scala |
    | [#51477](https://github.com/apache/spark/pull/51477) | Fix | Compatibility | Use class name instead of class object for streaming call detection to ensure Spark 4.1 compatibility. | gluten-core/.../caller/CallerInfo.scala |
    | [#50852](https://github.com/apache/spark/pull/50852) | Fix | Compatibility | Add printOutputColumns parameter to generateTreeString methods | shims/spark41/.../GenerateTreeStringShim.scala |
    | [#51775](https://github.com/apache/spark/pull/51775) | Fix | Compatibility | Remove unused MDC import in FileSourceScanExecShim.scala | shims/spark41/.../FileSourceScanExecShim.scala |
    | [#51979](https://github.com/apache/spark/pull/51979) | Fix | Compatibility | Add missing StoragePartitionJoinParams import in BatchScanExecShim and AbstractBatchScanExec | shims/spark41/.../v2/AbstractBatchScanExec.scala<br>shims/spark41/.../v2/BatchScanExecShim.scala |
    | [#51302](https://github.com/apache/spark/pull/51302) | Fix | Compatibility | Remove TimeAdd from ExpressionConverter and ExpressionMappings for tests | gluten-substrait/.../ExpressionConverter.scala<br>gluten-substrait/.../ExpressionMappings.scala |
    | [#50598](https://github.com/apache/spark/pull/50598) | Fix | Compatibility | Adapt to QueryExecution.createSparkPlan interface change | gluten-substrait/.../GlutenImplicits.scala<br>shims/spark\*/.../shims/spark\*/Spark*Shims.scala |
    | [#52599](https://github.com/apache/spark/pull/52599) | Fix | Compatibility | Adapt to DataSourceV2Relation interface change | backends-velox/.../ArrowConvertorRule.scala |
    | [#52384](https://github.com/apache/spark/pull/52384) | Fix | Compatibility | Use the new ParquetFooterReader interface | backends-velox/.../ParquetMetadataUtils.scala<br>gluten-ut/spark40/.../parquet/GlutenParquetRowIndexSuite.scala<br>shims/spark*/.../parquet/ParquetFooterReaderShim.scala |
    | [#52509](https://github.com/apache/spark/pull/52509) | Fix | Build | Update Scala version to 2.13.17 in pom.xml to fix `java.lang.NoSuchMethodError: 'java.lang.String scala.util.hashing.MurmurHash3$.caseClassHash$default$2()'` | pom.xml |
    | - | Fix | Test | Refactor Spark version checks in VeloxHashJoinSuite to improve readability and maintainability | backends-velox/.../VeloxHashJoinSuite.scala |
    | [#50849](https://github.com/apache/spark/pull/50849) | Fix | Test | Fix MiscOperatorSuite to support the OneRowRelationExec plan in Spark 4.1 | backends-velox/.../MiscOperatorSuite.scala |
    | [#52723](https://github.com/apache/spark/pull/52723) | Fix | Compatibility | Add GeographyVal and GeometryVal support in ColumnarArrayShim | shims/spark41/.../vectorized/ColumnarArrayShim.java |
    | [#48470](https://github.com/apache/spark/pull/48470) | 4.1.0 | Exclude | Exclude split test in VeloxStringFunctionsSuite | backends-velox/.../VeloxStringFunctionsSuite.scala |
    | [#51259](https://github.com/apache/spark/pull/51259) | 4.1.0 | Exclude | Only run ArrowEvalPythonExecSuite tests up to Spark 4.0; CI Python needs to be updated to 3.10 | backends-velox/.../python/ArrowEvalPythonExecSuite.scala |
---
 .github/workflows/util/install-spark-resources.sh  |   5 +
 .github/workflows/velox_backend_x86.yml            | 106 +++++++++++++++++++++
 .../gluten/extension/ArrowConvertorRule.scala      |  44 +++++----
 .../apache/gluten/utils/ParquetMetadataUtils.scala |   4 +-
 .../gluten/execution/MiscOperatorSuite.scala       |   6 +-
 .../gluten/execution/VeloxHashJoinSuite.scala      |   7 +-
 .../execution/VeloxStringFunctionsSuite.scala      |   3 +-
 .../python/ArrowEvalPythonExecSuite.scala          |   6 +-
 dev/format-scala-code.sh                           |   2 +-
 .../gluten/extension/caller/CallerInfo.scala       |   9 +-
 .../org/apache/spark/util/SparkVersionUtil.scala   |   1 +
 .../gluten/expression/ExpressionConverter.scala    |   7 --
 .../gluten/expression/ExpressionMappings.scala     |   1 -
 .../spark/sql/execution/GlutenImplicits.scala      |   8 +-
 .../parquet/GlutenParquetRowIndexSuite.scala       |   2 +-
 pom.xml                                            |  82 +++++++++++++++-
 .../org/apache/gluten/sql/shims/SparkShims.scala   |  10 ++
 shims/pom.xml                                      |   6 ++
 .../gluten/sql/shims/spark32/Spark32Shims.scala    |  14 ++-
 .../parquet/ParquetFooterReaderShim.scala          |  42 ++++++++
 .../gluten/sql/shims/spark33/Spark33Shims.scala    |  14 ++-
 .../parquet/ParquetFooterReaderShim.scala          |  42 ++++++++
 .../gluten/sql/shims/spark34/Spark34Shims.scala    |  12 +++
 .../parquet/ParquetFooterReaderShim.scala          |  42 ++++++++
 .../gluten/sql/shims/spark35/Spark35Shims.scala    |  12 +++
 .../parquet/ParquetFooterReaderShim.scala          |  42 ++++++++
 .../gluten/sql/shims/spark40/Spark40Shims.scala    |  13 +++
 .../parquet/ParquetFooterReaderShim.scala          |  42 ++++++++
 .../sql/execution/datasources/v2/Spark35Scan.scala |  50 ----------
 shims/spark41/pom.xml                              |   4 +-
 .../execution/vectorized/ColumnarArrayShim.java    |  12 +++
 .../org.apache.gluten.sql.shims.SparkShimProvider  |   2 +-
 .../gluten/execution/GenerateTreeStringShim.scala  |   6 +-
 .../Spark41Shims.scala}                            |  16 +++-
 .../{spark40 => spark41}/SparkShimProvider.scala   |   4 +-
 .../sql/execution/FileSourceScanExecShim.scala     |   1 -
 .../parquet/ParquetFooterReaderShim.scala          |  43 +++++++++
 .../datasources/v2/AbstractBatchScanExec.scala     |   1 +
 .../datasources/v2/BatchScanExecShim.scala         |   1 +
 .../sql/execution/datasources/v2/Spark35Scan.scala |  50 ----------
 40 files changed, 616 insertions(+), 158 deletions(-)

diff --git a/.github/workflows/util/install-spark-resources.sh 
b/.github/workflows/util/install-spark-resources.sh
index 4d1dd27a9c..d245dbb4ac 100755
--- a/.github/workflows/util/install-spark-resources.sh
+++ b/.github/workflows/util/install-spark-resources.sh
@@ -119,6 +119,11 @@ case "$1" in
     cd ${INSTALL_DIR} && \
     install_spark "4.0.1" "3" "2.12"
     ;;
+4.1)
+    # Spark-4.x, scala 2.12 // using 2.12 as a hack as 4.0 does not have 2.13 
suffix
+    cd ${INSTALL_DIR} && \
+    install_spark "4.1.0" "3" "2.12"
+    ;;
 *)
     echo "Spark version is expected to be specified."
     exit 1
diff --git a/.github/workflows/velox_backend_x86.yml 
b/.github/workflows/velox_backend_x86.yml
index 9adde6c5ce..5ef70a79bc 100644
--- a/.github/workflows/velox_backend_x86.yml
+++ b/.github/workflows/velox_backend_x86.yml
@@ -1481,3 +1481,109 @@ jobs:
             **/target/*.log
             **/gluten-ut/**/hs_err_*.log
             **/gluten-ut/**/core.*
+
+  spark-test-spark41:
+    needs: build-native-lib-centos-7
+    runs-on: ubuntu-22.04
+    env:
+      SPARK_TESTING: true
+    container: apache/gluten:centos-8-jdk17
+    steps:
+      - uses: actions/checkout@v2
+      - name: Download All Artifacts
+        uses: actions/download-artifact@v4
+        with:
+          name: velox-native-lib-centos-7-${{github.sha}}
+          path: ./cpp/build/releases
+      - name: Download Arrow Jars
+        uses: actions/download-artifact@v4
+        with:
+          name: arrow-jars-centos-7-${{github.sha}}
+          path: /root/.m2/repository/org/apache/arrow/
+      - name: Prepare
+        run: |
+          dnf module -y install python39 && \
+          alternatives --set python3 /usr/bin/python3.9 && \
+          pip3 install setuptools==77.0.3 && \
+          pip3 install pyspark==3.5.5 cython && \
+          pip3 install pandas==2.2.3 pyarrow==20.0.0
+      - name: Prepare Spark Resources for Spark 4.1.0 #TODO remove after image 
update
+        run: |
+          rm -rf /opt/shims/spark41
+          bash .github/workflows/util/install-spark-resources.sh 4.1
+          mv /opt/shims/spark41/spark_home/assembly/target/scala-2.12 
/opt/shims/spark41/spark_home/assembly/target/scala-2.13
+      - name: Build and Run unit test for Spark 4.1.0 with scala-2.13 (other 
tests)
+        run: |
+          cd $GITHUB_WORKSPACE/
+          export SPARK_SCALA_VERSION=2.13
+          yum install -y java-17-openjdk-devel
+          export JAVA_HOME=/usr/lib/jvm/java-17-openjdk
+          export PATH=$JAVA_HOME/bin:$PATH
+          java -version
+          $MVN_CMD clean test -Pspark-4.1 -Pscala-2.13 -Pjava-17 
-Pbackends-velox \
+          -Pspark-ut 
-DargLine="-Dspark.test.home=/opt/shims/spark41/spark_home/" \
+          
-DtagsToExclude=org.apache.spark.tags.ExtendedSQLTest,org.apache.gluten.tags.UDFTest,org.apache.gluten.tags.EnhancedFeaturesTest,org.apache.gluten.tags.SkipTest
+      - name: Upload test report
+        if: always()
+        uses: actions/upload-artifact@v4
+        with:
+          name: ${{ github.job }}-report
+          path: '**/surefire-reports/TEST-*.xml'
+      - name: Upload unit tests log files
+        if: ${{ !success() }}
+        uses: actions/upload-artifact@v4
+        with:
+          name: ${{ github.job }}-test-log
+          path: |
+            **/target/*.log
+            **/gluten-ut/**/hs_err_*.log
+            **/gluten-ut/**/core.*
+
+  spark-test-spark41-slow:
+    needs: build-native-lib-centos-7
+    runs-on: ubuntu-22.04
+    env:
+      SPARK_TESTING: true
+    container: apache/gluten:centos-8-jdk17
+    steps:
+      - uses: actions/checkout@v2
+      - name: Download All Artifacts
+        uses: actions/download-artifact@v4
+        with:
+          name: velox-native-lib-centos-7-${{github.sha}}
+          path: ./cpp/build/releases
+      - name: Download Arrow Jars
+        uses: actions/download-artifact@v4
+        with:
+          name: arrow-jars-centos-7-${{github.sha}}
+          path: /root/.m2/repository/org/apache/arrow/
+      - name: Prepare Spark Resources for Spark 4.1.0 #TODO remove after image 
update
+        run: |
+          rm -rf /opt/shims/spark41
+          bash .github/workflows/util/install-spark-resources.sh 4.1
+          mv /opt/shims/spark41/spark_home/assembly/target/scala-2.12 
/opt/shims/spark41/spark_home/assembly/target/scala-2.13
+      - name: Build and Run unit test for Spark 4.1.0 (slow tests)
+        run: |
+          cd $GITHUB_WORKSPACE/
+          yum install -y java-17-openjdk-devel
+          export JAVA_HOME=/usr/lib/jvm/java-17-openjdk
+          export PATH=$JAVA_HOME/bin:$PATH
+          java -version
+          $MVN_CMD clean test -Pspark-4.1 -Pscala-2.13 -Pjava-17 
-Pbackends-velox -Pspark-ut \
+          -DargLine="-Dspark.test.home=/opt/shims/spark41/spark_home/" \
+          -DtagsToInclude=org.apache.spark.tags.ExtendedSQLTest
+      - name: Upload test report
+        if: always()
+        uses: actions/upload-artifact@v4
+        with:
+          name: ${{ github.job }}-report
+          path: '**/surefire-reports/TEST-*.xml'
+      - name: Upload unit tests log files
+        if: ${{ !success() }}
+        uses: actions/upload-artifact@v4
+        with:
+          name: ${{ github.job }}-test-log
+          path: |
+            **/target/*.log
+            **/gluten-ut/**/hs_err_*.log
+            **/gluten-ut/**/core.*
diff --git 
a/backends-velox/src/main/scala/org/apache/gluten/extension/ArrowConvertorRule.scala
 
b/backends-velox/src/main/scala/org/apache/gluten/extension/ArrowConvertorRule.scala
index 25371be8d1..925f2a6be9 100644
--- 
a/backends-velox/src/main/scala/org/apache/gluten/extension/ArrowConvertorRule.scala
+++ 
b/backends-velox/src/main/scala/org/apache/gluten/extension/ArrowConvertorRule.scala
@@ -38,6 +38,24 @@ import java.nio.charset.StandardCharsets
 
 import scala.collection.convert.ImplicitConversions.`map AsScala`
 
+/**
+ * Extracts a CSVTable from a DataSourceV2Relation.
+ *
+ * Only the table variable of DataSourceV2Relation is accessed to improve 
compatibility across
+ * different Spark versions.
+ * @since Spark
+ *   4.1
+ */
+private object CSVTableExtractor {
+  def unapply(relation: DataSourceV2Relation): Option[(DataSourceV2Relation, 
CSVTable)] = {
+    relation.table match {
+      case t: CSVTable =>
+        Some((relation, t))
+      case _ => None
+    }
+  }
+}
+
 @Experimental
 case class ArrowConvertorRule(session: SparkSession) extends Rule[LogicalPlan] 
{
   override def apply(plan: LogicalPlan): LogicalPlan = {
@@ -56,25 +74,15 @@ case class ArrowConvertorRule(session: SparkSession) 
extends Rule[LogicalPlan] {
             l.copy(relation = r.copy(fileFormat = new 
ArrowCSVFileFormat(csvOptions))(session))
           case _ => l
         }
-      case d @ DataSourceV2Relation(
-            t @ CSVTable(
-              name,
-              sparkSession,
-              options,
-              paths,
-              userSpecifiedSchema,
-              fallbackFileFormat),
-            _,
-            _,
-            _,
-            _) if validate(session, t.dataSchema, 
options.asCaseSensitiveMap().toMap) =>
+      case CSVTableExtractor(d, t)
+          if validate(session, t.dataSchema, 
t.options.asCaseSensitiveMap().toMap) =>
         d.copy(table = ArrowCSVTable(
-          "arrow" + name,
-          sparkSession,
-          options,
-          paths,
-          userSpecifiedSchema,
-          fallbackFileFormat))
+          "arrow" + t.name,
+          t.sparkSession,
+          t.options,
+          t.paths,
+          t.userSpecifiedSchema,
+          t.fallbackFileFormat))
       case r =>
         r
     }
diff --git 
a/backends-velox/src/main/scala/org/apache/gluten/utils/ParquetMetadataUtils.scala
 
b/backends-velox/src/main/scala/org/apache/gluten/utils/ParquetMetadataUtils.scala
index 6239ab5ad7..ab76cba4aa 100644
--- 
a/backends-velox/src/main/scala/org/apache/gluten/utils/ParquetMetadataUtils.scala
+++ 
b/backends-velox/src/main/scala/org/apache/gluten/utils/ParquetMetadataUtils.scala
@@ -21,7 +21,7 @@ import org.apache.gluten.sql.shims.SparkShimLoader
 
 import org.apache.spark.internal.Logging
 import org.apache.spark.sql.execution.datasources.DataSourceUtils
-import 
org.apache.spark.sql.execution.datasources.parquet.{ParquetFooterReader, 
ParquetOptions}
+import 
org.apache.spark.sql.execution.datasources.parquet.{ParquetFooterReaderShim, 
ParquetOptions}
 
 import org.apache.hadoop.conf.Configuration
 import org.apache.hadoop.fs.{FileSystem, LocatedFileStatus, Path}
@@ -135,7 +135,7 @@ object ParquetMetadataUtils extends Logging {
       parquetOptions: ParquetOptions): Option[String] = {
     val footer =
       try {
-        ParquetFooterReader.readFooter(conf, fileStatus, 
ParquetMetadataConverter.NO_FILTER)
+        ParquetFooterReaderShim.readFooter(conf, fileStatus, 
ParquetMetadataConverter.NO_FILTER)
       } catch {
         case e: Exception if ExceptionUtils.hasCause(e, 
classOf[ParquetCryptoRuntimeException]) =>
           return Some("Encrypted Parquet footer detected.")
diff --git 
a/backends-velox/src/test/scala/org/apache/gluten/execution/MiscOperatorSuite.scala
 
b/backends-velox/src/test/scala/org/apache/gluten/execution/MiscOperatorSuite.scala
index c313242091..cfddfb8e21 100644
--- 
a/backends-velox/src/test/scala/org/apache/gluten/execution/MiscOperatorSuite.scala
+++ 
b/backends-velox/src/test/scala/org/apache/gluten/execution/MiscOperatorSuite.scala
@@ -753,7 +753,11 @@ class MiscOperatorSuite extends 
VeloxWholeStageTransformerSuite with AdaptiveSpa
     val df = sql("SELECT 1")
     checkAnswer(df, Row(1))
     val plan = df.queryExecution.executedPlan
-    assert(plan.find(_.isInstanceOf[RDDScanExec]).isDefined)
+    if (isSparkVersionGE("4.1")) {
+      assert(plan.find(_.getClass.getSimpleName == 
"OneRowRelationExec").isDefined)
+    } else {
+      assert(plan.find(_.isInstanceOf[RDDScanExec]).isDefined)
+    }
     assert(plan.find(_.isInstanceOf[ProjectExecTransformer]).isDefined)
     assert(plan.find(_.isInstanceOf[RowToVeloxColumnarExec]).isDefined)
   }
diff --git 
a/backends-velox/src/test/scala/org/apache/gluten/execution/VeloxHashJoinSuite.scala
 
b/backends-velox/src/test/scala/org/apache/gluten/execution/VeloxHashJoinSuite.scala
index 53f44a2ccc..5958baa377 100644
--- 
a/backends-velox/src/test/scala/org/apache/gluten/execution/VeloxHashJoinSuite.scala
+++ 
b/backends-velox/src/test/scala/org/apache/gluten/execution/VeloxHashJoinSuite.scala
@@ -92,12 +92,9 @@ class VeloxHashJoinSuite extends 
VeloxWholeStageTransformerSuite {
 
       // The computing is combined into one single whole stage transformer.
       val wholeStages = plan.collect { case wst: WholeStageTransformer => wst }
-      if (SparkShimLoader.getSparkVersion.startsWith("3.2.")) {
+      if (isSparkVersionLE("3.2")) {
         assert(wholeStages.length == 1)
-      } else if (
-        SparkShimLoader.getSparkVersion.startsWith("3.5.") ||
-        SparkShimLoader.getSparkVersion.startsWith("4.0.")
-      ) {
+      } else if (isSparkVersionGE("3.5")) {
         assert(wholeStages.length == 5)
       } else {
         assert(wholeStages.length == 3)
diff --git 
a/backends-velox/src/test/scala/org/apache/gluten/execution/VeloxStringFunctionsSuite.scala
 
b/backends-velox/src/test/scala/org/apache/gluten/execution/VeloxStringFunctionsSuite.scala
index 06f0acb784..37f13bcea8 100644
--- 
a/backends-velox/src/test/scala/org/apache/gluten/execution/VeloxStringFunctionsSuite.scala
+++ 
b/backends-velox/src/test/scala/org/apache/gluten/execution/VeloxStringFunctionsSuite.scala
@@ -544,7 +544,8 @@ class VeloxStringFunctionsSuite extends 
VeloxWholeStageTransformerSuite {
         s"from $LINEITEM_TABLE limit 5") { _ => }
   }
 
-  testWithMinSparkVersion("split", "3.4") {
+  // TODO: fix on spark-4.1
+  testWithSpecifiedSparkVersion("split", "3.4", "3.5") {
     runQueryAndCompare(
       s"select l_orderkey, l_comment, split(l_comment, '') " +
         s"from $LINEITEM_TABLE limit 5") {
diff --git 
a/backends-velox/src/test/scala/org/apache/gluten/execution/python/ArrowEvalPythonExecSuite.scala
 
b/backends-velox/src/test/scala/org/apache/gluten/execution/python/ArrowEvalPythonExecSuite.scala
index 52a17995f3..f8e2554da7 100644
--- 
a/backends-velox/src/test/scala/org/apache/gluten/execution/python/ArrowEvalPythonExecSuite.scala
+++ 
b/backends-velox/src/test/scala/org/apache/gluten/execution/python/ArrowEvalPythonExecSuite.scala
@@ -39,7 +39,8 @@ class ArrowEvalPythonExecSuite extends 
WholeStageTransformerSuite {
       .set("spark.executor.cores", "1")
   }
 
-  test("arrow_udf test: without projection") {
+  // TODO: fix on spark-4.1
+  testWithMaxSparkVersion("arrow_udf test: without projection", "4.0") {
     lazy val base =
       Seq(("1", 1), ("1", 2), ("2", 1), ("2", 2), ("3", 1), ("3", 2), ("0", 
1), ("3", 0))
         .toDF("a", "b")
@@ -59,7 +60,8 @@ class ArrowEvalPythonExecSuite extends 
WholeStageTransformerSuite {
     checkAnswer(df2, expected)
   }
 
-  test("arrow_udf test: with unrelated projection") {
+  // TODO: fix on spark-4.1
+  testWithMaxSparkVersion("arrow_udf test: with unrelated projection", "4.0") {
     lazy val base =
       Seq(("1", 1), ("1", 2), ("2", 1), ("2", 2), ("3", 1), ("3", 2), ("0", 
1), ("3", 0))
         .toDF("a", "b")
diff --git a/dev/format-scala-code.sh b/dev/format-scala-code.sh
index 96a782405b..4d2fba5ca1 100755
--- a/dev/format-scala-code.sh
+++ b/dev/format-scala-code.sh
@@ -22,7 +22,7 @@ MVN_CMD="${BASEDIR}/../build/mvn"
 # If a new profile is introduced for new modules, please add it here to ensure
 # the new modules are covered.
 PROFILES="-Pbackends-velox -Pceleborn,uniffle -Piceberg,delta,hudi,paimon \
-          -Pspark-3.2,spark-3.3,spark-3.4,spark-3.5,spark-4.0 -Pspark-ut"
+          -Pspark-3.2,spark-3.3,spark-3.4,spark-3.5,spark-4.0,spark-4.1 
-Pspark-ut"
 
 COMMAND=$1
 
diff --git 
a/gluten-core/src/main/scala/org/apache/gluten/extension/caller/CallerInfo.scala
 
b/gluten-core/src/main/scala/org/apache/gluten/extension/caller/CallerInfo.scala
index 732c898285..7dfaaaa774 100644
--- 
a/gluten-core/src/main/scala/org/apache/gluten/extension/caller/CallerInfo.scala
+++ 
b/gluten-core/src/main/scala/org/apache/gluten/extension/caller/CallerInfo.scala
@@ -18,7 +18,7 @@ package org.apache.gluten.extension.caller
 
 import org.apache.spark.sql.execution.adaptive.AdaptiveSparkPlanExec
 import org.apache.spark.sql.execution.columnar.InMemoryRelation
-import org.apache.spark.sql.execution.streaming.StreamExecution
+import org.apache.spark.util.SparkVersionUtil
 
 /**
  * Helper API that stores information about the call site of the columnar 
rule. Specific columnar
@@ -70,7 +70,12 @@ object CallerInfo {
   }
 
   private def inStreamingCall(stack: Seq[StackTraceElement]): Boolean = {
-    
stack.exists(_.getClassName.equals(StreamExecution.getClass.getName.split('$').head))
+    val streamName = if (SparkVersionUtil.gteSpark41) {
+      "org.apache.spark.sql.execution.streaming.runtime.StreamExecution"
+    } else {
+      "org.apache.spark.sql.execution.streaming.StreamExecution"
+    }
+    stack.exists(_.getClassName.equals(streamName))
   }
 
   private def inBloomFilterStatFunctionCall(stack: Seq[StackTraceElement]): 
Boolean = {
diff --git 
a/gluten-core/src/main/scala/org/apache/spark/util/SparkVersionUtil.scala 
b/gluten-core/src/main/scala/org/apache/spark/util/SparkVersionUtil.scala
index efa0c63dca..50114ab702 100644
--- a/gluten-core/src/main/scala/org/apache/spark/util/SparkVersionUtil.scala
+++ b/gluten-core/src/main/scala/org/apache/spark/util/SparkVersionUtil.scala
@@ -25,6 +25,7 @@ object SparkVersionUtil {
   val gteSpark33: Boolean = comparedWithSpark33 >= 0
   val gteSpark35: Boolean = comparedWithSpark35 >= 0
   val gteSpark40: Boolean = compareMajorMinorVersion((4, 0)) >= 0
+  val gteSpark41: Boolean = compareMajorMinorVersion((4, 1)) >= 0
 
   // Returns X. X < 0 if one < other, x == 0 if one == other, x > 0 if one > 
other.
   def compareMajorMinorVersion(other: (Int, Int)): Int = {
diff --git 
a/gluten-substrait/src/main/scala/org/apache/gluten/expression/ExpressionConverter.scala
 
b/gluten-substrait/src/main/scala/org/apache/gluten/expression/ExpressionConverter.scala
index 52f6d31d1d..418de8578f 100644
--- 
a/gluten-substrait/src/main/scala/org/apache/gluten/expression/ExpressionConverter.scala
+++ 
b/gluten-substrait/src/main/scala/org/apache/gluten/expression/ExpressionConverter.scala
@@ -856,13 +856,6 @@ object ExpressionConverter extends SQLConfHelper with 
Logging {
           dateAdd.children,
           dateAdd
         )
-      case timeAdd: TimeAdd =>
-        BackendsApiManager.getSparkPlanExecApiInstance.genDateAddTransformer(
-          attributeSeq,
-          substraitExprName,
-          timeAdd.children,
-          timeAdd
-        )
       case ss: StringSplit =>
         
BackendsApiManager.getSparkPlanExecApiInstance.genStringSplitTransformer(
           substraitExprName,
diff --git 
a/gluten-substrait/src/main/scala/org/apache/gluten/expression/ExpressionMappings.scala
 
b/gluten-substrait/src/main/scala/org/apache/gluten/expression/ExpressionMappings.scala
index b0b7c80793..b13aced2a6 100644
--- 
a/gluten-substrait/src/main/scala/org/apache/gluten/expression/ExpressionMappings.scala
+++ 
b/gluten-substrait/src/main/scala/org/apache/gluten/expression/ExpressionMappings.scala
@@ -179,7 +179,6 @@ object ExpressionMappings {
     Sig[Second](EXTRACT),
     Sig[FromUnixTime](FROM_UNIXTIME),
     Sig[DateAdd](DATE_ADD),
-    Sig[TimeAdd](TIMESTAMP_ADD),
     Sig[DateSub](DATE_SUB),
     Sig[DateDiff](DATE_DIFF),
     Sig[ToUnixTimestamp](TO_UNIX_TIMESTAMP),
diff --git 
a/gluten-substrait/src/main/scala/org/apache/spark/sql/execution/GlutenImplicits.scala
 
b/gluten-substrait/src/main/scala/org/apache/spark/sql/execution/GlutenImplicits.scala
index 474a271769..7267ce56ba 100644
--- 
a/gluten-substrait/src/main/scala/org/apache/spark/sql/execution/GlutenImplicits.scala
+++ 
b/gluten-substrait/src/main/scala/org/apache/spark/sql/execution/GlutenImplicits.scala
@@ -18,8 +18,8 @@ package org.apache.spark.sql.execution
 
 import org.apache.gluten.exception.GlutenException
 import org.apache.gluten.execution.{GlutenPlan, WholeStageTransformer}
+import org.apache.gluten.sql.shims.SparkShimLoader
 import org.apache.gluten.utils.PlanUtil
-
 import org.apache.spark.sql.{Column, Dataset, SparkSession}
 import org.apache.spark.sql.catalyst.plans.QueryPlan
 import org.apache.spark.sql.catalyst.plans.logical.{CommandResult, LogicalPlan}
@@ -130,10 +130,8 @@ object GlutenImplicits {
           val (innerNumGlutenNodes, innerFallbackNodeToReason) =
             withSQLConf(SQLConf.ADAPTIVE_EXECUTION_ENABLED.key -> "false") {
               // re-plan manually to skip cached data
-              val newSparkPlan = QueryExecution.createSparkPlan(
-                spark,
-                spark.sessionState.planner,
-                p.inputPlan.logicalLink.get)
+              val newSparkPlan = SparkShimLoader.getSparkShims.createSparkPlan(
+                spark, spark.sessionState.planner, p.inputPlan.logicalLink.get)
               val newExecutedPlan = QueryExecution.prepareExecutedPlan(
                 spark,
                 newSparkPlan
diff --git 
a/gluten-ut/spark40/src/test/scala/org/apache/spark/sql/execution/datasources/parquet/GlutenParquetRowIndexSuite.scala
 
b/gluten-ut/spark40/src/test/scala/org/apache/spark/sql/execution/datasources/parquet/GlutenParquetRowIndexSuite.scala
index 5cf41b7a9e..570b6d5e0c 100644
--- 
a/gluten-ut/spark40/src/test/scala/org/apache/spark/sql/execution/datasources/parquet/GlutenParquetRowIndexSuite.scala
+++ 
b/gluten-ut/spark40/src/test/scala/org/apache/spark/sql/execution/datasources/parquet/GlutenParquetRowIndexSuite.scala
@@ -42,7 +42,7 @@ class GlutenParquetRowIndexSuite extends ParquetRowIndexSuite 
with GlutenSQLTest
   import testImplicits._
 
   private def readRowGroupRowCounts(path: String): Seq[Long] = {
-    ParquetFooterReader
+    ParquetFooterReaderShim
       .readFooter(
         spark.sessionState.newHadoopConf(),
         new Path(path),
diff --git a/pom.xml b/pom.xml
index c87d46ceb5..b88fa8dfee 100644
--- a/pom.xml
+++ b/pom.xml
@@ -938,7 +938,7 @@
     <profile>
       <id>scala-2.13</id>
       <properties>
-        <scala.version>2.13.16</scala.version>
+        <scala.version>2.13.17</scala.version>
         <scala.binary.version>2.13</scala.binary.version>
         <spotless.scalafmt.version>3.8.3</spotless.scalafmt.version>
       </properties>
@@ -1270,6 +1270,86 @@
         </plugins>
       </build>
     </profile>
+    <profile>
+      <id>spark-4.1</id>
+      <properties>
+        <sparkbundle.version>4.1</sparkbundle.version>
+        
<sparkshim.artifactId>spark-sql-columnar-shims-spark41</sparkshim.artifactId>
+        <spark.version>4.1.0</spark.version>
+        <iceberg.version>1.10.0</iceberg.version>
+        <delta.package.name>delta-spark</delta.package.name>
+        <delta.version>4.0.0</delta.version>
+        <delta.binary.version>40</delta.binary.version>
+        <hudi.version>1.1.0</hudi.version>
+        <paimon.version>1.3.0</paimon.version>
+        <fasterxml.version>2.18.2</fasterxml.version>
+        <fasterxml.jackson.version>2.18.2</fasterxml.jackson.version>
+        
<fasterxml.jackson.databind.version>2.18.2</fasterxml.jackson.databind.version>
+        <hadoop.version>3.4.1</hadoop.version>
+        <antlr4.version>4.13.1</antlr4.version>
+        <guava.version>33.4.0-jre</guava.version>
+        <slf4j.version>2.0.16</slf4j.version>
+        <log4j.version>2.24.3</log4j.version>
+        <commons-lang3.version>3.17.0</commons-lang3.version>
+        <arrow.version>18.1.0</arrow.version>
+        <arrow-gluten.version>18.1.0</arrow-gluten.version>
+      </properties>
+      <dependencies>
+        <dependency>
+          <groupId>org.slf4j</groupId>
+          <artifactId>slf4j-api</artifactId>
+          <version>${slf4j.version}</version>
+          <scope>provided</scope>
+        </dependency>
+        <dependency>
+          <groupId>org.apache.logging.log4j</groupId>
+          <artifactId>log4j-slf4j2-impl</artifactId>
+          <version>${log4j.version}</version>
+          <scope>provided</scope>
+        </dependency>
+      </dependencies>
+      <build>
+        <plugins>
+          <plugin>
+            <groupId>org.apache.maven.plugins</groupId>
+            <artifactId>maven-enforcer-plugin</artifactId>
+            <executions>
+              <execution>
+                <id>enforce-java-17+</id>
+                <goals>
+                  <goal>enforce</goal>
+                </goals>
+                <configuration>
+                  <rules>
+                    <requireActiveProfile>
+                      <!-- Spark 4.1 requires Java 17+ -->
+                      <profiles>java-17,java-21</profiles>
+                      <all>false</all>
+                      <message>"-P spark-4.1" requires JDK 17+</message>
+                    </requireActiveProfile>
+                  </rules>
+                </configuration>
+              </execution>
+              <execution>
+                <id>enforce-scala-213</id>
+                <goals>
+                  <goal>enforce</goal>
+                </goals>
+                <configuration>
+                  <rules>
+                    <requireActiveProfile>
+                      <!-- Spark 4.1 requires Scala 2.13 -->
+                      <profiles>scala-2.13</profiles>
+                      <message>"-P spark-4.1" requires Scala 2.13</message>
+                    </requireActiveProfile>
+                  </rules>
+                </configuration>
+              </execution>
+            </executions>
+          </plugin>
+        </plugins>
+      </build>
+    </profile>
     <profile>
       <id>hadoop-2.7.4</id>
       <properties>
diff --git 
a/shims/common/src/main/scala/org/apache/gluten/sql/shims/SparkShims.scala 
b/shims/common/src/main/scala/org/apache/gluten/sql/shims/SparkShims.scala
index de220cab82..9164f4b7c4 100644
--- a/shims/common/src/main/scala/org/apache/gluten/sql/shims/SparkShims.scala
+++ b/shims/common/src/main/scala/org/apache/gluten/sql/shims/SparkShims.scala
@@ -355,4 +355,14 @@ trait SparkShims {
   def unsupportedCodec: Seq[CompressionCodecName] = {
     Seq(CompressionCodecName.LZO, CompressionCodecName.BROTLI)
   }
+
+  /**
+   * Shim layer for QueryExecution to maintain compatibility across different 
Spark versions.
+   * @since Spark
+   *   4.1
+   */
+  def createSparkPlan(
+      sparkSession: SparkSession,
+      planner: SparkPlanner,
+      plan: LogicalPlan): SparkPlan
 }
diff --git a/shims/pom.xml b/shims/pom.xml
index 9a8e639e04..8c10ce640f 100644
--- a/shims/pom.xml
+++ b/shims/pom.xml
@@ -98,6 +98,12 @@
         <module>spark40</module>
       </modules>
     </profile>
+    <profile>
+      <id>spark-4.1</id>
+      <modules>
+        <module>spark41</module>
+      </modules>
+    </profile>
     <profile>
       <id>default</id>
       <activation>
diff --git 
a/shims/spark32/src/main/scala/org/apache/gluten/sql/shims/spark32/Spark32Shims.scala
 
b/shims/spark32/src/main/scala/org/apache/gluten/sql/shims/spark32/Spark32Shims.scala
index 367b66f9c4..d1bf07e64b 100644
--- 
a/shims/spark32/src/main/scala/org/apache/gluten/sql/shims/spark32/Spark32Shims.scala
+++ 
b/shims/spark32/src/main/scala/org/apache/gluten/sql/shims/spark32/Spark32Shims.scala
@@ -39,7 +39,7 @@ import org.apache.spark.sql.catalyst.util.CaseInsensitiveMap
 import org.apache.spark.sql.catalyst.util.RebaseDateTime.RebaseSpec
 import org.apache.spark.sql.connector.catalog.Table
 import org.apache.spark.sql.connector.expressions.Transform
-import org.apache.spark.sql.execution.{FileSourceScanExec, 
PartitionedFileUtil, SparkPlan}
+import org.apache.spark.sql.execution.{FileSourceScanExec, 
PartitionedFileUtil, QueryExecution, SparkPlan, SparkPlanner}
 import org.apache.spark.sql.execution.datasources._
 import org.apache.spark.sql.execution.datasources.FileFormatWriter.Empty2Null
 import org.apache.spark.sql.execution.datasources.parquet.ParquetFilters
@@ -303,4 +303,16 @@ class Spark32Shims extends SparkShims {
   override def getErrorMessage(raiseError: RaiseError): Option[Expression] = {
     Some(raiseError.child)
   }
+
+  /**
+   * Shim layer for QueryExecution to maintain compatibility across different 
Spark versions.
+   *
+   * @since Spark
+   *   4.1
+   */
+  override def createSparkPlan(
+      sparkSession: SparkSession,
+      planner: SparkPlanner,
+      plan: LogicalPlan): SparkPlan =
+    QueryExecution.createSparkPlan(sparkSession, planner, plan)
 }
diff --git 
a/shims/spark32/src/main/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetFooterReaderShim.scala
 
b/shims/spark32/src/main/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetFooterReaderShim.scala
new file mode 100644
index 0000000000..b1419e5e62
--- /dev/null
+++ 
b/shims/spark32/src/main/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetFooterReaderShim.scala
@@ -0,0 +1,42 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.spark.sql.execution.datasources.parquet
+
+import org.apache.hadoop.conf.Configuration
+import org.apache.hadoop.fs.{FileStatus, Path}
+import org.apache.parquet.format.converter.ParquetMetadataConverter
+import org.apache.parquet.hadoop.metadata.ParquetMetadata
+
+/** Shim layer for ParquetFooterReader to maintain compatibility across 
different Spark versions. */
+object ParquetFooterReaderShim {
+
+  /** @since Spark 4.1 */
+  def readFooter(
+      configuration: Configuration,
+      fileStatus: FileStatus,
+      filter: ParquetMetadataConverter.MetadataFilter): ParquetMetadata = {
+    ParquetFooterReader.readFooter(configuration, fileStatus, filter)
+  }
+
+  /** @since Spark 4.1 */
+  def readFooter(
+      configuration: Configuration,
+      file: Path,
+      filter: ParquetMetadataConverter.MetadataFilter): ParquetMetadata = {
+    ParquetFooterReader.readFooter(configuration, file, filter)
+  }
+}
diff --git 
a/shims/spark33/src/main/scala/org/apache/gluten/sql/shims/spark33/Spark33Shims.scala
 
b/shims/spark33/src/main/scala/org/apache/gluten/sql/shims/spark33/Spark33Shims.scala
index 2b6affcded..a18fb31719 100644
--- 
a/shims/spark33/src/main/scala/org/apache/gluten/sql/shims/spark33/Spark33Shims.scala
+++ 
b/shims/spark33/src/main/scala/org/apache/gluten/sql/shims/spark33/Spark33Shims.scala
@@ -40,7 +40,7 @@ import 
org.apache.spark.sql.catalyst.util.{CaseInsensitiveMap, TimestampFormatte
 import org.apache.spark.sql.catalyst.util.RebaseDateTime.RebaseSpec
 import org.apache.spark.sql.connector.catalog.Table
 import org.apache.spark.sql.connector.expressions.Transform
-import org.apache.spark.sql.execution.{FileSourceScanExec, 
PartitionedFileUtil, SparkPlan}
+import org.apache.spark.sql.execution.{FileSourceScanExec, 
PartitionedFileUtil, QueryExecution, SparkPlan, SparkPlanner}
 import org.apache.spark.sql.execution.datasources._
 import org.apache.spark.sql.execution.datasources.FileFormatWriter.Empty2Null
 import org.apache.spark.sql.execution.datasources.parquet.ParquetFilters
@@ -408,4 +408,16 @@ class Spark33Shims extends SparkShims {
   override def getErrorMessage(raiseError: RaiseError): Option[Expression] = {
     Some(raiseError.child)
   }
+
+  /**
+   * Shim layer for QueryExecution to maintain compatibility across different 
Spark versions.
+   *
+   * @since Spark
+   *   4.1
+   */
+  override def createSparkPlan(
+      sparkSession: SparkSession,
+      planner: SparkPlanner,
+      plan: LogicalPlan): SparkPlan =
+    QueryExecution.createSparkPlan(sparkSession, planner, plan)
 }
diff --git 
a/shims/spark33/src/main/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetFooterReaderShim.scala
 
b/shims/spark33/src/main/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetFooterReaderShim.scala
new file mode 100644
index 0000000000..b1419e5e62
--- /dev/null
+++ 
b/shims/spark33/src/main/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetFooterReaderShim.scala
@@ -0,0 +1,42 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.spark.sql.execution.datasources.parquet
+
+import org.apache.hadoop.conf.Configuration
+import org.apache.hadoop.fs.{FileStatus, Path}
+import org.apache.parquet.format.converter.ParquetMetadataConverter
+import org.apache.parquet.hadoop.metadata.ParquetMetadata
+
+/** Shim layer for ParquetFooterReader to maintain compatibility across 
different Spark versions. */
+object ParquetFooterReaderShim {
+
+  /** @since Spark 4.1 */
+  def readFooter(
+      configuration: Configuration,
+      fileStatus: FileStatus,
+      filter: ParquetMetadataConverter.MetadataFilter): ParquetMetadata = {
+    ParquetFooterReader.readFooter(configuration, fileStatus, filter)
+  }
+
+  /** @since Spark 4.1 */
+  def readFooter(
+      configuration: Configuration,
+      file: Path,
+      filter: ParquetMetadataConverter.MetadataFilter): ParquetMetadata = {
+    ParquetFooterReader.readFooter(configuration, file, filter)
+  }
+}
diff --git 
a/shims/spark34/src/main/scala/org/apache/gluten/sql/shims/spark34/Spark34Shims.scala
 
b/shims/spark34/src/main/scala/org/apache/gluten/sql/shims/spark34/Spark34Shims.scala
index 39dd71a6bc..cdbeaa4783 100644
--- 
a/shims/spark34/src/main/scala/org/apache/gluten/sql/shims/spark34/Spark34Shims.scala
+++ 
b/shims/spark34/src/main/scala/org/apache/gluten/sql/shims/spark34/Spark34Shims.scala
@@ -651,4 +651,16 @@ class Spark34Shims extends SparkShims {
   override def getErrorMessage(raiseError: RaiseError): Option[Expression] = {
     Some(raiseError.child)
   }
+
+  /**
+   * Shim layer for QueryExecution to maintain compatibility across different 
Spark versions.
+   *
+   * @since Spark
+   *   4.1
+   */
+  override def createSparkPlan(
+      sparkSession: SparkSession,
+      planner: SparkPlanner,
+      plan: LogicalPlan): SparkPlan =
+    QueryExecution.createSparkPlan(sparkSession, planner, plan)
 }
diff --git 
a/shims/spark34/src/main/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetFooterReaderShim.scala
 
b/shims/spark34/src/main/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetFooterReaderShim.scala
new file mode 100644
index 0000000000..b1419e5e62
--- /dev/null
+++ 
b/shims/spark34/src/main/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetFooterReaderShim.scala
@@ -0,0 +1,42 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.spark.sql.execution.datasources.parquet
+
+import org.apache.hadoop.conf.Configuration
+import org.apache.hadoop.fs.{FileStatus, Path}
+import org.apache.parquet.format.converter.ParquetMetadataConverter
+import org.apache.parquet.hadoop.metadata.ParquetMetadata
+
+/** Shim layer for ParquetFooterReader to maintain compatibility across 
different Spark versions. */
+object ParquetFooterReaderShim {
+
+  /** @since Spark 4.1 */
+  def readFooter(
+      configuration: Configuration,
+      fileStatus: FileStatus,
+      filter: ParquetMetadataConverter.MetadataFilter): ParquetMetadata = {
+    ParquetFooterReader.readFooter(configuration, fileStatus, filter)
+  }
+
+  /** @since Spark 4.1 */
+  def readFooter(
+      configuration: Configuration,
+      file: Path,
+      filter: ParquetMetadataConverter.MetadataFilter): ParquetMetadata = {
+    ParquetFooterReader.readFooter(configuration, file, filter)
+  }
+}
diff --git 
a/shims/spark35/src/main/scala/org/apache/gluten/sql/shims/spark35/Spark35Shims.scala
 
b/shims/spark35/src/main/scala/org/apache/gluten/sql/shims/spark35/Spark35Shims.scala
index 78da08190f..d993cc0bfd 100644
--- 
a/shims/spark35/src/main/scala/org/apache/gluten/sql/shims/spark35/Spark35Shims.scala
+++ 
b/shims/spark35/src/main/scala/org/apache/gluten/sql/shims/spark35/Spark35Shims.scala
@@ -702,4 +702,16 @@ class Spark35Shims extends SparkShims {
   override def getErrorMessage(raiseError: RaiseError): Option[Expression] = {
     Some(raiseError.child)
   }
+
+  /**
+   * Shim layer for QueryExecution to maintain compatibility across different 
Spark versions.
+   *
+   * @since Spark
+   *   4.1
+   */
+  override def createSparkPlan(
+      sparkSession: SparkSession,
+      planner: SparkPlanner,
+      plan: LogicalPlan): SparkPlan =
+    QueryExecution.createSparkPlan(sparkSession, planner, plan)
 }
diff --git 
a/shims/spark35/src/main/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetFooterReaderShim.scala
 
b/shims/spark35/src/main/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetFooterReaderShim.scala
new file mode 100644
index 0000000000..b1419e5e62
--- /dev/null
+++ 
b/shims/spark35/src/main/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetFooterReaderShim.scala
@@ -0,0 +1,42 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.spark.sql.execution.datasources.parquet
+
+import org.apache.hadoop.conf.Configuration
+import org.apache.hadoop.fs.{FileStatus, Path}
+import org.apache.parquet.format.converter.ParquetMetadataConverter
+import org.apache.parquet.hadoop.metadata.ParquetMetadata
+
+/** Shim layer for ParquetFooterReader to maintain compatibility across 
different Spark versions. */
+object ParquetFooterReaderShim {
+
+  /** @since Spark 4.1 */
+  def readFooter(
+      configuration: Configuration,
+      fileStatus: FileStatus,
+      filter: ParquetMetadataConverter.MetadataFilter): ParquetMetadata = {
+    ParquetFooterReader.readFooter(configuration, fileStatus, filter)
+  }
+
+  /** @since Spark 4.1 */
+  def readFooter(
+      configuration: Configuration,
+      file: Path,
+      filter: ParquetMetadataConverter.MetadataFilter): ParquetMetadata = {
+    ParquetFooterReader.readFooter(configuration, file, filter)
+  }
+}
diff --git 
a/shims/spark40/src/main/scala/org/apache/gluten/sql/shims/spark40/Spark40Shims.scala
 
b/shims/spark40/src/main/scala/org/apache/gluten/sql/shims/spark40/Spark40Shims.scala
index 80c22f2fad..e5258eafa4 100644
--- 
a/shims/spark40/src/main/scala/org/apache/gluten/sql/shims/spark40/Spark40Shims.scala
+++ 
b/shims/spark40/src/main/scala/org/apache/gluten/sql/shims/spark40/Spark40Shims.scala
@@ -40,6 +40,7 @@ import org.apache.spark.sql.catalyst.rules.Rule
 import org.apache.spark.sql.catalyst.types.DataTypeUtils
 import org.apache.spark.sql.catalyst.util.{CaseInsensitiveMap, 
InternalRowComparableWrapper, TimestampFormatter}
 import org.apache.spark.sql.catalyst.util.RebaseDateTime.RebaseSpec
+import org.apache.spark.sql.classic.ClassicConversions._
 import org.apache.spark.sql.connector.catalog.Table
 import org.apache.spark.sql.connector.expressions.Transform
 import org.apache.spark.sql.connector.read.{HasPartitionKey, InputPartition, 
Scan}
@@ -752,4 +753,16 @@ class Spark40Shims extends SparkShims {
   override def unsupportedCodec: Seq[CompressionCodecName] = {
     Seq(CompressionCodecName.LZO, CompressionCodecName.BROTLI, 
CompressionCodecName.LZ4_RAW)
   }
+
+  /**
+   * Shim layer for QueryExecution to maintain compatibility across different 
Spark versions.
+   *
+   * @since Spark
+   *   4.1
+   */
+  override def createSparkPlan(
+      sparkSession: SparkSession,
+      planner: SparkPlanner,
+      plan: LogicalPlan): SparkPlan =
+    QueryExecution.createSparkPlan(sparkSession, planner, plan)
 }
diff --git 
a/shims/spark40/src/main/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetFooterReaderShim.scala
 
b/shims/spark40/src/main/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetFooterReaderShim.scala
new file mode 100644
index 0000000000..b1419e5e62
--- /dev/null
+++ 
b/shims/spark40/src/main/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetFooterReaderShim.scala
@@ -0,0 +1,42 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.spark.sql.execution.datasources.parquet
+
+import org.apache.hadoop.conf.Configuration
+import org.apache.hadoop.fs.{FileStatus, Path}
+import org.apache.parquet.format.converter.ParquetMetadataConverter
+import org.apache.parquet.hadoop.metadata.ParquetMetadata
+
+/** Shim layer for ParquetFooterReader to maintain compatibility across 
different Spark versions. */
+object ParquetFooterReaderShim {
+
+  /** @since Spark 4.1 */
+  def readFooter(
+      configuration: Configuration,
+      fileStatus: FileStatus,
+      filter: ParquetMetadataConverter.MetadataFilter): ParquetMetadata = {
+    ParquetFooterReader.readFooter(configuration, fileStatus, filter)
+  }
+
+  /** @since Spark 4.1 */
+  def readFooter(
+      configuration: Configuration,
+      file: Path,
+      filter: ParquetMetadataConverter.MetadataFilter): ParquetMetadata = {
+    ParquetFooterReader.readFooter(configuration, file, filter)
+  }
+}
diff --git 
a/shims/spark40/src/main/scala/org/apache/spark/sql/execution/datasources/v2/Spark35Scan.scala
 
b/shims/spark40/src/main/scala/org/apache/spark/sql/execution/datasources/v2/Spark35Scan.scala
deleted file mode 100644
index 98fcfa5483..0000000000
--- 
a/shims/spark40/src/main/scala/org/apache/spark/sql/execution/datasources/v2/Spark35Scan.scala
+++ /dev/null
@@ -1,50 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *    http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.spark.sql.execution.datasources.v2
-
-import org.apache.spark.rdd.RDD
-import org.apache.spark.sql.catalyst.InternalRow
-import org.apache.spark.sql.catalyst.expressions.{Attribute, Expression, 
SortOrder}
-import org.apache.spark.sql.connector.read.{InputPartition, 
PartitionReaderFactory, Scan}
-
-class Spark35Scan extends DataSourceV2ScanExecBase {
-
-  override def scan: Scan = throw new 
UnsupportedOperationException("Spark35Scan")
-
-  override def ordering: Option[Seq[SortOrder]] = throw new 
UnsupportedOperationException(
-    "Spark35Scan")
-
-  override def readerFactory: PartitionReaderFactory =
-    throw new UnsupportedOperationException("Spark35Scan")
-
-  override def keyGroupedPartitioning: Option[Seq[Expression]] =
-    throw new UnsupportedOperationException("Spark35Scan")
-
-  override protected def inputPartitions: Seq[InputPartition] =
-    throw new UnsupportedOperationException("Spark35Scan")
-
-  override def inputRDD: RDD[InternalRow] = throw new 
UnsupportedOperationException("Spark35Scan")
-
-  override def output: Seq[Attribute] = throw new 
UnsupportedOperationException("Spark35Scan")
-
-  override def productElement(n: Int): Any = throw new 
UnsupportedOperationException("Spark35Scan")
-
-  override def productArity: Int = throw new 
UnsupportedOperationException("Spark35Scan")
-
-  override def canEqual(that: Any): Boolean = throw new 
UnsupportedOperationException("Spark35Scan")
-
-}
diff --git a/shims/spark41/pom.xml b/shims/spark41/pom.xml
index c2433e8bdd..3d39f1a932 100644
--- a/shims/spark41/pom.xml
+++ b/shims/spark41/pom.xml
@@ -22,9 +22,9 @@
     <relativePath>../pom.xml</relativePath>
   </parent>
 
-  <artifactId>spark-sql-columnar-shims-spark40</artifactId>
+  <artifactId>spark-sql-columnar-shims-spark41</artifactId>
   <packaging>jar</packaging>
-  <name>Gluten Shims for Spark 4.0</name>
+  <name>Gluten Shims for Spark 4.1</name>
 
   <dependencies>
     <dependency>
diff --git 
a/shims/spark41/src/main/java/org/apache/spark/sql/execution/vectorized/ColumnarArrayShim.java
 
b/shims/spark41/src/main/java/org/apache/spark/sql/execution/vectorized/ColumnarArrayShim.java
index 25adf5d233..7d1347345a 100644
--- 
a/shims/spark41/src/main/java/org/apache/spark/sql/execution/vectorized/ColumnarArrayShim.java
+++ 
b/shims/spark41/src/main/java/org/apache/spark/sql/execution/vectorized/ColumnarArrayShim.java
@@ -27,6 +27,8 @@ import org.apache.spark.sql.vectorized.ColumnarArray;
 import org.apache.spark.sql.vectorized.ColumnarMap;
 import org.apache.spark.sql.vectorized.ColumnarRow;
 import org.apache.spark.unsafe.types.CalendarInterval;
+import org.apache.spark.unsafe.types.GeographyVal;
+import org.apache.spark.unsafe.types.GeometryVal;
 import org.apache.spark.unsafe.types.UTF8String;
 import org.apache.spark.unsafe.types.VariantVal;
 
@@ -199,6 +201,16 @@ public class ColumnarArrayShim extends ArrayData {
     return data.getBinary(offset + ordinal);
   }
 
+  @Override
+  public GeographyVal getGeography(int ordinal) {
+    return data.getGeography(offset + ordinal);
+  }
+
+  @Override
+  public GeometryVal getGeometry(int ordinal) {
+    return data.getGeometry(offset + ordinal);
+  }
+
   @Override
   public CalendarInterval getInterval(int ordinal) {
     return data.getInterval(offset + ordinal);
diff --git 
a/shims/spark41/src/main/resources/META-INF/services/org.apache.gluten.sql.shims.SparkShimProvider
 
b/shims/spark41/src/main/resources/META-INF/services/org.apache.gluten.sql.shims.SparkShimProvider
index b8136ed835..6c2e7e5181 100644
--- 
a/shims/spark41/src/main/resources/META-INF/services/org.apache.gluten.sql.shims.SparkShimProvider
+++ 
b/shims/spark41/src/main/resources/META-INF/services/org.apache.gluten.sql.shims.SparkShimProvider
@@ -1 +1 @@
-org.apache.gluten.sql.shims.spark40.SparkShimProvider
\ No newline at end of file
+org.apache.gluten.sql.shims.spark41.SparkShimProvider
\ No newline at end of file
diff --git 
a/shims/spark41/src/main/scala/org/apache/gluten/execution/GenerateTreeStringShim.scala
 
b/shims/spark41/src/main/scala/org/apache/gluten/execution/GenerateTreeStringShim.scala
index 7628b210f1..3d329a8f31 100644
--- 
a/shims/spark41/src/main/scala/org/apache/gluten/execution/GenerateTreeStringShim.scala
+++ 
b/shims/spark41/src/main/scala/org/apache/gluten/execution/GenerateTreeStringShim.scala
@@ -42,6 +42,7 @@ trait WholeStageTransformerGenerateTreeStringShim extends 
UnaryExecNode {
       addSuffix: Boolean = false,
       maxFields: Int,
       printNodeId: Boolean,
+      printOutputColumns: Boolean,
       indent: Int = 0): Unit = {
     val prefix = if (printNodeId) "^ " else s"^($stageId) "
     child.generateTreeString(
@@ -53,7 +54,8 @@ trait WholeStageTransformerGenerateTreeStringShim extends 
UnaryExecNode {
       addSuffix = false,
       maxFields,
       printNodeId = printNodeId,
-      indent)
+      printOutputColumns = printOutputColumns,
+      indent = indent)
 
     if (verbose && wholeStageTransformerContextDefined) {
       append(prefix + "Substrait plan:\n")
@@ -74,6 +76,7 @@ trait InputAdapterGenerateTreeStringShim extends 
UnaryExecNode {
       addSuffix: Boolean = false,
       maxFields: Int,
       printNodeId: Boolean,
+      printOutputColumns: Boolean,
       indent: Int = 0): Unit = {
     child.generateTreeString(
       depth,
@@ -84,6 +87,7 @@ trait InputAdapterGenerateTreeStringShim extends 
UnaryExecNode {
       addSuffix = false,
       maxFields,
       printNodeId,
+      printOutputColumns,
       indent)
   }
 }
diff --git 
a/shims/spark41/src/main/scala/org/apache/gluten/sql/shims/spark40/Spark40Shims.scala
 
b/shims/spark41/src/main/scala/org/apache/gluten/sql/shims/spark41/Spark41Shims.scala
similarity index 98%
rename from 
shims/spark41/src/main/scala/org/apache/gluten/sql/shims/spark40/Spark40Shims.scala
rename to 
shims/spark41/src/main/scala/org/apache/gluten/sql/shims/spark41/Spark41Shims.scala
index 80c22f2fad..ea5f733614 100644
--- 
a/shims/spark41/src/main/scala/org/apache/gluten/sql/shims/spark40/Spark40Shims.scala
+++ 
b/shims/spark41/src/main/scala/org/apache/gluten/sql/shims/spark41/Spark41Shims.scala
@@ -14,7 +14,7 @@
  * See the License for the specific language governing permissions and
  * limitations under the License.
  */
-package org.apache.gluten.sql.shims.spark40
+package org.apache.gluten.sql.shims.spark41
 
 import org.apache.gluten.execution.PartitionedFileUtilShim
 import org.apache.gluten.expression.{ExpressionNames, Sig}
@@ -69,7 +69,7 @@ import scala.collection.mutable
 import scala.jdk.CollectionConverters._
 import scala.reflect.ClassTag
 
-class Spark40Shims extends SparkShims {
+class Spark41Shims extends SparkShims {
 
   override def getDistribution(
       leftKeys: Seq[Expression],
@@ -752,4 +752,16 @@ class Spark40Shims extends SparkShims {
   override def unsupportedCodec: Seq[CompressionCodecName] = {
     Seq(CompressionCodecName.LZO, CompressionCodecName.BROTLI, CompressionCodecName.LZ4_RAW)
   }
+
+  /**
+   * Shim layer for QueryExecution to maintain compatibility across different Spark versions.
+   *
+   * @since Spark
+   *   4.1
+   */
+  override def createSparkPlan(
+      sparkSession: SparkSession,
+      planner: SparkPlanner,
+      plan: LogicalPlan): SparkPlan =
+    QueryExecution.createSparkPlan(planner, plan)
 }
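
For comparison, a shim targeting an earlier Spark line would be expected to forward the session as well, since QueryExecution.createSparkPlan only dropped the SparkSession parameter in 4.1 (sketch only; the exact pre-4.1 shim code may differ):

    // Hypothetical pre-4.1 counterpart of the override above: the older
    // QueryExecution.createSparkPlan still takes the SparkSession argument.
    override def createSparkPlan(
        sparkSession: SparkSession,
        planner: SparkPlanner,
        plan: LogicalPlan): SparkPlan =
      QueryExecution.createSparkPlan(sparkSession, planner, plan)
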
diff --git a/shims/spark41/src/main/scala/org/apache/gluten/sql/shims/spark40/SparkShimProvider.scala b/shims/spark41/src/main/scala/org/apache/gluten/sql/shims/spark41/SparkShimProvider.scala
similarity index 93%
rename from shims/spark41/src/main/scala/org/apache/gluten/sql/shims/spark40/SparkShimProvider.scala
rename to shims/spark41/src/main/scala/org/apache/gluten/sql/shims/spark41/SparkShimProvider.scala
index 7acf6589d4..9a5b6a6319 100644
--- a/shims/spark41/src/main/scala/org/apache/gluten/sql/shims/spark40/SparkShimProvider.scala
+++ b/shims/spark41/src/main/scala/org/apache/gluten/sql/shims/spark41/SparkShimProvider.scala
@@ -14,12 +14,12 @@
  * See the License for the specific language governing permissions and
  * limitations under the License.
  */
-package org.apache.gluten.sql.shims.spark40
+package org.apache.gluten.sql.shims.spark41
 
 import org.apache.gluten.sql.shims.SparkShims
 
 class SparkShimProvider extends org.apache.gluten.sql.shims.SparkShimProvider {
   def createShim: SparkShims = {
-    new Spark40Shims()
+    new Spark41Shims()
   }
 }
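
Both the class and the META-INF/services entry above had to move to the spark41 package because the provider is discovered through the JDK service-loader mechanism. A minimal, illustrative lookup (this is how ServiceLoader resolves such an entry in general, not necessarily how Gluten's SparkShimLoader selects among providers):

    import java.util.ServiceLoader
    import scala.jdk.CollectionConverters._

    import org.apache.gluten.sql.shims.{SparkShimProvider, SparkShims}

    // Picks up whatever provider class the shim jar on the classpath registers
    // under META-INF/services/org.apache.gluten.sql.shims.SparkShimProvider.
    val provider: SparkShimProvider =
      ServiceLoader.load(classOf[SparkShimProvider]).asScala.head
    val shims: SparkShims = provider.createShim
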
diff --git a/shims/spark41/src/main/scala/org/apache/spark/sql/execution/FileSourceScanExecShim.scala b/shims/spark41/src/main/scala/org/apache/spark/sql/execution/FileSourceScanExecShim.scala
index 2dd8ff3867..d9a50f65c7 100644
--- a/shims/spark41/src/main/scala/org/apache/spark/sql/execution/FileSourceScanExecShim.scala
+++ b/shims/spark41/src/main/scala/org/apache/spark/sql/execution/FileSourceScanExecShim.scala
@@ -21,7 +21,6 @@ import org.apache.gluten.sql.shims.SparkShimLoader
 
 import org.apache.spark.Partition
 import org.apache.spark.internal.LogKeys.{COUNT, MAX_SPLIT_BYTES, OPEN_COST_IN_BYTES}
-import org.apache.spark.internal.MDC
 import org.apache.spark.rdd.RDD
 import org.apache.spark.sql.catalyst.{InternalRow, TableIdentifier}
 import org.apache.spark.sql.catalyst.catalog.BucketSpec
diff --git a/shims/spark41/src/main/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetFooterReaderShim.scala b/shims/spark41/src/main/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetFooterReaderShim.scala
new file mode 100644
index 0000000000..f4cc013ad4
--- /dev/null
+++ b/shims/spark41/src/main/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetFooterReaderShim.scala
@@ -0,0 +1,43 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.spark.sql.execution.datasources.parquet
+
+import org.apache.hadoop.conf.Configuration
+import org.apache.hadoop.fs.{FileStatus, Path}
+import org.apache.parquet.format.converter.ParquetMetadataConverter
+import org.apache.parquet.hadoop.metadata.ParquetMetadata
+import org.apache.parquet.hadoop.util.HadoopInputFile
+
+/** Shim layer for ParquetFooterReader to maintain compatibility across different Spark versions. */
+object ParquetFooterReaderShim {
+
+  /** @since Spark 4.1 */
+  def readFooter(
+      configuration: Configuration,
+      fileStatus: FileStatus,
+      filter: ParquetMetadataConverter.MetadataFilter): ParquetMetadata = {
+    ParquetFooterReader.readFooter(HadoopInputFile.fromStatus(fileStatus, configuration), filter)
+  }
+
+  /** @since Spark 4.1 */
+  def readFooter(
+      configuration: Configuration,
+      file: Path,
+      filter: ParquetMetadataConverter.MetadataFilter): ParquetMetadata = {
+    ParquetFooterReader.readFooter(HadoopInputFile.fromPath(file, configuration), filter)
+  }
+}
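
A usage sketch for the new shim (hypothetical file path; routing footer reads through this object lets callers such as ParquetMetadataUtils, listed in the table above, stay version-agnostic while only the Spark 4.1 shim knows the HadoopInputFile-based API):

    import org.apache.hadoop.conf.Configuration
    import org.apache.hadoop.fs.Path
    import org.apache.parquet.format.converter.ParquetMetadataConverter

    import org.apache.spark.sql.execution.datasources.parquet.ParquetFooterReaderShim

    // Hypothetical path; reads only the footer (schema and row-group metadata).
    val footer = ParquetFooterReaderShim.readFooter(
      new Configuration(),
      new Path("/tmp/example.parquet"),
      ParquetMetadataConverter.NO_FILTER)
    println(footer.getFileMetaData.getSchema)
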
diff --git a/shims/spark41/src/main/scala/org/apache/spark/sql/execution/datasources/v2/AbstractBatchScanExec.scala b/shims/spark41/src/main/scala/org/apache/spark/sql/execution/datasources/v2/AbstractBatchScanExec.scala
index 0e03fa0ffe..a6a46b10f4 100644
--- a/shims/spark41/src/main/scala/org/apache/spark/sql/execution/datasources/v2/AbstractBatchScanExec.scala
+++ b/shims/spark41/src/main/scala/org/apache/spark/sql/execution/datasources/v2/AbstractBatchScanExec.scala
@@ -24,6 +24,7 @@ import org.apache.spark.sql.catalyst.plans.physical.{KeyGroupedPartitioning, Key
 import org.apache.spark.sql.catalyst.util.{truncatedString, InternalRowComparableWrapper}
 import org.apache.spark.sql.connector.catalog.Table
 import org.apache.spark.sql.connector.read._
+import org.apache.spark.sql.execution.joins.StoragePartitionJoinParams
 import org.apache.spark.util.ArrayImplicits._
 
 import com.google.common.base.Objects
diff --git a/shims/spark41/src/main/scala/org/apache/spark/sql/execution/datasources/v2/BatchScanExecShim.scala b/shims/spark41/src/main/scala/org/apache/spark/sql/execution/datasources/v2/BatchScanExecShim.scala
index 242241254e..046c42ea7b 100644
--- a/shims/spark41/src/main/scala/org/apache/spark/sql/execution/datasources/v2/BatchScanExecShim.scala
+++ b/shims/spark41/src/main/scala/org/apache/spark/sql/execution/datasources/v2/BatchScanExecShim.scala
@@ -28,6 +28,7 @@ import org.apache.spark.sql.connector.expressions.aggregate.Aggregation
 import org.apache.spark.sql.connector.read.{HasPartitionKey, InputPartition, Scan, SupportsRuntimeV2Filtering}
 import org.apache.spark.sql.execution.datasources.v2.orc.OrcScan
 import org.apache.spark.sql.execution.datasources.v2.parquet.ParquetScan
+import org.apache.spark.sql.execution.joins.StoragePartitionJoinParams
 import org.apache.spark.sql.execution.metric.SQLMetric
 import org.apache.spark.sql.vectorized.ColumnarBatch
 import org.apache.spark.util.ArrayImplicits._
diff --git a/shims/spark41/src/main/scala/org/apache/spark/sql/execution/datasources/v2/Spark35Scan.scala b/shims/spark41/src/main/scala/org/apache/spark/sql/execution/datasources/v2/Spark35Scan.scala
deleted file mode 100644
index 98fcfa5483..0000000000
--- a/shims/spark41/src/main/scala/org/apache/spark/sql/execution/datasources/v2/Spark35Scan.scala
+++ /dev/null
@@ -1,50 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *    http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.spark.sql.execution.datasources.v2
-
-import org.apache.spark.rdd.RDD
-import org.apache.spark.sql.catalyst.InternalRow
-import org.apache.spark.sql.catalyst.expressions.{Attribute, Expression, SortOrder}
-import org.apache.spark.sql.connector.read.{InputPartition, PartitionReaderFactory, Scan}
-
-class Spark35Scan extends DataSourceV2ScanExecBase {
-
-  override def scan: Scan = throw new UnsupportedOperationException("Spark35Scan")
-
-  override def ordering: Option[Seq[SortOrder]] = throw new UnsupportedOperationException(
-    "Spark35Scan")
-
-  override def readerFactory: PartitionReaderFactory =
-    throw new UnsupportedOperationException("Spark35Scan")
-
-  override def keyGroupedPartitioning: Option[Seq[Expression]] =
-    throw new UnsupportedOperationException("Spark35Scan")
-
-  override protected def inputPartitions: Seq[InputPartition] =
-    throw new UnsupportedOperationException("Spark35Scan")
-
-  override def inputRDD: RDD[InternalRow] = throw new UnsupportedOperationException("Spark35Scan")
-
-  override def output: Seq[Attribute] = throw new UnsupportedOperationException("Spark35Scan")
-
-  override def productElement(n: Int): Any = throw new UnsupportedOperationException("Spark35Scan")
-
-  override def productArity: Int = throw new UnsupportedOperationException("Spark35Scan")
-
-  override def canEqual(that: Any): Boolean = throw new UnsupportedOperationException("Spark35Scan")
-
-}

