This is an automated email from the ASF dual-hosted git repository.

yuzelin pushed a commit to branch release-1.2
in repository https://gitbox.apache.org/repos/asf/paimon.git

commit f8d89e67cb57d520f321107622293f09ce10789f
Author: Yann Byron <[email protected]>
AuthorDate: Thu Jun 12 13:53:56 2025 +0800

    [spark] Upgrade spark version to 4.0.0 (#5711)
    
    (cherry picked from commit 35539360c9fbb43bfb3fa469421112bf52d4ae75)
---
 .../paimon/spark/sql/PaimonOptimizationTest.scala  |  2 +-
 .../paimon/spark/sql/PaimonOptimizationTest.scala  |  2 +-
 .../paimon/spark/sql/PaimonOptimizationTest.scala  |  2 +-
 paimon-spark/paimon-spark-4.0/pom.xml              |  8 ++-
 .../paimon/spark/sql/PaimonOptimizationTest.scala  |  6 +-
 .../java/org/apache/paimon/spark/SparkCatalog.java | 12 ++--
 .../apache/paimon/spark/SparkGenericCatalog.java   |  5 +-
 .../paimon/spark/SparkInternalRowWrapper.java      |  4 +-
 .../java/org/apache/paimon/spark/SparkRow.java     |  4 +-
 .../paimon/spark/procedure/BaseProcedure.java      |  8 +--
 .../scala/org/apache/paimon/spark/ScanHelper.scala |  4 +-
 .../org/apache/paimon/spark/SparkSource.scala      |  4 +-
 .../org/apache/paimon/spark/SparkTypeUtils.java    |  8 +--
 .../analysis/expressions/ExpressionHelper.scala    |  4 +-
 .../optimizer/EvalSubqueriesForDeleteTable.scala   | 12 ++--
 .../MergePaimonScalarSubqueriesBase.scala          | 15 +++--
 .../spark/commands/UpdatePaimonTableCommand.scala  |  5 +-
 .../apache/paimon/spark/data/SparkArrayData.scala  |  2 +-
 .../paimon/spark/data/SparkInternalRow.scala       |  2 +-
 .../paimon/spark/execution/PaimonStrategy.scala    |  6 +-
 .../extensions/PaimonSparkSessionExtensions.scala  |  7 +--
 .../spark/procedure/SparkOrphanFilesClean.scala    |  4 +-
 .../procedure/SparkRemoveUnexistingFiles.scala     |  4 +-
 .../org/apache/spark/sql/PaimonStatsUtils.scala    |  3 +-
 .../scala/org/apache/spark/sql/PaimonUtils.scala   |  5 +-
 .../AbstractPaimonSparkSqlExtensionsParser.scala   |  4 +-
 .../sql/connector/catalog/PaimonCatalogUtils.scala |  4 +-
 .../shim/PaimonCreateTableAsSelectStrategy.scala   |  6 +-
 .../spark/sql/paimon/PaimonSparkSession.scala}     | 14 +----
 .../{SparkShimLoader.scala => ClassicApi.scala}    | 36 ++++++------
 .../apache/spark/sql/paimon/shims/SparkShim.scala  | 17 +++---
 .../spark/sql/paimon/shims/SparkShimLoader.scala   |  2 +-
 .../paimon/spark/SparkCatalogWithHiveTest.java     | 10 ++--
 .../paimon/spark/SparkGenericCatalogTest.java      |  3 +-
 .../spark/SparkGenericCatalogWithHiveTest.java     |  5 +-
 .../spark/extensions/CallStatementParserTest.java  | 25 +++++----
 .../paimon/spark/procedure/ProcedureTestBase.scala | 10 +++-
 .../spark/sql/DDLWithHiveCatalogTestBase.scala     | 64 ++++++++++++----------
 .../spark/sql/PaimonOptimizationTestBase.scala     |  1 +
 .../paimon/spark/sql/SparkVersionSupport.scala     |  5 +-
 .../scala/org/apache/spark/sql/paimon/Utils.scala  |  3 +-
 .../spark/sql/paimon/shims/Classic3Api.scala       | 55 +++++++++++++++++++
 .../apache/spark/sql/paimon/shims/Spark3Shim.scala | 21 ++++---
 paimon-spark/paimon-spark4-common/pom.xml          |  4 ++
 .../PaimonSpark4SqlExtensionsParser.scala          |  5 +-
 .../spark/sql/paimon/shims/Classic4Api.scala       | 63 +++++++++++++++++++++
 .../apache/spark/sql/paimon/shims/Spark4Shim.scala | 26 +++++----
 pom.xml                                            |  4 +-
 48 files changed, 345 insertions(+), 180 deletions(-)

diff --git 
a/paimon-spark/paimon-spark-3.3/src/test/scala/org/apache/paimon/spark/sql/PaimonOptimizationTest.scala
 
b/paimon-spark/paimon-spark-3.3/src/test/scala/org/apache/paimon/spark/sql/PaimonOptimizationTest.scala
index df15d166a6..692c1f6e92 100644
--- 
a/paimon-spark/paimon-spark-3.3/src/test/scala/org/apache/paimon/spark/sql/PaimonOptimizationTest.scala
+++ 
b/paimon-spark/paimon-spark-3.3/src/test/scala/org/apache/paimon/spark/sql/PaimonOptimizationTest.scala
@@ -30,7 +30,7 @@ class PaimonOptimizationTest extends 
PaimonOptimizationTestBase {
       fieldIndex: Int): NamedExpression = {
     GetStructField(
       ScalarSubquery(
-        SparkShimLoader.getSparkShim
+        SparkShimLoader.shim
           .createCTERelationRef(cteIndex, resolved = true, output.toSeq, 
isStreaming = false)),
       fieldIndex)
       .as("scalarsubquery()")
diff --git 
a/paimon-spark/paimon-spark-3.4/src/test/scala/org/apache/paimon/spark/sql/PaimonOptimizationTest.scala
 
b/paimon-spark/paimon-spark-3.4/src/test/scala/org/apache/paimon/spark/sql/PaimonOptimizationTest.scala
index d2a20d6df3..9bf2c684ec 100644
--- 
a/paimon-spark/paimon-spark-3.4/src/test/scala/org/apache/paimon/spark/sql/PaimonOptimizationTest.scala
+++ 
b/paimon-spark/paimon-spark-3.4/src/test/scala/org/apache/paimon/spark/sql/PaimonOptimizationTest.scala
@@ -31,7 +31,7 @@ class PaimonOptimizationTest extends 
PaimonOptimizationTestBase {
       fieldIndex: Int): NamedExpression = {
     GetStructField(
       ScalarSubquery(
-        SparkShimLoader.getSparkShim
+        SparkShimLoader.shim
           .createCTERelationRef(cteIndex, resolved = true, output.toSeq, 
isStreaming = false)),
       fieldIndex)
       .as("scalarsubquery()")
diff --git 
a/paimon-spark/paimon-spark-3.5/src/test/scala/org/apache/paimon/spark/sql/PaimonOptimizationTest.scala
 
b/paimon-spark/paimon-spark-3.5/src/test/scala/org/apache/paimon/spark/sql/PaimonOptimizationTest.scala
index df15d166a6..692c1f6e92 100644
--- 
a/paimon-spark/paimon-spark-3.5/src/test/scala/org/apache/paimon/spark/sql/PaimonOptimizationTest.scala
+++ 
b/paimon-spark/paimon-spark-3.5/src/test/scala/org/apache/paimon/spark/sql/PaimonOptimizationTest.scala
@@ -30,7 +30,7 @@ class PaimonOptimizationTest extends 
PaimonOptimizationTestBase {
       fieldIndex: Int): NamedExpression = {
     GetStructField(
       ScalarSubquery(
-        SparkShimLoader.getSparkShim
+        SparkShimLoader.shim
           .createCTERelationRef(cteIndex, resolved = true, output.toSeq, 
isStreaming = false)),
       fieldIndex)
       .as("scalarsubquery()")
diff --git a/paimon-spark/paimon-spark-4.0/pom.xml 
b/paimon-spark/paimon-spark-4.0/pom.xml
index 11ae457db5..20b5953138 100644
--- a/paimon-spark/paimon-spark-4.0/pom.xml
+++ b/paimon-spark/paimon-spark-4.0/pom.xml
@@ -32,7 +32,7 @@ under the License.
     <name>Paimon : Spark : 4.0</name>
 
     <properties>
-        <spark.version>4.0.0-preview2</spark.version>
+        <spark.version>4.0.0</spark.version>
     </properties>
 
     <dependencies>
@@ -81,6 +81,12 @@ under the License.
             <version>${spark.version}</version>
             <classifier>tests</classifier>
             <scope>test</scope>
+            <exclusions>
+                <exclusion>
+                    <groupId>org.apache.spark</groupId>
+                    
<artifactId>spark-connect-shims_${scala.binary.version}</artifactId>
+                </exclusion>
+            </exclusions>
         </dependency>
 
         <dependency>
diff --git 
a/paimon-spark/paimon-spark-4.0/src/test/scala/org/apache/paimon/spark/sql/PaimonOptimizationTest.scala
 
b/paimon-spark/paimon-spark-4.0/src/test/scala/org/apache/paimon/spark/sql/PaimonOptimizationTest.scala
index bbba0b0197..ec140a89bb 100644
--- 
a/paimon-spark/paimon-spark-4.0/src/test/scala/org/apache/paimon/spark/sql/PaimonOptimizationTest.scala
+++ 
b/paimon-spark/paimon-spark-4.0/src/test/scala/org/apache/paimon/spark/sql/PaimonOptimizationTest.scala
@@ -21,6 +21,7 @@ package org.apache.paimon.spark.sql
 import org.apache.spark.sql.catalyst.dsl.expressions._
 import org.apache.spark.sql.catalyst.expressions.{Attribute, GetStructField, 
NamedExpression, ScalarSubquery}
 import org.apache.spark.sql.paimon.shims.SparkShimLoader
+
 class PaimonOptimizationTest extends PaimonOptimizationTestBase {
 
   override def extractorExpression(
@@ -29,9 +30,10 @@ class PaimonOptimizationTest extends 
PaimonOptimizationTestBase {
       fieldIndex: Int): NamedExpression = {
     GetStructField(
       ScalarSubquery(
-        SparkShimLoader.getSparkShim
+        SparkShimLoader.shim
           .createCTERelationRef(cteIndex, resolved = true, output.toSeq, 
isStreaming = false)),
-      fieldIndex)
+      fieldIndex,
+      None)
       .as("scalarsubquery()")
   }
 }
diff --git 
a/paimon-spark/paimon-spark-common/src/main/java/org/apache/paimon/spark/SparkCatalog.java
 
b/paimon-spark/paimon-spark-common/src/main/java/org/apache/paimon/spark/SparkCatalog.java
index df03286fd0..695f03d9ca 100644
--- 
a/paimon-spark/paimon-spark-common/src/main/java/org/apache/paimon/spark/SparkCatalog.java
+++ 
b/paimon-spark/paimon-spark-common/src/main/java/org/apache/paimon/spark/SparkCatalog.java
@@ -38,6 +38,7 @@ import org.apache.paimon.table.FormatTableOptions;
 import org.apache.paimon.types.DataField;
 import org.apache.paimon.utils.TypeUtils;
 
+import org.apache.spark.sql.PaimonSparkSession$;
 import org.apache.spark.sql.SparkSession;
 import org.apache.spark.sql.catalyst.analysis.NamespaceAlreadyExistsException;
 import org.apache.spark.sql.catalyst.analysis.NoSuchFunctionException;
@@ -110,7 +111,7 @@ public class SparkCatalog extends SparkBaseCatalog
         CatalogContext catalogContext =
                 CatalogContext.create(
                         Options.fromMap(options),
-                        SparkSession.active().sessionState().newHadoopConf());
+                        
PaimonSparkSession$.MODULE$.active().sessionState().newHadoopConf());
         this.catalog = CatalogFactory.createCatalog(catalogContext);
         this.defaultDatabase =
                 options.getOrDefault(DEFAULT_DATABASE.key(), 
DEFAULT_DATABASE.defaultValue());
@@ -475,6 +476,7 @@ public class SparkCatalog extends SparkBaseCatalog
     }
 
     private static FileTable convertToFileTable(Identifier ident, FormatTable 
formatTable) {
+        SparkSession spark = PaimonSparkSession$.MODULE$.active();
         StructType schema = 
SparkTypeUtils.fromPaimonRowType(formatTable.rowType());
         StructType partitionSchema =
                 SparkTypeUtils.fromPaimonRowType(
@@ -488,7 +490,7 @@ public class SparkCatalog extends SparkBaseCatalog
             dsOptions = new CaseInsensitiveStringMap(options.toMap());
             return new PartitionedCSVTable(
                     ident.name(),
-                    SparkSession.active(),
+                    spark,
                     dsOptions,
                     
scala.collection.JavaConverters.asScalaBuffer(pathList).toSeq(),
                     scala.Option.apply(schema),
@@ -497,7 +499,7 @@ public class SparkCatalog extends SparkBaseCatalog
         } else if (formatTable.format() == FormatTable.Format.ORC) {
             return new PartitionedOrcTable(
                     ident.name(),
-                    SparkSession.active(),
+                    spark,
                     dsOptions,
                     
scala.collection.JavaConverters.asScalaBuffer(pathList).toSeq(),
                     scala.Option.apply(schema),
@@ -506,7 +508,7 @@ public class SparkCatalog extends SparkBaseCatalog
         } else if (formatTable.format() == FormatTable.Format.PARQUET) {
             return new PartitionedParquetTable(
                     ident.name(),
-                    SparkSession.active(),
+                    spark,
                     dsOptions,
                     
scala.collection.JavaConverters.asScalaBuffer(pathList).toSeq(),
                     scala.Option.apply(schema),
@@ -515,7 +517,7 @@ public class SparkCatalog extends SparkBaseCatalog
         } else if (formatTable.format() == FormatTable.Format.JSON) {
             return new PartitionedJsonTable(
                     ident.name(),
-                    SparkSession.active(),
+                    spark,
                     dsOptions,
                     
scala.collection.JavaConverters.asScalaBuffer(pathList).toSeq(),
                     scala.Option.apply(schema),
diff --git 
a/paimon-spark/paimon-spark-common/src/main/java/org/apache/paimon/spark/SparkGenericCatalog.java
 
b/paimon-spark/paimon-spark-common/src/main/java/org/apache/paimon/spark/SparkGenericCatalog.java
index ac1543f2fe..e4563c492f 100644
--- 
a/paimon-spark/paimon-spark-common/src/main/java/org/apache/paimon/spark/SparkGenericCatalog.java
+++ 
b/paimon-spark/paimon-spark-common/src/main/java/org/apache/paimon/spark/SparkGenericCatalog.java
@@ -27,6 +27,7 @@ import org.apache.paimon.utils.Preconditions;
 
 import org.apache.hadoop.conf.Configuration;
 import org.apache.spark.SparkConf;
+import org.apache.spark.sql.PaimonSparkSession$;
 import org.apache.spark.sql.SparkSession;
 import org.apache.spark.sql.catalyst.analysis.NamespaceAlreadyExistsException;
 import org.apache.spark.sql.catalyst.analysis.NoSuchFunctionException;
@@ -202,7 +203,7 @@ public class SparkGenericCatalog extends SparkBaseCatalog 
implements CatalogExte
             return sparkCatalog.createTable(ident, schema, partitions, 
properties);
         } else {
             // delegate to the session catalog
-            return SparkShimLoader.getSparkShim()
+            return SparkShimLoader.shim()
                     .createTable(asTableCatalog(), ident, schema, partitions, 
properties);
         }
     }
@@ -238,7 +239,7 @@ public class SparkGenericCatalog extends SparkBaseCatalog 
implements CatalogExte
 
     @Override
     public final void initialize(String name, CaseInsensitiveStringMap 
options) {
-        SparkSession sparkSession = SparkSession.active();
+        SparkSession sparkSession = PaimonSparkSession$.MODULE$.active();
         SessionState sessionState = sparkSession.sessionState();
         Configuration hadoopConf = sessionState.newHadoopConf();
         if (options.containsKey(METASTORE.key())
diff --git 
a/paimon-spark/paimon-spark-common/src/main/java/org/apache/paimon/spark/SparkInternalRowWrapper.java
 
b/paimon-spark/paimon-spark-common/src/main/java/org/apache/paimon/spark/SparkInternalRowWrapper.java
index 9c9569c573..f42b1fa495 100644
--- 
a/paimon-spark/paimon-spark-common/src/main/java/org/apache/paimon/spark/SparkInternalRowWrapper.java
+++ 
b/paimon-spark/paimon-spark-common/src/main/java/org/apache/paimon/spark/SparkInternalRowWrapper.java
@@ -152,7 +152,7 @@ public class SparkInternalRowWrapper implements 
InternalRow, Serializable {
 
     @Override
     public Variant getVariant(int pos) {
-        return SparkShimLoader.getSparkShim().toPaimonVariant(internalRow, 
pos);
+        return SparkShimLoader.shim().toPaimonVariant(internalRow, pos);
     }
 
     @Override
@@ -307,7 +307,7 @@ public class SparkInternalRowWrapper implements 
InternalRow, Serializable {
 
         @Override
         public Variant getVariant(int pos) {
-            return SparkShimLoader.getSparkShim().toPaimonVariant(arrayData, 
pos);
+            return SparkShimLoader.shim().toPaimonVariant(arrayData, pos);
         }
 
         @Override
diff --git 
a/paimon-spark/paimon-spark-common/src/main/java/org/apache/paimon/spark/SparkRow.java
 
b/paimon-spark/paimon-spark-common/src/main/java/org/apache/paimon/spark/SparkRow.java
index 7d0d8ceb22..0fb3e2bdb3 100644
--- 
a/paimon-spark/paimon-spark-common/src/main/java/org/apache/paimon/spark/SparkRow.java
+++ 
b/paimon-spark/paimon-spark-common/src/main/java/org/apache/paimon/spark/SparkRow.java
@@ -147,7 +147,7 @@ public class SparkRow implements InternalRow, Serializable {
 
     @Override
     public Variant getVariant(int i) {
-        return SparkShimLoader.getSparkShim().toPaimonVariant(row.getAs(i));
+        return SparkShimLoader.shim().toPaimonVariant(row.getAs(i));
     }
 
     @Override
@@ -309,7 +309,7 @@ public class SparkRow implements InternalRow, Serializable {
 
         @Override
         public Variant getVariant(int i) {
-            return SparkShimLoader.getSparkShim().toPaimonVariant(getAs(i));
+            return SparkShimLoader.shim().toPaimonVariant(getAs(i));
         }
 
         @Override
diff --git 
a/paimon-spark/paimon-spark-common/src/main/java/org/apache/paimon/spark/procedure/BaseProcedure.java
 
b/paimon-spark/paimon-spark-common/src/main/java/org/apache/paimon/spark/procedure/BaseProcedure.java
index fe9d01971d..2bf38c9fc7 100644
--- 
a/paimon-spark/paimon-spark-common/src/main/java/org/apache/paimon/spark/procedure/BaseProcedure.java
+++ 
b/paimon-spark/paimon-spark-common/src/main/java/org/apache/paimon/spark/procedure/BaseProcedure.java
@@ -22,6 +22,7 @@ import org.apache.paimon.spark.SparkTable;
 import org.apache.paimon.spark.SparkUtils;
 import org.apache.paimon.utils.Preconditions;
 
+import org.apache.spark.sql.PaimonSparkSession$;
 import org.apache.spark.sql.SparkSession;
 import org.apache.spark.sql.catalyst.InternalRow;
 import org.apache.spark.sql.catalyst.analysis.NoSuchTableException;
@@ -30,8 +31,8 @@ import org.apache.spark.sql.connector.catalog.CatalogPlugin;
 import org.apache.spark.sql.connector.catalog.Identifier;
 import org.apache.spark.sql.connector.catalog.Table;
 import org.apache.spark.sql.connector.catalog.TableCatalog;
-import org.apache.spark.sql.execution.CacheManager;
 import org.apache.spark.sql.execution.datasources.v2.DataSourceV2Relation;
+import org.apache.spark.sql.paimon.shims.SparkShimLoader;
 
 import java.util.function.Function;
 
@@ -44,7 +45,7 @@ abstract class BaseProcedure implements Procedure {
     private final TableCatalog tableCatalog;
 
     protected BaseProcedure(TableCatalog tableCatalog) {
-        this.spark = SparkSession.active();
+        this.spark = PaimonSparkSession$.MODULE$.active();
         this.tableCatalog = tableCatalog;
     }
 
@@ -114,10 +115,9 @@ abstract class BaseProcedure implements Procedure {
     }
 
     protected void refreshSparkCache(Identifier ident, Table table) {
-        CacheManager cacheManager = spark.sharedState().cacheManager();
         DataSourceV2Relation relation =
                 DataSourceV2Relation.create(table, Option.apply(tableCatalog), 
Option.apply(ident));
-        cacheManager.recacheByPlan(spark, relation);
+        SparkShimLoader.shim().classicApi().recacheByPlan(spark, relation);
     }
 
     protected InternalRow newInternalRow(Object... values) {
diff --git 
a/paimon-spark/paimon-spark-common/src/main/scala/org/apache/paimon/spark/ScanHelper.scala
 
b/paimon-spark/paimon-spark-common/src/main/scala/org/apache/paimon/spark/ScanHelper.scala
index e68407903d..0d3282b621 100644
--- 
a/paimon-spark/paimon-spark-common/src/main/scala/org/apache/paimon/spark/ScanHelper.scala
+++ 
b/paimon-spark/paimon-spark-common/src/main/scala/org/apache/paimon/spark/ScanHelper.scala
@@ -23,14 +23,14 @@ import org.apache.paimon.io.DataFileMeta
 import org.apache.paimon.table.source.{DataSplit, DeletionFile, Split}
 
 import org.apache.spark.internal.Logging
-import org.apache.spark.sql.SparkSession
+import org.apache.spark.sql.{PaimonSparkSession, SparkSession}
 
 import scala.collection.JavaConverters._
 import scala.collection.mutable.ArrayBuffer
 
 trait ScanHelper extends Logging {
 
-  private val spark = SparkSession.active
+  private val spark = PaimonSparkSession.active
 
   val coreOptions: CoreOptions
 
diff --git 
a/paimon-spark/paimon-spark-common/src/main/scala/org/apache/paimon/spark/SparkSource.scala
 
b/paimon-spark/paimon-spark-common/src/main/scala/org/apache/paimon/spark/SparkSource.scala
index cdcd2a1668..979191cfab 100644
--- 
a/paimon-spark/paimon-spark-common/src/main/scala/org/apache/paimon/spark/SparkSource.scala
+++ 
b/paimon-spark/paimon-spark-common/src/main/scala/org/apache/paimon/spark/SparkSource.scala
@@ -29,7 +29,7 @@ import org.apache.paimon.table.{DataTable, FileStoreTable, 
FileStoreTableFactory
 import org.apache.paimon.table.FormatTable.Format
 import org.apache.paimon.table.system.AuditLogTable
 
-import org.apache.spark.sql.{DataFrame, SaveMode => SparkSaveMode, 
SparkSession, SQLContext}
+import org.apache.spark.sql.{DataFrame, PaimonSparkSession, SaveMode => 
SparkSaveMode, SparkSession, SQLContext}
 import org.apache.spark.sql.connector.catalog.{SessionConfigSupport, Table}
 import org.apache.spark.sql.connector.expressions.Transform
 import org.apache.spark.sql.execution.streaming.Sink
@@ -90,7 +90,7 @@ class SparkSource
           options,
           extractCatalogName().getOrElse(NAME),
           Identifier.create(CatalogUtils.database(path), 
CatalogUtils.table(path)))),
-      SparkSession.active.sessionState.newHadoopConf()
+      PaimonSparkSession.active.sessionState.newHadoopConf()
     )
     val table = FileStoreTableFactory.create(catalogContext)
     if (Options.fromMap(options).get(SparkConnectorOptions.READ_CHANGELOG)) {
diff --git 
a/paimon-spark/paimon-spark-common/src/main/java/org/apache/paimon/spark/SparkTypeUtils.java
 
b/paimon-spark/paimon-spark-common/src/main/scala/org/apache/paimon/spark/SparkTypeUtils.java
similarity index 99%
rename from 
paimon-spark/paimon-spark-common/src/main/java/org/apache/paimon/spark/SparkTypeUtils.java
rename to 
paimon-spark/paimon-spark-common/src/main/scala/org/apache/paimon/spark/SparkTypeUtils.java
index f72924edce..ae95881621 100644
--- 
a/paimon-spark/paimon-spark-common/src/main/java/org/apache/paimon/spark/SparkTypeUtils.java
+++ 
b/paimon-spark/paimon-spark-common/src/main/scala/org/apache/paimon/spark/SparkTypeUtils.java
@@ -221,7 +221,7 @@ public class SparkTypeUtils {
 
         @Override
         public DataType visit(VariantType variantType) {
-            return SparkShimLoader.getSparkShim().SparkVariantType();
+            return SparkShimLoader.shim().SparkVariantType();
         }
 
         @Override
@@ -365,12 +365,12 @@ public class SparkTypeUtils {
                 return new FloatType();
             } else if (atomic instanceof 
org.apache.spark.sql.types.DoubleType) {
                 return new DoubleType();
-            } else if (atomic instanceof 
org.apache.spark.sql.types.StringType) {
-                return new VarCharType(VarCharType.MAX_LENGTH);
             } else if (atomic instanceof 
org.apache.spark.sql.types.VarcharType) {
                 return new 
VarCharType(((org.apache.spark.sql.types.VarcharType) atomic).length());
             } else if (atomic instanceof org.apache.spark.sql.types.CharType) {
                 return new CharType(((org.apache.spark.sql.types.CharType) 
atomic).length());
+            } else if (atomic instanceof 
org.apache.spark.sql.types.StringType) {
+                return new VarCharType(VarCharType.MAX_LENGTH);
             } else if (atomic instanceof org.apache.spark.sql.types.DateType) {
                 return new DateType();
             } else if (atomic instanceof 
org.apache.spark.sql.types.TimestampType) {
@@ -388,7 +388,7 @@ public class SparkTypeUtils {
             } else if (atomic instanceof 
org.apache.spark.sql.types.TimestampNTZType) {
                 // Move TimestampNTZType to the end for compatibility with 
spark3.3 and below
                 return new TimestampType();
-            } else if 
(SparkShimLoader.getSparkShim().isSparkVariantType(atomic)) {
+            } else if (SparkShimLoader.shim().isSparkVariantType(atomic)) {
                 return new VariantType();
             }
 
diff --git 
a/paimon-spark/paimon-spark-common/src/main/scala/org/apache/paimon/spark/catalyst/analysis/expressions/ExpressionHelper.scala
 
b/paimon-spark/paimon-spark-common/src/main/scala/org/apache/paimon/spark/catalyst/analysis/expressions/ExpressionHelper.scala
index 682cf88fcf..5540f58e0e 100644
--- 
a/paimon-spark/paimon-spark-common/src/main/scala/org/apache/paimon/spark/catalyst/analysis/expressions/ExpressionHelper.scala
+++ 
b/paimon-spark/paimon-spark-common/src/main/scala/org/apache/paimon/spark/catalyst/analysis/expressions/ExpressionHelper.scala
@@ -108,11 +108,11 @@ trait ExpressionHelperBase extends PredicateHelper {
   }
 
   def toColumn(expr: Expression): Column = {
-    SparkShimLoader.getSparkShim.column(expr)
+    SparkShimLoader.shim.classicApi.column(expr)
   }
 
   def toExpression(spark: SparkSession, col: Column): Expression = {
-    SparkShimLoader.getSparkShim.convertToExpression(spark, col)
+    SparkShimLoader.shim.classicApi.expression(spark, col)
   }
 
   protected def resolveExpression(
diff --git 
a/paimon-spark/paimon-spark-common/src/main/scala/org/apache/paimon/spark/catalyst/optimizer/EvalSubqueriesForDeleteTable.scala
 
b/paimon-spark/paimon-spark-common/src/main/scala/org/apache/paimon/spark/catalyst/optimizer/EvalSubqueriesForDeleteTable.scala
index 4cf9284f97..66f8a10f37 100644
--- 
a/paimon-spark/paimon-spark-common/src/main/scala/org/apache/paimon/spark/catalyst/optimizer/EvalSubqueriesForDeleteTable.scala
+++ 
b/paimon-spark/paimon-spark-common/src/main/scala/org/apache/paimon/spark/catalyst/optimizer/EvalSubqueriesForDeleteTable.scala
@@ -22,12 +22,13 @@ import 
org.apache.paimon.spark.catalyst.analysis.expressions.ExpressionHelper
 import org.apache.paimon.spark.commands.DeleteFromPaimonTableCommand
 
 import org.apache.spark.internal.Logging
-import org.apache.spark.sql.{execution, SparkSession}
+import org.apache.spark.sql.{execution, PaimonSparkSession, SparkSession}
 import org.apache.spark.sql.catalyst.analysis.Resolver
 import org.apache.spark.sql.catalyst.expressions.{Expression, In, InSubquery, 
Literal, ScalarSubquery, SubqueryExpression}
 import org.apache.spark.sql.catalyst.plans.logical.LogicalPlan
 import org.apache.spark.sql.catalyst.rules.Rule
 import org.apache.spark.sql.execution.{ExecSubqueryExpression, QueryExecution}
+import org.apache.spark.sql.paimon.shims.SparkShimLoader
 import org.apache.spark.sql.types.BooleanType
 
 import scala.collection.JavaConverters._
@@ -42,7 +43,7 @@ import scala.collection.JavaConverters._
  */
 object EvalSubqueriesForDeleteTable extends Rule[LogicalPlan] with 
ExpressionHelper with Logging {
 
-  lazy val spark: SparkSession = SparkSession.active
+  lazy val spark: SparkSession = PaimonSparkSession.active
   lazy val resolver: Resolver = spark.sessionState.conf.resolver
 
   override def apply(plan: LogicalPlan): LogicalPlan = {
@@ -75,7 +76,8 @@ object EvalSubqueriesForDeleteTable extends Rule[LogicalPlan] 
with ExpressionHel
           throw new RuntimeException("Correlated InSubquery is not supported")
         }
 
-        val executedPlan = QueryExecution.prepareExecutedPlan(spark, 
listQuery.plan)
+        val executedPlan =
+          SparkShimLoader.shim.classicApi.prepareExecutedPlan(spark, 
listQuery.plan)
         val physicalSubquery = execution.InSubqueryExec(
           expr,
           execution.SubqueryExec(s"subquery#${listQuery.exprId.id}", 
executedPlan),
@@ -83,7 +85,7 @@ object EvalSubqueriesForDeleteTable extends Rule[LogicalPlan] 
with ExpressionHel
         evalPhysicalSubquery(physicalSubquery)
 
         physicalSubquery.values() match {
-          case Some(l) if l.length > 0 => In(expr, l.map(Literal(_, 
expr.dataType)))
+          case Some(l) if l.length > 0 => In(expr, l.map(Literal(_, 
expr.dataType)).toSeq)
           case _ => Literal(false, BooleanType)
         }
 
@@ -92,7 +94,7 @@ object EvalSubqueriesForDeleteTable extends Rule[LogicalPlan] 
with ExpressionHel
           throw new RuntimeException("Correlated ScalarSubquery is not 
supported")
         }
 
-        val executedPlan = QueryExecution.prepareExecutedPlan(spark, s.plan)
+        val executedPlan = 
SparkShimLoader.shim.classicApi.prepareExecutedPlan(spark, s.plan)
         val physicalSubquery = execution.ScalarSubquery(
           execution.SubqueryExec
             .createForScalarSubquery(s"scalar-subquery#${s.exprId.id}", 
executedPlan),
diff --git 
a/paimon-spark/paimon-spark-common/src/main/scala/org/apache/paimon/spark/catalyst/optimizer/MergePaimonScalarSubqueriesBase.scala
 
b/paimon-spark/paimon-spark-common/src/main/scala/org/apache/paimon/spark/catalyst/optimizer/MergePaimonScalarSubqueriesBase.scala
index 95ee8e86b3..e42ac1cc42 100644
--- 
a/paimon-spark/paimon-spark-common/src/main/scala/org/apache/paimon/spark/catalyst/optimizer/MergePaimonScalarSubqueriesBase.scala
+++ 
b/paimon-spark/paimon-spark-common/src/main/scala/org/apache/paimon/spark/catalyst/optimizer/MergePaimonScalarSubqueriesBase.scala
@@ -282,7 +282,7 @@ trait MergePaimonScalarSubqueriesBase extends 
Rule[LogicalPlan] with PredicateHe
         Some(scan2)
       } else {
         val mergedRequiredSchema = StructType(
-          (scan2.requiredSchema.fields.toSet ++ 
scan1.requiredSchema.fields.toSet).toSeq)
+          (scan2.requiredSchema.fields.toSet ++ 
scan1.requiredSchema.fields.toSet).toArray)
         Some(scan2.copy(requiredSchema = mergedRequiredSchema))
       }
     } else {
@@ -334,7 +334,7 @@ trait MergePaimonScalarSubqueriesBase extends 
Rule[LogicalPlan] with PredicateHe
 
   // Only allow aggregates of the same implementation because merging 
different implementations
   // could cause performance regression.
-  private def supportedAggregateMerge(newPlan: Aggregate, cachedPlan: 
Aggregate) = {
+  private def supportedAggregateMerge(newPlan: Aggregate, cachedPlan: 
Aggregate): Boolean = {
     val aggregateExpressionsSeq = Seq(newPlan, cachedPlan).map {
       plan => plan.aggregateExpressions.flatMap(_.collect { case a: 
AggregateExpression => a })
     }
@@ -343,7 +343,7 @@ trait MergePaimonScalarSubqueriesBase extends 
Rule[LogicalPlan] with PredicateHe
     val Seq(newPlanSupportsHashAggregate, cachedPlanSupportsHashAggregate) =
       aggregateExpressionsSeq.zip(groupByExpressionSeq).map {
         case (aggregateExpressions, groupByExpressions) =>
-          SparkShimLoader.getSparkShim.supportsHashAggregate(
+          SparkShimLoader.shim.supportsHashAggregate(
             
aggregateExpressions.flatMap(_.aggregateFunction.aggBufferAttributes),
             groupByExpressions)
       }
@@ -351,8 +351,11 @@ trait MergePaimonScalarSubqueriesBase extends 
Rule[LogicalPlan] with PredicateHe
     newPlanSupportsHashAggregate && cachedPlanSupportsHashAggregate ||
     newPlanSupportsHashAggregate == cachedPlanSupportsHashAggregate && {
       val Seq(newPlanSupportsObjectHashAggregate, 
cachedPlanSupportsObjectHashAggregate) =
-        aggregateExpressionsSeq.map(
-          aggregateExpressions => 
Aggregate.supportsObjectHashAggregate(aggregateExpressions))
+        aggregateExpressionsSeq.zip(groupByExpressionSeq).map {
+          case (aggregateExpressions, groupByExpressions: Seq[Expression]) =>
+            SparkShimLoader.shim
+              .supportsObjectHashAggregate(aggregateExpressions, 
groupByExpressions)
+        }
       newPlanSupportsObjectHashAggregate && 
cachedPlanSupportsObjectHashAggregate ||
       newPlanSupportsObjectHashAggregate == 
cachedPlanSupportsObjectHashAggregate
     }
@@ -371,7 +374,7 @@ trait MergePaimonScalarSubqueriesBase extends 
Rule[LogicalPlan] with PredicateHe
               val subqueryCTE = header.plan.asInstanceOf[CTERelationDef]
               GetStructField(
                 createScalarSubquery(
-                  SparkShimLoader.getSparkShim.createCTERelationRef(
+                  SparkShimLoader.shim.createCTERelationRef(
                     subqueryCTE.id,
                     resolved = true,
                     subqueryCTE.output,
diff --git 
a/paimon-spark/paimon-spark-common/src/main/scala/org/apache/paimon/spark/commands/UpdatePaimonTableCommand.scala
 
b/paimon-spark/paimon-spark-common/src/main/scala/org/apache/paimon/spark/commands/UpdatePaimonTableCommand.scala
index 74c7e122cd..4c132aae9f 100644
--- 
a/paimon-spark/paimon-spark-common/src/main/scala/org/apache/paimon/spark/commands/UpdatePaimonTableCommand.scala
+++ 
b/paimon-spark/paimon-spark-common/src/main/scala/org/apache/paimon/spark/commands/UpdatePaimonTableCommand.scala
@@ -33,7 +33,6 @@ import 
org.apache.spark.sql.catalyst.expressions.Literal.TrueLiteral
 import org.apache.spark.sql.catalyst.plans.logical.{Assignment, Filter, 
Project, SupportsSubquery}
 import org.apache.spark.sql.execution.datasources.v2.DataSourceV2Relation
 import org.apache.spark.sql.functions.lit
-import org.apache.spark.sql.paimon.shims.SparkShimLoader
 
 case class UpdatePaimonTableCommand(
     relation: DataSourceV2Relation,
@@ -132,7 +131,7 @@ case class UpdatePaimonTableCommand(
       touchedDataSplits: Array[DataSplit]): Seq[CommitMessage] = {
     val updateColumns = updateExpressions.zip(relation.output).map {
       case (update, origin) =>
-        SparkShimLoader.getSparkShim.column(update).as(origin.name, 
origin.metadata)
+        toColumn(update).as(origin.name, origin.metadata)
     }
 
     val toUpdateScanRelation = createNewRelation(touchedDataSplits, relation)
@@ -155,7 +154,7 @@ case class UpdatePaimonTableCommand(
         } else {
           If(condition, update, origin)
         }
-        SparkShimLoader.getSparkShim.column(updated).as(origin.name, 
origin.metadata)
+        toColumn(updated).as(origin.name, origin.metadata)
     }
 
     val data = createDataset(sparkSession, 
toUpdateScanRelation).select(updateColumns: _*)
diff --git 
a/paimon-spark/paimon-spark-common/src/main/scala/org/apache/paimon/spark/data/SparkArrayData.scala
 
b/paimon-spark/paimon-spark-common/src/main/scala/org/apache/paimon/spark/data/SparkArrayData.scala
index c6539a493c..790d273d0c 100644
--- 
a/paimon-spark/paimon-spark-common/src/main/scala/org/apache/paimon/spark/data/SparkArrayData.scala
+++ 
b/paimon-spark/paimon-spark-common/src/main/scala/org/apache/paimon/spark/data/SparkArrayData.scala
@@ -113,6 +113,6 @@ abstract class AbstractSparkArrayData extends 
SparkArrayData {
 
 object SparkArrayData {
   def create(elementType: PaimonDataType): SparkArrayData = {
-    SparkShimLoader.getSparkShim.createSparkArrayData(elementType)
+    SparkShimLoader.shim.createSparkArrayData(elementType)
   }
 }
diff --git 
a/paimon-spark/paimon-spark-common/src/main/scala/org/apache/paimon/spark/data/SparkInternalRow.scala
 
b/paimon-spark/paimon-spark-common/src/main/scala/org/apache/paimon/spark/data/SparkInternalRow.scala
index f3e607e9d7..b0916447c0 100644
--- 
a/paimon-spark/paimon-spark-common/src/main/scala/org/apache/paimon/spark/data/SparkInternalRow.scala
+++ 
b/paimon-spark/paimon-spark-common/src/main/scala/org/apache/paimon/spark/data/SparkInternalRow.scala
@@ -30,7 +30,7 @@ abstract class SparkInternalRow extends InternalRow {
 object SparkInternalRow {
 
   def create(rowType: RowType): SparkInternalRow = {
-    SparkShimLoader.getSparkShim.createSparkInternalRow(rowType)
+    SparkShimLoader.shim.createSparkInternalRow(rowType)
   }
 
 }
diff --git 
a/paimon-spark/paimon-spark-common/src/main/scala/org/apache/paimon/spark/execution/PaimonStrategy.scala
 
b/paimon-spark/paimon-spark-common/src/main/scala/org/apache/paimon/spark/execution/PaimonStrategy.scala
index f2bfc7846b..ca4c356fa5 100644
--- 
a/paimon-spark/paimon-spark-common/src/main/scala/org/apache/paimon/spark/execution/PaimonStrategy.scala
+++ 
b/paimon-spark/paimon-spark-common/src/main/scala/org/apache/paimon/spark/execution/PaimonStrategy.scala
@@ -23,19 +23,19 @@ import org.apache.paimon.spark.catalog.SupportView
 import org.apache.paimon.spark.catalyst.analysis.ResolvedPaimonView
 import 
org.apache.paimon.spark.catalyst.plans.logical.{CreateOrReplaceTagCommand, 
CreatePaimonView, DeleteTagCommand, DropPaimonView, PaimonCallCommand, 
RenameTagCommand, ResolvedIdentifier, ShowPaimonViews, ShowTagsCommand}
 
-import org.apache.spark.sql.{SparkSession, Strategy}
+import org.apache.spark.sql.SparkSession
 import org.apache.spark.sql.catalyst.InternalRow
 import org.apache.spark.sql.catalyst.analysis.ResolvedNamespace
 import org.apache.spark.sql.catalyst.expressions.{Expression, 
GenericInternalRow, PredicateHelper}
 import org.apache.spark.sql.catalyst.plans.logical.{CreateTableAsSelect, 
DescribeRelation, LogicalPlan, ShowCreateTable}
 import org.apache.spark.sql.connector.catalog.{Identifier, 
PaimonLookupCatalog, TableCatalog}
-import org.apache.spark.sql.execution.SparkPlan
+import org.apache.spark.sql.execution.{SparkPlan, SparkStrategy}
 import org.apache.spark.sql.execution.shim.PaimonCreateTableAsSelectStrategy
 
 import scala.collection.JavaConverters._
 
 case class PaimonStrategy(spark: SparkSession)
-  extends Strategy
+  extends SparkStrategy
   with PredicateHelper
   with PaimonLookupCatalog {
 
diff --git 
a/paimon-spark/paimon-spark-common/src/main/scala/org/apache/paimon/spark/extensions/PaimonSparkSessionExtensions.scala
 
b/paimon-spark/paimon-spark-common/src/main/scala/org/apache/paimon/spark/extensions/PaimonSparkSessionExtensions.scala
index 4ecff93ea6..c68c91ca45 100644
--- 
a/paimon-spark/paimon-spark-common/src/main/scala/org/apache/paimon/spark/extensions/PaimonSparkSessionExtensions.scala
+++ 
b/paimon-spark/paimon-spark-common/src/main/scala/org/apache/paimon/spark/extensions/PaimonSparkSessionExtensions.scala
@@ -33,16 +33,13 @@ class PaimonSparkSessionExtensions extends 
(SparkSessionExtensions => Unit) {
 
   override def apply(extensions: SparkSessionExtensions): Unit = {
     // parser extensions
-    extensions.injectParser {
-      case (_, parser) => 
SparkShimLoader.getSparkShim.createSparkParser(parser)
-    }
+    extensions.injectParser { case (_, parser) => 
SparkShimLoader.shim.createSparkParser(parser) }
 
     // analyzer extensions
     extensions.injectResolutionRule(spark => new PaimonAnalysis(spark))
     extensions.injectResolutionRule(spark => PaimonProcedureResolver(spark))
     extensions.injectResolutionRule(spark => PaimonViewResolver(spark))
-    extensions.injectResolutionRule(
-      spark => SparkShimLoader.getSparkShim.createCustomResolution(spark))
+    extensions.injectResolutionRule(spark => 
SparkShimLoader.shim.createCustomResolution(spark))
     extensions.injectResolutionRule(spark => 
PaimonIncompatibleResolutionRules(spark))
 
     extensions.injectPostHocResolutionRule(spark => 
ReplacePaimonFunctions(spark))
diff --git 
a/paimon-spark/paimon-spark-common/src/main/scala/org/apache/paimon/spark/procedure/SparkOrphanFilesClean.scala
 
b/paimon-spark/paimon-spark-common/src/main/scala/org/apache/paimon/spark/procedure/SparkOrphanFilesClean.scala
index 010a3e4ede..9e435d8b44 100644
--- 
a/paimon-spark/paimon-spark-common/src/main/scala/org/apache/paimon/spark/procedure/SparkOrphanFilesClean.scala
+++ 
b/paimon-spark/paimon-spark-common/src/main/scala/org/apache/paimon/spark/procedure/SparkOrphanFilesClean.scala
@@ -29,7 +29,7 @@ import 
org.apache.paimon.utils.FileStorePathFactory.BUCKET_PATH_PREFIX
 import org.apache.paimon.utils.SerializableConsumer
 
 import org.apache.spark.internal.Logging
-import org.apache.spark.sql.{functions, Dataset, SparkSession}
+import org.apache.spark.sql.{functions, Dataset, PaimonSparkSession, 
SparkSession}
 import org.apache.spark.sql.catalyst.SQLConfHelper
 
 import java.util
@@ -199,7 +199,7 @@ object SparkOrphanFilesClean extends SQLConfHelper {
       olderThanMillis: Long,
       parallelismOpt: Integer,
       dryRun: Boolean): CleanOrphanFilesResult = {
-    val spark = SparkSession.active
+    val spark = PaimonSparkSession.active
     val parallelism = if (parallelismOpt == null) {
       Math.max(spark.sparkContext.defaultParallelism, 
conf.numShufflePartitions)
     } else {
diff --git 
a/paimon-spark/paimon-spark-common/src/main/scala/org/apache/paimon/spark/procedure/SparkRemoveUnexistingFiles.scala
 
b/paimon-spark/paimon-spark-common/src/main/scala/org/apache/paimon/spark/procedure/SparkRemoveUnexistingFiles.scala
index f28a12824e..5361d4eafa 100644
--- 
a/paimon-spark/paimon-spark-common/src/main/scala/org/apache/paimon/spark/procedure/SparkRemoveUnexistingFiles.scala
+++ 
b/paimon-spark/paimon-spark-common/src/main/scala/org/apache/paimon/spark/procedure/SparkRemoveUnexistingFiles.scala
@@ -27,7 +27,7 @@ import org.apache.paimon.table.FileStoreTable
 import org.apache.paimon.table.sink.{CommitMessage, CommitMessageImpl, 
CommitMessageSerializer}
 
 import org.apache.spark.internal.Logging
-import org.apache.spark.sql.SparkSession
+import org.apache.spark.sql.{PaimonSparkSession, SparkSession}
 import org.apache.spark.sql.catalyst.SQLConfHelper
 
 import java.util
@@ -110,7 +110,7 @@ object SparkRemoveUnexistingFiles extends SQLConfHelper {
       tableName: String,
       dryRun: Boolean,
       parallelismOpt: Integer): Array[String] = {
-    val spark = SparkSession.active
+    val spark = PaimonSparkSession.active
     val parallelism = if (parallelismOpt == null) {
       Math.max(spark.sparkContext.defaultParallelism, 
conf.numShufflePartitions)
     } else {
diff --git 
a/paimon-spark/paimon-spark-common/src/main/scala/org/apache/spark/sql/PaimonStatsUtils.scala
 
b/paimon-spark/paimon-spark-common/src/main/scala/org/apache/spark/sql/PaimonStatsUtils.scala
index 8f24700c29..5553b6d8ef 100644
--- 
a/paimon-spark/paimon-spark-common/src/main/scala/org/apache/spark/sql/PaimonStatsUtils.scala
+++ 
b/paimon-spark/paimon-spark-common/src/main/scala/org/apache/spark/sql/PaimonStatsUtils.scala
@@ -21,6 +21,7 @@ package org.apache.spark.sql
 import org.apache.spark.sql.catalyst.expressions.Attribute
 import org.apache.spark.sql.catalyst.plans.logical.{ColumnStat, LogicalPlan}
 import org.apache.spark.sql.execution.command.CommandUtils
+import org.apache.spark.sql.paimon.shims.SparkShimLoader
 import org.apache.spark.sql.types._
 
 /**
@@ -34,7 +35,7 @@ object PaimonStatsUtils {
       sparkSession: SparkSession,
       relation: LogicalPlan,
       columns: Seq[Attribute]): (Long, Map[Attribute, ColumnStat]) = {
-    CommandUtils.computeColumnStats(sparkSession, relation, columns)
+    SparkShimLoader.shim.classicApi.computeColumnStats(sparkSession, relation, 
columns)
   }
 
   /** [[IntegralType]] is private in spark, therefore we need add it here. */
diff --git 
a/paimon-spark/paimon-spark-common/src/main/scala/org/apache/spark/sql/PaimonUtils.scala
 
b/paimon-spark/paimon-spark-common/src/main/scala/org/apache/spark/sql/PaimonUtils.scala
index 9023bfa646..885910dbc2 100644
--- 
a/paimon-spark/paimon-spark-common/src/main/scala/org/apache/spark/sql/PaimonUtils.scala
+++ 
b/paimon-spark/paimon-spark-common/src/main/scala/org/apache/spark/sql/PaimonUtils.scala
@@ -29,6 +29,7 @@ import 
org.apache.spark.sql.connector.expressions.filter.Predicate
 import org.apache.spark.sql.execution.datasources.DataSourceStrategy
 import 
org.apache.spark.sql.execution.datasources.v2.DataSourceV2Strategy.translateFilterV2WithMapping
 import org.apache.spark.sql.internal.connector.PredicateUtils
+import org.apache.spark.sql.paimon.shims.SparkShimLoader
 import org.apache.spark.sql.sources.Filter
 import org.apache.spark.sql.types.{DataType, StructType}
 import org.apache.spark.sql.util.PartitioningUtils
@@ -54,11 +55,11 @@ object PaimonUtils {
    * [[org.apache.spark.sql.execution.streaming.Sink.addBatch]].
    */
   def createNewDataFrame(data: DataFrame): DataFrame = {
-    data.sqlContext.internalCreateDataFrame(data.queryExecution.toRdd, 
data.schema)
+    SparkShimLoader.shim.classicApi.createDataset(data)
   }
 
   def createDataset(sparkSession: SparkSession, logicalPlan: LogicalPlan): 
Dataset[Row] = {
-    Dataset.ofRows(sparkSession, logicalPlan)
+    SparkShimLoader.shim.classicApi.createDataset(sparkSession, logicalPlan)
   }
 
   def normalizeExprs(exprs: Seq[Expression], attributes: Seq[Attribute]): 
Seq[Expression] = {
diff --git 
a/paimon-spark/paimon-spark-common/src/main/scala/org/apache/spark/sql/catalyst/parser/extensions/AbstractPaimonSparkSqlExtensionsParser.scala
 
b/paimon-spark/paimon-spark-common/src/main/scala/org/apache/spark/sql/catalyst/parser/extensions/AbstractPaimonSparkSqlExtensionsParser.scala
index 557b0735c7..73e1ea3ec8 100644
--- 
a/paimon-spark/paimon-spark-common/src/main/scala/org/apache/spark/sql/catalyst/parser/extensions/AbstractPaimonSparkSqlExtensionsParser.scala
+++ 
b/paimon-spark/paimon-spark-common/src/main/scala/org/apache/spark/sql/catalyst/parser/extensions/AbstractPaimonSparkSqlExtensionsParser.scala
@@ -25,7 +25,7 @@ import org.antlr.v4.runtime.atn.PredictionMode
 import org.antlr.v4.runtime.misc.{Interval, ParseCancellationException}
 import org.antlr.v4.runtime.tree.TerminalNodeImpl
 import org.apache.spark.internal.Logging
-import org.apache.spark.sql.{AnalysisException, SparkSession}
+import org.apache.spark.sql.{AnalysisException, PaimonSparkSession, 
SparkSession}
 import org.apache.spark.sql.catalyst.{FunctionIdentifier, TableIdentifier}
 import org.apache.spark.sql.catalyst.expressions.Expression
 import org.apache.spark.sql.catalyst.parser.{ParseException, ParserInterface}
@@ -65,7 +65,7 @@ abstract class AbstractPaimonSparkSqlExtensionsParser(val 
delegate: ParserInterf
       parse(sqlTextAfterSubstitution)(parser => 
astBuilder.visit(parser.singleStatement()))
         .asInstanceOf[LogicalPlan]
     } else {
-      
RewritePaimonViewCommands(SparkSession.active).apply(delegate.parsePlan(sqlText))
+      
RewritePaimonViewCommands(PaimonSparkSession.active).apply(delegate.parsePlan(sqlText))
     }
   }
 
diff --git 
a/paimon-spark/paimon-spark-common/src/main/scala/org/apache/spark/sql/connector/catalog/PaimonCatalogUtils.scala
 
b/paimon-spark/paimon-spark-common/src/main/scala/org/apache/spark/sql/connector/catalog/PaimonCatalogUtils.scala
index 5db6894ba0..f330fed3f3 100644
--- 
a/paimon-spark/paimon-spark-common/src/main/scala/org/apache/spark/sql/connector/catalog/PaimonCatalogUtils.scala
+++ 
b/paimon-spark/paimon-spark-common/src/main/scala/org/apache/spark/sql/connector/catalog/PaimonCatalogUtils.scala
@@ -20,7 +20,7 @@ package org.apache.spark.sql.connector.catalog
 
 import org.apache.hadoop.conf.Configuration
 import org.apache.spark.SparkConf
-import org.apache.spark.sql.SparkSession
+import org.apache.spark.sql.{PaimonSparkSession, SparkSession}
 import org.apache.spark.sql.catalyst.catalog.ExternalCatalog
 import org.apache.spark.sql.connector.catalog.CatalogV2Util
 import org.apache.spark.sql.internal.StaticSQLConf.CATALOG_IMPLEMENTATION
@@ -30,7 +30,7 @@ object PaimonCatalogUtils {
 
   def buildExternalCatalog(conf: SparkConf, hadoopConf: Configuration): 
ExternalCatalog = {
     val externalCatalogClassName =
-      if 
(SparkSession.active.conf.get(CATALOG_IMPLEMENTATION.key).equals("hive")) {
+      if 
(PaimonSparkSession.active.conf.get(CATALOG_IMPLEMENTATION.key).equals("hive")) 
{
         "org.apache.spark.sql.hive.HiveExternalCatalog"
       } else {
         "org.apache.spark.sql.catalyst.catalog.InMemoryCatalog"
diff --git 
a/paimon-spark/paimon-spark-common/src/main/scala/org/apache/spark/sql/execution/shim/PaimonCreateTableAsSelectStrategy.scala
 
b/paimon-spark/paimon-spark-common/src/main/scala/org/apache/spark/sql/execution/shim/PaimonCreateTableAsSelectStrategy.scala
index 8279a6de31..fd6627c095 100644
--- 
a/paimon-spark/paimon-spark-common/src/main/scala/org/apache/spark/sql/execution/shim/PaimonCreateTableAsSelectStrategy.scala
+++ 
b/paimon-spark/paimon-spark-common/src/main/scala/org/apache/spark/sql/execution/shim/PaimonCreateTableAsSelectStrategy.scala
@@ -22,17 +22,17 @@ import org.apache.paimon.CoreOptions
 import org.apache.paimon.spark.SparkCatalog
 import org.apache.paimon.spark.catalog.FormatTableCatalog
 
-import org.apache.spark.sql.{SparkSession, Strategy}
+import org.apache.spark.sql.SparkSession
 import org.apache.spark.sql.catalyst.analysis.ResolvedIdentifier
 import org.apache.spark.sql.catalyst.plans.logical.{CreateTableAsSelect, 
LogicalPlan, TableSpec}
 import org.apache.spark.sql.connector.catalog.StagingTableCatalog
-import org.apache.spark.sql.execution.{PaimonStrategyHelper, SparkPlan}
+import org.apache.spark.sql.execution.{PaimonStrategyHelper, SparkPlan, 
SparkStrategy}
 import org.apache.spark.sql.execution.datasources.v2.CreateTableAsSelectExec
 
 import scala.collection.JavaConverters._
 
 case class PaimonCreateTableAsSelectStrategy(spark: SparkSession)
-  extends Strategy
+  extends SparkStrategy
   with PaimonStrategyHelper {
 
   import org.apache.spark.sql.connector.catalog.CatalogV2Implicits._
diff --git 
a/paimon-spark/paimon-spark-ut/src/test/scala/org/apache/paimon/spark/sql/SparkVersionSupport.scala
 
b/paimon-spark/paimon-spark-common/src/main/scala/org/apache/spark/sql/paimon/PaimonSparkSession.scala
similarity index 68%
copy from 
paimon-spark/paimon-spark-ut/src/test/scala/org/apache/paimon/spark/sql/SparkVersionSupport.scala
copy to 
paimon-spark/paimon-spark-common/src/main/scala/org/apache/spark/sql/paimon/PaimonSparkSession.scala
index 647b4cfdca..674a9196f7 100644
--- 
a/paimon-spark/paimon-spark-ut/src/test/scala/org/apache/paimon/spark/sql/SparkVersionSupport.scala
+++ 
b/paimon-spark/paimon-spark-common/src/main/scala/org/apache/spark/sql/paimon/PaimonSparkSession.scala
@@ -16,18 +16,10 @@
  * limitations under the License.
  */
 
-package org.apache.paimon.spark.sql
+package org.apache.spark.sql
 
-import org.apache.spark.SPARK_VERSION
+object PaimonSparkSession {
 
-trait SparkVersionSupport {
-  lazy val sparkVersion: String = SPARK_VERSION
+  def active: SparkSession = SparkSession.active
 
-  lazy val gteqSpark3_3: Boolean = sparkVersion >= "3.3"
-
-  lazy val gteqSpark3_4: Boolean = sparkVersion >= "3.4"
-
-  lazy val gteqSpark3_5: Boolean = sparkVersion >= "3.5"
-
-  lazy val gteqSpark4_0: Boolean = sparkVersion >= "4.0"
 }
diff --git 
a/paimon-spark/paimon-spark-common/src/main/scala/org/apache/spark/sql/paimon/shims/SparkShimLoader.scala
 
b/paimon-spark/paimon-spark-common/src/main/scala/org/apache/spark/sql/paimon/shims/ClassicApi.scala
similarity index 50%
copy from 
paimon-spark/paimon-spark-common/src/main/scala/org/apache/spark/sql/paimon/shims/SparkShimLoader.scala
copy to 
paimon-spark/paimon-spark-common/src/main/scala/org/apache/spark/sql/paimon/shims/ClassicApi.scala
index 920896547a..21381cca29 100644
--- 
a/paimon-spark/paimon-spark-common/src/main/scala/org/apache/spark/sql/paimon/shims/SparkShimLoader.scala
+++ 
b/paimon-spark/paimon-spark-common/src/main/scala/org/apache/spark/sql/paimon/shims/ClassicApi.scala
@@ -18,26 +18,28 @@
 
 package org.apache.spark.sql.paimon.shims
 
-import java.util.ServiceLoader
+import org.apache.spark.sql.{Column, DataFrame, Dataset, Row, SparkSession}
+import org.apache.spark.sql.catalyst.expressions.{Attribute, Expression}
+import org.apache.spark.sql.catalyst.plans.logical.{ColumnStat, LogicalPlan}
+import org.apache.spark.sql.execution.SparkPlan
 
-import scala.collection.JavaConverters._
+trait ClassicApi {
 
-/** Load a [[SparkShim]]'s implementation. */
-object SparkShimLoader {
+  def column(expression: Expression): Column
 
-  private lazy val sparkShim: SparkShim = loadSparkShim()
+  def expression(spark: SparkSession, column: Column): Expression
 
-  def getSparkShim: SparkShim = {
-    sparkShim
-  }
+  def createDataset(data: DataFrame): DataFrame
+
+  def createDataset(sparkSession: SparkSession, logicalPlan: LogicalPlan): 
Dataset[Row]
+
+  def prepareExecutedPlan(spark: SparkSession, logicalPlan: LogicalPlan): 
SparkPlan
+
+  def recacheByPlan(spark: SparkSession, plan: LogicalPlan): Unit
+
+  def computeColumnStats(
+      sparkSession: SparkSession,
+      relation: LogicalPlan,
+      columns: Seq[Attribute]): (Long, Map[Attribute, ColumnStat])
 
-  private def loadSparkShim(): SparkShim = {
-    val shims = ServiceLoader.load(classOf[SparkShim]).asScala
-    if (shims.isEmpty) {
-      throw new IllegalStateException("No available spark shim here.")
-    } else if (shims.size > 1) {
-      throw new IllegalStateException("Found more than one spark shim here.")
-    }
-    shims.head
-  }
 }
diff --git 
a/paimon-spark/paimon-spark-common/src/main/scala/org/apache/spark/sql/paimon/shims/SparkShim.scala
 
b/paimon-spark/paimon-spark-common/src/main/scala/org/apache/spark/sql/paimon/shims/SparkShim.scala
index 6b771a3339..ef764bc0d1 100644
--- 
a/paimon-spark/paimon-spark-common/src/main/scala/org/apache/spark/sql/paimon/shims/SparkShim.scala
+++ 
b/paimon-spark/paimon-spark-common/src/main/scala/org/apache/spark/sql/paimon/shims/SparkShim.scala
@@ -22,9 +22,10 @@ import org.apache.paimon.data.variant.Variant
 import org.apache.paimon.spark.data.{SparkArrayData, SparkInternalRow}
 import org.apache.paimon.types.{DataType, RowType}
 
-import org.apache.spark.sql.{Column, SparkSession}
+import org.apache.spark.sql.SparkSession
 import org.apache.spark.sql.catalyst.InternalRow
 import org.apache.spark.sql.catalyst.expressions.{Attribute, Expression}
+import org.apache.spark.sql.catalyst.expressions.aggregate.AggregateExpression
 import org.apache.spark.sql.catalyst.parser.ParserInterface
 import org.apache.spark.sql.catalyst.plans.logical.{CTERelationRef, 
LogicalPlan}
 import org.apache.spark.sql.catalyst.rules.Rule
@@ -42,6 +43,8 @@ import java.util.{Map => JMap}
  */
 trait SparkShim {
 
+  def classicApi: ClassicApi
+
   def createSparkParser(delegate: ParserInterface): ParserInterface
 
   def createCustomResolution(spark: SparkSession): Rule[LogicalPlan]
@@ -50,10 +53,6 @@ trait SparkShim {
 
   def createSparkArrayData(elementType: DataType): SparkArrayData
 
-  def supportsHashAggregate(
-      aggregateBufferAttributes: Seq[Attribute],
-      groupingExpression: Seq[Expression]): Boolean
-
   def createTable(
       tableCatalog: TableCatalog,
       ident: Identifier,
@@ -67,9 +66,13 @@ trait SparkShim {
       output: Seq[Attribute],
       isStreaming: Boolean): CTERelationRef
 
-  def column(expr: Expression): Column
+  def supportsHashAggregate(
+      aggregateBufferAttributes: Seq[Attribute],
+      groupingExpression: Seq[Expression]): Boolean
 
-  def convertToExpression(spark: SparkSession, column: Column): Expression
+  def supportsObjectHashAggregate(
+      aggregateExpressions: Seq[AggregateExpression],
+      groupByExpressions: Seq[Expression]): Boolean
 
   // for variant
   def toPaimonVariant(o: Object): Variant
diff --git 
a/paimon-spark/paimon-spark-common/src/main/scala/org/apache/spark/sql/paimon/shims/SparkShimLoader.scala
 
b/paimon-spark/paimon-spark-common/src/main/scala/org/apache/spark/sql/paimon/shims/SparkShimLoader.scala
index 920896547a..d6b5850b9d 100644
--- 
a/paimon-spark/paimon-spark-common/src/main/scala/org/apache/spark/sql/paimon/shims/SparkShimLoader.scala
+++ 
b/paimon-spark/paimon-spark-common/src/main/scala/org/apache/spark/sql/paimon/shims/SparkShimLoader.scala
@@ -27,7 +27,7 @@ object SparkShimLoader {
 
   private lazy val sparkShim: SparkShim = loadSparkShim()
 
-  def getSparkShim: SparkShim = {
+  def shim: SparkShim = {
     sparkShim
   }
 
diff --git 
a/paimon-spark/paimon-spark-ut/src/test/java/org/apache/paimon/spark/SparkCatalogWithHiveTest.java
 
b/paimon-spark/paimon-spark-ut/src/test/java/org/apache/paimon/spark/SparkCatalogWithHiveTest.java
index 488913f14a..9f383a54c2 100644
--- 
a/paimon-spark/paimon-spark-ut/src/test/java/org/apache/paimon/spark/SparkCatalogWithHiveTest.java
+++ 
b/paimon-spark/paimon-spark-ut/src/test/java/org/apache/paimon/spark/SparkCatalogWithHiveTest.java
@@ -31,6 +31,7 @@ import org.junit.jupiter.api.Test;
 import org.junit.jupiter.api.io.TempDir;
 
 import java.io.FileNotFoundException;
+import java.io.IOException;
 import java.util.stream.Collectors;
 
 import static org.assertj.core.api.Assertions.assertThat;
@@ -56,8 +57,9 @@ public class SparkCatalogWithHiveTest {
     }
 
     @Test
-    public void testCreateFormatTable() {
-        try (SparkSession spark = createSessionBuilder().getOrCreate()) {
+    public void testCreateFormatTable() throws IOException {
+        SparkSession spark = createSessionBuilder().getOrCreate();
+        {
             spark.sql("CREATE DATABASE IF NOT EXISTS my_db1");
             spark.sql("USE spark_catalog.my_db1");
 
@@ -110,7 +112,7 @@ public class SparkCatalogWithHiveTest {
     }
 
     @Test
-    public void testSpecifyHiveConfDirInGenericCatalog() {
+    public void testSpecifyHiveConfDirInGenericCatalog() throws IOException {
         try (SparkSession spark =
                 createSessionBuilder()
                         
.config("spark.sql.catalog.spark_catalog.hive-conf-dir", "nonExistentPath")
@@ -126,7 +128,7 @@ public class SparkCatalogWithHiveTest {
     }
 
     @Test
-    public void testCreateExternalTable() {
+    public void testCreateExternalTable() throws IOException {
         try (SparkSession spark = createSessionBuilder().getOrCreate()) {
             String warehousePath = 
spark.sparkContext().conf().get("spark.sql.warehouse.dir");
             spark.sql("CREATE DATABASE IF NOT EXISTS test_db");
diff --git 
a/paimon-spark/paimon-spark-ut/src/test/java/org/apache/paimon/spark/SparkGenericCatalogTest.java
 
b/paimon-spark/paimon-spark-ut/src/test/java/org/apache/paimon/spark/SparkGenericCatalogTest.java
index 0ae0e91306..b38995da14 100644
--- 
a/paimon-spark/paimon-spark-ut/src/test/java/org/apache/paimon/spark/SparkGenericCatalogTest.java
+++ 
b/paimon-spark/paimon-spark-ut/src/test/java/org/apache/paimon/spark/SparkGenericCatalogTest.java
@@ -28,6 +28,7 @@ import org.junit.jupiter.api.BeforeEach;
 import org.junit.jupiter.api.Test;
 import org.junit.jupiter.api.io.TempDir;
 
+import java.io.IOException;
 import java.util.List;
 
 import static org.assertj.core.api.Assertions.assertThat;
@@ -82,7 +83,7 @@ public class SparkGenericCatalogTest {
     }
 
     @Test
-    public void testSparkSessionReload() {
+    public void testSparkSessionReload() throws IOException {
         spark.sql("CREATE DATABASE my_db");
         spark.close();
 
diff --git 
a/paimon-spark/paimon-spark-ut/src/test/java/org/apache/paimon/spark/SparkGenericCatalogWithHiveTest.java
 
b/paimon-spark/paimon-spark-ut/src/test/java/org/apache/paimon/spark/SparkGenericCatalogWithHiveTest.java
index 604e2ea279..3bb013648e 100644
--- 
a/paimon-spark/paimon-spark-ut/src/test/java/org/apache/paimon/spark/SparkGenericCatalogWithHiveTest.java
+++ 
b/paimon-spark/paimon-spark-ut/src/test/java/org/apache/paimon/spark/SparkGenericCatalogWithHiveTest.java
@@ -28,6 +28,7 @@ import org.junit.jupiter.api.Test;
 import org.junit.jupiter.api.io.TempDir;
 
 import java.io.FileNotFoundException;
+import java.io.IOException;
 
 import static org.assertj.core.api.Assertions.assertThat;
 import static org.assertj.core.api.Assertions.assertThatThrownBy;
@@ -48,7 +49,7 @@ public class SparkGenericCatalogWithHiveTest {
     }
 
     @Test
-    public void testBuildWithHive(@TempDir java.nio.file.Path tempDir) {
+    public void testBuildWithHive(@TempDir java.nio.file.Path tempDir) throws 
IOException {
         // firstly, we use hive metastore to create table, and check the 
result.
         Path warehousePath = new Path("file:" + tempDir.toString());
         SparkSession spark =
@@ -105,7 +106,7 @@ public class SparkGenericCatalogWithHiveTest {
     }
 
     @Test
-    public void testHiveCatalogOptions(@TempDir java.nio.file.Path tempDir) {
+    public void testHiveCatalogOptions(@TempDir java.nio.file.Path tempDir) 
throws IOException {
         Path warehousePath = new Path("file:" + tempDir.toString());
         SparkSession spark =
                 SparkSession.builder()
diff --git 
a/paimon-spark/paimon-spark-ut/src/test/java/org/apache/paimon/spark/extensions/CallStatementParserTest.java
 
b/paimon-spark/paimon-spark-ut/src/test/java/org/apache/paimon/spark/extensions/CallStatementParserTest.java
index e4e571e96b..9f77a93bf0 100644
--- 
a/paimon-spark/paimon-spark-ut/src/test/java/org/apache/paimon/spark/extensions/CallStatementParserTest.java
+++ 
b/paimon-spark/paimon-spark-ut/src/test/java/org/apache/paimon/spark/extensions/CallStatementParserTest.java
@@ -22,6 +22,7 @@ import 
org.apache.paimon.spark.catalyst.plans.logical.PaimonCallArgument;
 import org.apache.paimon.spark.catalyst.plans.logical.PaimonCallStatement;
 import org.apache.paimon.spark.catalyst.plans.logical.PaimonNamedArgument;
 import org.apache.paimon.spark.catalyst.plans.logical.PaimonPositionalArgument;
+import org.apache.paimon.spark.sql.SparkVersionSupport$;
 
 import org.apache.spark.sql.SparkSession;
 import org.apache.spark.sql.catalyst.expressions.Literal$;
@@ -81,16 +82,20 @@ public class CallStatementParserTest {
 
     @Test
     public void testDelegateUnsupportedProcedure() {
-        assertThatThrownBy(() -> parser.parsePlan("CALL cat.d.t()"))
-                .isInstanceOf(ParseException.class)
-                .satisfies(
-                        exception -> {
-                            ParseException parseException = (ParseException) 
exception;
-                            assertThat(parseException.getErrorClass())
-                                    .isEqualTo("PARSE_SYNTAX_ERROR");
-                            
assertThat(parseException.getMessageParameters().get("error"))
-                                    .isEqualTo("'CALL'");
-                        });
+        if (!SparkVersionSupport$.MODULE$.gteqSpark4_0()) {
+            // TODO: adapt spark 4.0 to make Paimon parser only apply own 
supported procedures.
+
+            assertThatThrownBy(() -> parser.parsePlan("CALL cat.d.t()"))
+                    .isInstanceOf(ParseException.class)
+                    .satisfies(
+                            exception -> {
+                                ParseException parseException = 
(ParseException) exception;
+                                assertThat(parseException.getErrorClass())
+                                        .isEqualTo("PARSE_SYNTAX_ERROR");
+                                
assertThat(parseException.getMessageParameters().get("error"))
+                                        .isEqualTo("'CALL'");
+                            });
+        }
     }
 
     @Test
diff --git 
a/paimon-spark/paimon-spark-ut/src/test/scala/org/apache/paimon/spark/procedure/ProcedureTestBase.scala
 
b/paimon-spark/paimon-spark-ut/src/test/scala/org/apache/paimon/spark/procedure/ProcedureTestBase.scala
index a5f9f3ffa0..df98026022 100644
--- 
a/paimon-spark/paimon-spark-ut/src/test/scala/org/apache/paimon/spark/procedure/ProcedureTestBase.scala
+++ 
b/paimon-spark/paimon-spark-ut/src/test/scala/org/apache/paimon/spark/procedure/ProcedureTestBase.scala
@@ -20,6 +20,7 @@ package org.apache.paimon.spark.procedure
 
 import org.apache.paimon.spark.PaimonSparkTestBase
 
+import org.apache.spark.sql.AnalysisException
 import org.apache.spark.sql.catalyst.parser.ParseException
 import org.apache.spark.sql.catalyst.parser.extensions.PaimonParseException
 import org.assertj.core.api.Assertions.assertThatThrownBy
@@ -31,8 +32,13 @@ abstract class ProcedureTestBase extends PaimonSparkTestBase 
{
                  |CREATE TABLE T (id INT, name STRING, dt STRING)
                  |""".stripMargin)
 
-    assertThatThrownBy(() => spark.sql("CALL sys.unknown_procedure(table => 
'test.T')"))
-      .isInstanceOf(classOf[ParseException])
+    if (gteqSpark4_0) {
+      assertThatThrownBy(() => spark.sql("CALL sys.unknown_procedure(table => 
'test.T')"))
+        .isInstanceOf(classOf[AnalysisException])
+    } else {
+      assertThatThrownBy(() => spark.sql("CALL sys.unknown_procedure(table => 
'test.T')"))
+        .isInstanceOf(classOf[ParseException])
+    }
   }
 
   test(s"test parse exception") {
diff --git 
a/paimon-spark/paimon-spark-ut/src/test/scala/org/apache/paimon/spark/sql/DDLWithHiveCatalogTestBase.scala
 
b/paimon-spark/paimon-spark-ut/src/test/scala/org/apache/paimon/spark/sql/DDLWithHiveCatalogTestBase.scala
index 36315f0228..06c2eaf049 100644
--- 
a/paimon-spark/paimon-spark-ut/src/test/scala/org/apache/paimon/spark/sql/DDLWithHiveCatalogTestBase.scala
+++ 
b/paimon-spark/paimon-spark-ut/src/test/scala/org/apache/paimon/spark/sql/DDLWithHiveCatalogTestBase.scala
@@ -285,41 +285,45 @@ abstract class DDLWithHiveCatalogTestBase extends 
PaimonHiveTestBase {
   }
 
   test("Paimon DDL with hive catalog: set default database") {
-    var reusedSpark = spark
+    if (!gteqSpark4_0) {
+      // TODO: This is skipped in Spark 4.0, because it would fail in afterAll 
method, not because the default database is not supported.
 
-    Seq("paimon", sparkCatalogName, paimonHiveCatalogName).foreach {
-      catalogName =>
-        {
-          val dbName = s"${catalogName}_default_db"
-          val tblName = s"${dbName}_tbl"
-
-          reusedSpark.sql(s"use $catalogName")
-          reusedSpark.sql(s"create database $dbName")
-          reusedSpark.sql(s"use $dbName")
-          reusedSpark.sql(s"create table $tblName (id int, name string, dt 
string) using paimon")
-          reusedSpark.stop()
-
-          reusedSpark = SparkSession
-            .builder()
-            .master("local[2]")
-            .config(sparkConf)
-            .config("spark.sql.defaultCatalog", catalogName)
-            .config(s"spark.sql.catalog.$catalogName.defaultDatabase", dbName)
-            .getOrCreate()
-
-          if (catalogName.equals(sparkCatalogName) && !gteqSpark3_4) {
-            checkAnswer(reusedSpark.sql("show tables").select("tableName"), 
Nil)
+      var reusedSpark = spark
+
+      Seq("paimon", sparkCatalogName, paimonHiveCatalogName).foreach {
+        catalogName =>
+          {
+            val dbName = s"${catalogName}_default_db"
+            val tblName = s"${dbName}_tbl"
+
+            reusedSpark.sql(s"use $catalogName")
+            reusedSpark.sql(s"create database $dbName")
             reusedSpark.sql(s"use $dbName")
+            reusedSpark.sql(s"create table $tblName (id int, name string, dt 
string) using paimon")
+            reusedSpark.stop()
+
+            reusedSpark = SparkSession
+              .builder()
+              .master("local[2]")
+              .config(sparkConf)
+              .config("spark.sql.defaultCatalog", catalogName)
+              .config(s"spark.sql.catalog.$catalogName.defaultDatabase", 
dbName)
+              .getOrCreate()
+
+            if (catalogName.equals(sparkCatalogName) && !gteqSpark3_4) {
+              checkAnswer(reusedSpark.sql("show tables").select("tableName"), 
Nil)
+              reusedSpark.sql(s"use $dbName")
+            }
+            checkAnswer(reusedSpark.sql("show tables").select("tableName"), 
Row(tblName) :: Nil)
+
+            reusedSpark.sql(s"drop table $tblName")
           }
-          checkAnswer(reusedSpark.sql("show tables").select("tableName"), 
Row(tblName) :: Nil)
+      }
 
-          reusedSpark.sql(s"drop table $tblName")
-        }
+      // Since we created a new sparkContext, we need to stop it and reset the 
default sparkContext
+      reusedSpark.stop()
+      reset()
     }
-
-    // Since we created a new sparkContext, we need to stop it and reset the 
default sparkContext
-    reusedSpark.stop()
-    reset()
   }
 
   test("Paimon DDL with hive catalog: drop database cascade which contains 
paimon table") {
diff --git 
a/paimon-spark/paimon-spark-ut/src/test/scala/org/apache/paimon/spark/sql/PaimonOptimizationTestBase.scala
 
b/paimon-spark/paimon-spark-ut/src/test/scala/org/apache/paimon/spark/sql/PaimonOptimizationTestBase.scala
index 87f4c94486..e5f1d0e131 100644
--- 
a/paimon-spark/paimon-spark-ut/src/test/scala/org/apache/paimon/spark/sql/PaimonOptimizationTestBase.scala
+++ 
b/paimon-spark/paimon-spark-ut/src/test/scala/org/apache/paimon/spark/sql/PaimonOptimizationTestBase.scala
@@ -29,6 +29,7 @@ import 
org.apache.spark.sql.catalyst.plans.logical.{CTERelationDef, LogicalPlan,
 import org.apache.spark.sql.catalyst.rules.RuleExecutor
 import org.apache.spark.sql.functions._
 import org.apache.spark.sql.paimon.Utils
+import org.apache.spark.sql.paimon.shims.SparkShimLoader
 import org.junit.jupiter.api.Assertions
 
 import scala.collection.immutable
diff --git 
a/paimon-spark/paimon-spark-ut/src/test/scala/org/apache/paimon/spark/sql/SparkVersionSupport.scala
 
b/paimon-spark/paimon-spark-ut/src/test/scala/org/apache/paimon/spark/sql/SparkVersionSupport.scala
index 647b4cfdca..9dadb26e25 100644
--- 
a/paimon-spark/paimon-spark-ut/src/test/scala/org/apache/paimon/spark/sql/SparkVersionSupport.scala
+++ 
b/paimon-spark/paimon-spark-ut/src/test/scala/org/apache/paimon/spark/sql/SparkVersionSupport.scala
@@ -21,7 +21,8 @@ package org.apache.paimon.spark.sql
 import org.apache.spark.SPARK_VERSION
 
 trait SparkVersionSupport {
-  lazy val sparkVersion: String = SPARK_VERSION
+
+  val sparkVersion: String = SPARK_VERSION
 
   lazy val gteqSpark3_3: Boolean = sparkVersion >= "3.3"
 
@@ -31,3 +32,5 @@ trait SparkVersionSupport {
 
   lazy val gteqSpark4_0: Boolean = sparkVersion >= "4.0"
 }
+
+object SparkVersionSupport extends SparkVersionSupport {}
diff --git 
a/paimon-spark/paimon-spark-ut/src/test/scala/org/apache/spark/sql/paimon/Utils.scala
 
b/paimon-spark/paimon-spark-ut/src/test/scala/org/apache/spark/sql/paimon/Utils.scala
index 03f1c7706e..61a479b9f0 100644
--- 
a/paimon-spark/paimon-spark-ut/src/test/scala/org/apache/spark/sql/paimon/Utils.scala
+++ 
b/paimon-spark/paimon-spark-ut/src/test/scala/org/apache/spark/sql/paimon/Utils.scala
@@ -20,6 +20,7 @@ package org.apache.spark.sql.paimon
 
 import org.apache.spark.sql.{DataFrame, Dataset, SparkSession}
 import org.apache.spark.sql.catalyst.plans.logical.LogicalPlan
+import org.apache.spark.sql.paimon.shims.SparkShimLoader
 import org.apache.spark.util.{Utils => SparkUtils}
 
 import java.io.File
@@ -36,7 +37,7 @@ object Utils {
   }
 
   def createDataFrame(sparkSession: SparkSession, plan: LogicalPlan): 
DataFrame = {
-    Dataset.ofRows(sparkSession, plan)
+    SparkShimLoader.shim.classicApi.createDataset(sparkSession, plan)
   }
 
 }
diff --git 
a/paimon-spark/paimon-spark3-common/src/main/scala/org/apache/spark/sql/paimon/shims/Classic3Api.scala
 
b/paimon-spark/paimon-spark3-common/src/main/scala/org/apache/spark/sql/paimon/shims/Classic3Api.scala
new file mode 100644
index 0000000000..b0782c59d6
--- /dev/null
+++ 
b/paimon-spark/paimon-spark3-common/src/main/scala/org/apache/spark/sql/paimon/shims/Classic3Api.scala
@@ -0,0 +1,55 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.sql.paimon.shims
+
+import org.apache.spark.sql.{Column, DataFrame, Dataset, Row, SparkSession}
+import org.apache.spark.sql.catalyst.expressions.{Attribute, Expression}
+import org.apache.spark.sql.catalyst.plans.logical.{ColumnStat, LogicalPlan}
+import org.apache.spark.sql.execution.{QueryExecution, SparkPlan}
+import org.apache.spark.sql.execution.command.CommandUtils
+class Classic3Api extends ClassicApi {
+
+  override def column(expression: Expression): Column = new Column(expression)
+
+  override def expression(spark: SparkSession, column: Column): Expression = 
column.expr
+
+  override def createDataset(data: DataFrame): DataFrame = {
+    data.sqlContext
+      .internalCreateDataFrame(data.queryExecution.toRdd, data.schema)
+  }
+
+  override def createDataset(spark: SparkSession, logicalPlan: LogicalPlan): 
Dataset[Row] = {
+    Dataset.ofRows(spark, logicalPlan)
+  }
+
+  override def recacheByPlan(spark: SparkSession, plan: LogicalPlan): Unit = {
+    spark.sharedState.cacheManager.recacheByPlan(spark, plan)
+  }
+
+  override def prepareExecutedPlan(spark: SparkSession, logicalPlan: 
LogicalPlan): SparkPlan = {
+    QueryExecution.prepareExecutedPlan(spark, logicalPlan)
+  }
+
+  override def computeColumnStats(
+      spark: SparkSession,
+      relation: LogicalPlan,
+      columns: Seq[Attribute]): (Long, Map[Attribute, ColumnStat]) =
+    CommandUtils.computeColumnStats(spark, relation, columns)
+
+}
diff --git 
a/paimon-spark/paimon-spark3-common/src/main/scala/org/apache/spark/sql/paimon/shims/Spark3Shim.scala
 
b/paimon-spark/paimon-spark3-common/src/main/scala/org/apache/spark/sql/paimon/shims/Spark3Shim.scala
index b173a3ff5c..18ba22674d 100644
--- 
a/paimon-spark/paimon-spark3-common/src/main/scala/org/apache/spark/sql/paimon/shims/Spark3Shim.scala
+++ 
b/paimon-spark/paimon-spark3-common/src/main/scala/org/apache/spark/sql/paimon/shims/Spark3Shim.scala
@@ -27,18 +27,22 @@ import org.apache.paimon.types.{DataType, RowType}
 import org.apache.spark.sql.{Column, SparkSession}
 import org.apache.spark.sql.catalyst.InternalRow
 import org.apache.spark.sql.catalyst.expressions.{Attribute, Expression}
+import org.apache.spark.sql.catalyst.expressions.aggregate.AggregateExpression
 import org.apache.spark.sql.catalyst.parser.ParserInterface
 import org.apache.spark.sql.catalyst.plans.logical.{Aggregate, CTERelationRef, 
LogicalPlan}
 import org.apache.spark.sql.catalyst.rules.Rule
 import org.apache.spark.sql.catalyst.util.ArrayData
 import org.apache.spark.sql.connector.catalog.{Identifier, Table, TableCatalog}
 import org.apache.spark.sql.connector.expressions.Transform
+import org.apache.spark.sql.execution.command.CommandUtils
 import org.apache.spark.sql.types.StructType
 
 import java.util.{Map => JMap}
 
 class Spark3Shim extends SparkShim {
 
+  override def classicApi: ClassicApi = new Classic3Api
+
   override def createSparkParser(delegate: ParserInterface): ParserInterface = 
{
     new PaimonSpark3SqlExtensionsParser(delegate)
   }
@@ -55,12 +59,6 @@ class Spark3Shim extends SparkShim {
     new Spark3ArrayData(elementType)
   }
 
-  override def supportsHashAggregate(
-      aggregateBufferAttributes: Seq[Attribute],
-      groupingExpression: Seq[Expression]): Boolean = {
-    Aggregate.supportsHashAggregate(aggregateBufferAttributes)
-  }
-
   override def createTable(
       tableCatalog: TableCatalog,
       ident: Identifier,
@@ -77,9 +75,15 @@ class Spark3Shim extends SparkShim {
       isStreaming: Boolean): CTERelationRef =
     MinorVersionShim.createCTERelationRef(cteId, resolved, output, isStreaming)
 
-  override def column(expr: Expression): Column = new Column(expr)
+  override def supportsHashAggregate(
+      aggregateBufferAttributes: Seq[Attribute],
+      groupingExpression: Seq[Expression]): Boolean =
+    Aggregate.supportsHashAggregate(aggregateBufferAttributes)
 
-  override def convertToExpression(spark: SparkSession, column: Column): 
Expression = column.expr
+  override def supportsObjectHashAggregate(
+      aggregateExpressions: Seq[AggregateExpression],
+      groupByExpressions: Seq[Expression]): Boolean =
+    Aggregate.supportsObjectHashAggregate(aggregateExpressions)
 
   override def toPaimonVariant(o: Object): Variant = throw new 
UnsupportedOperationException()
 
@@ -94,4 +98,5 @@ class Spark3Shim extends SparkShim {
 
   override def toPaimonVariant(array: ArrayData, pos: Int): Variant =
     throw new UnsupportedOperationException()
+
 }
diff --git a/paimon-spark/paimon-spark4-common/pom.xml 
b/paimon-spark/paimon-spark4-common/pom.xml
index e839cd45df..e8b7356eca 100644
--- a/paimon-spark/paimon-spark4-common/pom.xml
+++ b/paimon-spark/paimon-spark4-common/pom.xml
@@ -49,6 +49,10 @@ under the License.
             <artifactId>spark-sql-api_2.13</artifactId>
             <version>${spark.version}</version>
             <exclusions>
+                <exclusion>
+                    <groupId>org.apache.spark</groupId>
+                    
<artifactId>spark-connect-shims_${scala.binary.version}</artifactId>
+                </exclusion>
                 <exclusion>
                     <groupId>log4j</groupId>
                     <artifactId>log4j</artifactId>
diff --git 
a/paimon-spark/paimon-spark4-common/src/main/scala/org/apache/paimon/spark/catalyst/parser/extensions/PaimonSpark4SqlExtensionsParser.scala
 
b/paimon-spark/paimon-spark4-common/src/main/scala/org/apache/paimon/spark/catalyst/parser/extensions/PaimonSpark4SqlExtensionsParser.scala
index ef1f5763d2..9bd395f333 100644
--- 
a/paimon-spark/paimon-spark4-common/src/main/scala/org/apache/paimon/spark/catalyst/parser/extensions/PaimonSpark4SqlExtensionsParser.scala
+++ 
b/paimon-spark/paimon-spark4-common/src/main/scala/org/apache/paimon/spark/catalyst/parser/extensions/PaimonSpark4SqlExtensionsParser.scala
@@ -18,11 +18,12 @@
 
 package org.apache.paimon.spark.catalyst.parser.extensions
 
-import org.apache.spark.sql.catalyst.parser.{CompoundBody, ParserInterface}
+import org.apache.spark.sql.catalyst.parser.ParserInterface
 import 
org.apache.spark.sql.catalyst.parser.extensions.AbstractPaimonSparkSqlExtensionsParser
+import org.apache.spark.sql.types.StructType
 
 class PaimonSpark4SqlExtensionsParser(override val delegate: ParserInterface)
   extends AbstractPaimonSparkSqlExtensionsParser(delegate) {
 
-  def parseScript(sqlScriptText: String): CompoundBody = 
delegate.parseScript(sqlScriptText)
+  override def parseRoutineParam(sqlText: String): StructType = 
delegate.parseRoutineParam(sqlText)
 }
diff --git 
a/paimon-spark/paimon-spark4-common/src/main/scala/org/apache/spark/sql/paimon/shims/Classic4Api.scala
 
b/paimon-spark/paimon-spark4-common/src/main/scala/org/apache/spark/sql/paimon/shims/Classic4Api.scala
new file mode 100644
index 0000000000..8dff78f0bc
--- /dev/null
+++ 
b/paimon-spark/paimon-spark4-common/src/main/scala/org/apache/spark/sql/paimon/shims/Classic4Api.scala
@@ -0,0 +1,63 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.sql.paimon.shims
+
+import org.apache.spark.sql.{Column, DataFrame, Dataset, Row, SparkSession}
+import org.apache.spark.sql.catalyst.expressions.{Attribute, Expression}
+import org.apache.spark.sql.catalyst.plans.logical.{ColumnStat, LogicalPlan}
+import org.apache.spark.sql.classic.{ClassicConversions, Dataset => 
ClassicDataset, ExpressionUtils}
+import org.apache.spark.sql.execution.{QueryExecution, SparkPlan}
+import org.apache.spark.sql.execution.command.CommandUtils
+
+/**
+ * This class is used to implement the conversion from sql-api to classic one. 
Make sure this is the
+ * only class that implements 
[[org.apache.spark.sql.classic.ClassicConversions]] in Paimon-Spark.
+ */
+class Classic4Api extends ClassicApi with ClassicConversions {
+
+  override def column(expression: Expression): Column = 
ExpressionUtils.column(expression)
+
+  override def expression(spark: SparkSession, column: Column): Expression = {
+    spark.expression(column)
+  }
+
+  override def createDataset(data: DataFrame): DataFrame = {
+    data.sqlContext
+      .internalCreateDataFrame(data.queryExecution.toRdd, data.schema)
+  }
+
+  override def createDataset(spark: SparkSession, logicalPlan: LogicalPlan): 
Dataset[Row] = {
+    ClassicDataset.ofRows(spark, logicalPlan)
+  }
+
+  override def recacheByPlan(spark: SparkSession, plan: LogicalPlan): Unit = {
+    spark.sharedState.cacheManager.recacheByPlan(spark, plan)
+  }
+
+  override def prepareExecutedPlan(spark: SparkSession, logicalPlan: 
LogicalPlan): SparkPlan = {
+    QueryExecution.prepareExecutedPlan(spark, logicalPlan)
+  }
+
+  override def computeColumnStats(
+      spark: SparkSession,
+      relation: LogicalPlan,
+      columns: Seq[Attribute]): (Long, Map[Attribute, ColumnStat]) = {
+    CommandUtils.computeColumnStats(spark, relation, columns.toSeq)
+  }
+}
diff --git 
a/paimon-spark/paimon-spark4-common/src/main/scala/org/apache/spark/sql/paimon/shims/Spark4Shim.scala
 
b/paimon-spark/paimon-spark4-common/src/main/scala/org/apache/spark/sql/paimon/shims/Spark4Shim.scala
index e8e4783138..cb535ef86c 100644
--- 
a/paimon-spark/paimon-spark4-common/src/main/scala/org/apache/spark/sql/paimon/shims/Spark4Shim.scala
+++ 
b/paimon-spark/paimon-spark4-common/src/main/scala/org/apache/spark/sql/paimon/shims/Spark4Shim.scala
@@ -24,16 +24,16 @@ import 
org.apache.paimon.spark.catalyst.parser.extensions.PaimonSpark4SqlExtensi
 import org.apache.paimon.spark.data.{Spark4ArrayData, Spark4InternalRow, 
SparkArrayData, SparkInternalRow}
 import org.apache.paimon.types.{DataType, RowType}
 
-import org.apache.spark.sql.{Column, SparkSession}
+import org.apache.spark.sql.SparkSession
 import org.apache.spark.sql.catalyst.InternalRow
 import org.apache.spark.sql.catalyst.expressions.{Attribute, Expression}
+import org.apache.spark.sql.catalyst.expressions.aggregate.AggregateExpression
 import org.apache.spark.sql.catalyst.parser.ParserInterface
 import org.apache.spark.sql.catalyst.plans.logical.{Aggregate, CTERelationRef, 
LogicalPlan}
 import org.apache.spark.sql.catalyst.rules.Rule
 import org.apache.spark.sql.catalyst.util.ArrayData
 import org.apache.spark.sql.connector.catalog.{CatalogV2Util, Identifier, 
Table, TableCatalog}
 import org.apache.spark.sql.connector.expressions.Transform
-import org.apache.spark.sql.internal.ExpressionUtils
 import org.apache.spark.sql.types.{DataTypes, StructType, VariantType}
 import org.apache.spark.unsafe.types.VariantVal
 
@@ -41,6 +41,8 @@ import java.util.{Map => JMap}
 
 class Spark4Shim extends SparkShim {
 
+  override def classicApi: ClassicApi = new Classic4Api
+
   override def createSparkParser(delegate: ParserInterface): ParserInterface = 
{
     new PaimonSpark4SqlExtensionsParser(delegate)
   }
@@ -57,13 +59,7 @@ class Spark4Shim extends SparkShim {
     new Spark4ArrayData(elementType)
   }
 
-  def supportsHashAggregate(
-      aggregateBufferAttributes: Seq[Attribute],
-      groupingExpression: Seq[Expression]): Boolean = {
-    Aggregate.supportsHashAggregate(aggregateBufferAttributes, 
groupingExpression)
-  }
-
-  def createTable(
+  override def createTable(
       tableCatalog: TableCatalog,
       ident: Identifier,
       schema: StructType,
@@ -81,10 +77,16 @@ class Spark4Shim extends SparkShim {
     CTERelationRef(cteId, resolved, output.toSeq, isStreaming)
   }
 
-  def column(expr: Expression): Column = ExpressionUtils.column(expr)
+  override def supportsHashAggregate(
+      aggregateBufferAttributes: Seq[Attribute],
+      groupingExpression: Seq[Expression]): Boolean = {
+    Aggregate.supportsHashAggregate(aggregateBufferAttributes.toSeq, 
groupingExpression.toSeq)
+  }
 
-  def convertToExpression(spark: SparkSession, column: Column): Expression =
-    spark.expression(column)
+  override def supportsObjectHashAggregate(
+      aggregateExpressions: Seq[AggregateExpression],
+      groupByExpressions: Seq[Expression]): Boolean =
+    Aggregate.supportsObjectHashAggregate(aggregateExpressions.toSeq, 
groupByExpressions.toSeq)
 
   override def toPaimonVariant(o: Object): Variant = {
     val v = o.asInstanceOf[VariantVal]
diff --git a/pom.xml b/pom.xml
index bfdd189eff..ef045dbcb9 100644
--- a/pom.xml
+++ b/pom.xml
@@ -400,10 +400,10 @@ under the License.
                 <antlr4.version>4.13.1</antlr4.version>
                 <scala.binary.version>2.13</scala.binary.version>
                 <scala.version>${scala213.version}</scala.version>
-                
<paimon-spark-common.spark.version>4.0.0-preview2</paimon-spark-common.spark.version>
+                
<paimon-spark-common.spark.version>4.0.0</paimon-spark-common.spark.version>
                 
<paimon-sparkx-common>paimon-spark4-common</paimon-sparkx-common>
                 <test.spark.main.version>4.0</test.spark.main.version>
-                <test.spark.version>4.0.0-preview2</test.spark.version>
+                <test.spark.version>4.0.0</test.spark.version>
             </properties>
             <activation>
                 <property>

Reply via email to