This is an automated email from the ASF dual-hosted git repository.

lzljs3620320 pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/incubator-paimon.git


The following commit(s) were added to refs/heads/master by this push:
     new 9c594cf71 [spark] Support Spark 3.5 (#2019)
9c594cf71 is described below

commit 9c594cf713ad06df5d33c012908a946e4aa6d482
Author: Nicholas Jiang <[email protected]>
AuthorDate: Wed Oct 25 13:38:42 2023 +0800

    [spark] Support Spark 3.5 (#2019)
---
 docs/content/engines/overview.md                   |   2 +-
 docs/content/engines/spark3.md                     |   4 +-
 docs/content/project/download.md                   |   2 +
 paimon-spark/paimon-spark-3.2/pom.xml              |   2 +-
 paimon-spark/paimon-spark-3.3/pom.xml              |   2 +-
 paimon-spark/paimon-spark-3.4/pom.xml              | 188 +++++++++++++-
 .../src/test/resources/log4j2-test.properties      |  38 +++
 .../org/apache/paimon/spark/PaimonSinkTest.scala   | 281 +++++++++++++++++++++
 .../apache/paimon/spark/PaimonSparkTestBase.scala  |  92 +++++++
 .../sql/CreateAndDeleteTagProcedureTest.scala      |  86 +++++++
 .../paimon/spark/sql/RollbackProcedureTest.scala   |  94 +++++++
 .../scala/org/apache/spark/paimon/Utils.scala}     |  18 +-
 .../{paimon-spark-3.3 => paimon-spark-3.5}/pom.xml |   6 +-
 paimon-spark/paimon-spark-common/pom.xml           |  26 +-
 .../spark/commands/WriteIntoPaimonTable.scala      |   8 +-
 .../apache/paimon/spark/util/EncoderUtils.scala    |  66 +++++
 .../sql/catalyst/plans/logical/CallCommand.scala   |   6 +-
 .../org/apache/paimon/spark/SparkReadTestBase.java |   2 +-
 paimon-spark/pom.xml                               |   1 +
 pom.xml                                            |  20 ++
 20 files changed, 919 insertions(+), 25 deletions(-)

diff --git a/docs/content/engines/overview.md b/docs/content/engines/overview.md
index 3d2cd9e77..30d48c3fc 100644
--- a/docs/content/engines/overview.md
+++ b/docs/content/engines/overview.md
@@ -35,7 +35,7 @@ Apache Spark and Apache Hive.
 | Engine |    Version    | Batch Read | Batch Write | Create Table | Alter 
Table | Streaming Write | Streaming Read | Batch Overwrite |
 
|:------:|:-------------:|:----------:|:-----------:|:------------:|:-----------:|:---------------:|:--------------:|:---------------:|
 | Flink  |  1.14 - 1.17  |     ✅      |      ✅      |      ✅       |  ✅(1.17+) 
  |        ✅        |       ✅        |        ✅        |
-| Spark  |   3.1 - 3.4   |     ✅      |      ✅      |      ✅       |      ✅    
  |        ✅         |    ✅(3.3+)     |        ❌        |
+| Spark  |   3.1 - 3.5   |     ✅      |      ✅      |      ✅       |      ✅    
  |        ✅         |    ✅(3.3+)     |        ❌        |
 |  Hive  |   2.1 - 3.1   |     ✅      |      ✅      |      ✅       |      ❌    
  |        ❌        |       ❌        |        ❌        |
 | Spark  |      2.4      |     ✅      |      ❌      |      ❌       |      ❌    
  |        ❌        |       ❌        |        ❌        |
 | Trino  |   358 - 422   |     ✅      |      ❌      |      ✅       |      ✅    
  |        ❌        |       ❌        |        ❌        |
diff --git a/docs/content/engines/spark3.md b/docs/content/engines/spark3.md
index 38320e36c..414679bb7 100644
--- a/docs/content/engines/spark3.md
+++ b/docs/content/engines/spark3.md
@@ -30,7 +30,7 @@ This documentation is a guide for using Paimon in Spark3.
 
 ## Preparation
 
-Paimon currently supports Spark 3.4, 3.3, 3.2 and 3.1. We recommend the latest 
Spark version for a better experience.
+Paimon currently supports Spark 3.5, 3.4, 3.3, 3.2 and 3.1. We recommend the 
latest Spark version for a better experience.
 
 Download the jar file with corresponding version.
 
@@ -38,6 +38,7 @@ Download the jar file with corresponding version.
 
 | Version   | Jar                                                              
                                                                                
                    |
 
|-----------|----------------------------------------------------------------------------------------------------------------------------------------------------------------------|
+| Spark 3.5 | [paimon-spark-3.5-{{< version 
>}}.jar](https://repo.maven.apache.org/maven2/org/apache/paimon/paimon-spark-3.5/{{<
 version >}}/paimon-spark-3.5-{{< version >}}.jar) |
 | Spark 3.4 | [paimon-spark-3.4-{{< version 
>}}.jar](https://repo.maven.apache.org/maven2/org/apache/paimon/paimon-spark-3.4/{{<
 version >}}/paimon-spark-3.4-{{< version >}}.jar) |
 | Spark 3.3 | [paimon-spark-3.3-{{< version 
>}}.jar](https://repo.maven.apache.org/maven2/org/apache/paimon/paimon-spark-3.3/{{<
 version >}}/paimon-spark-3.3-{{< version >}}.jar) |
 | Spark 3.2 | [paimon-spark-3.2-{{< version 
>}}.jar](https://repo.maven.apache.org/maven2/org/apache/paimon/paimon-spark-3.2/{{<
 version >}}/paimon-spark-3.2-{{< version >}}.jar) |
@@ -49,6 +50,7 @@ Download the jar file with corresponding version.
 
 | Version   | Jar                                                              
                                                                   |
 
|-----------|-------------------------------------------------------------------------------------------------------------------------------------|
+| Spark 3.5 | [paimon-spark-3.5-{{< version 
>}}.jar](https://repository.apache.org/snapshots/org/apache/paimon/paimon-spark-3.5/{{<
 version >}}/) |
 | Spark 3.4 | [paimon-spark-3.4-{{< version 
>}}.jar](https://repository.apache.org/snapshots/org/apache/paimon/paimon-spark-3.4/{{<
 version >}}/) |
 | Spark 3.3 | [paimon-spark-3.3-{{< version 
>}}.jar](https://repository.apache.org/snapshots/org/apache/paimon/paimon-spark-3.3/{{<
 version >}}/) |
 | Spark 3.2 | [paimon-spark-3.2-{{< version 
>}}.jar](https://repository.apache.org/snapshots/org/apache/paimon/paimon-spark-3.2/{{<
 version >}}/) |
diff --git a/docs/content/project/download.md b/docs/content/project/download.md
index 5fc9145eb..0ca56cc91 100644
--- a/docs/content/project/download.md
+++ b/docs/content/project/download.md
@@ -39,6 +39,7 @@ This documentation is a guide for downloading Paimon Jars.
 | Flink 1.15       | [paimon-flink-1.15-{{< version 
>}}.jar](https://repository.apache.org/snapshots/org/apache/paimon/paimon-flink-1.15/{{<
 version >}}/)                                 |
 | Flink 1.14       | [paimon-flink-1.14-{{< version 
>}}.jar](https://repository.apache.org/snapshots/org/apache/paimon/paimon-flink-1.14/{{<
 version >}}/)                                 |
 | Flink Action     | [paimon-flink-action-{{< version 
>}}.jar](https://repository.apache.org/snapshots/org/apache/paimon/paimon-flink-action/{{<
 version >}}/)                             |
+| Spark 3.5        | [paimon-spark-3.5-{{< version 
>}}.jar](https://repository.apache.org/snapshots/org/apache/paimon/paimon-spark-3.5/{{<
 version >}}/)                                   |
 | Spark 3.4        | [paimon-spark-3.4-{{< version 
>}}.jar](https://repository.apache.org/snapshots/org/apache/paimon/paimon-spark-3.4/{{<
 version >}}/)                                   |
 | Spark 3.3        | [paimon-spark-3.3-{{< version 
>}}.jar](https://repository.apache.org/snapshots/org/apache/paimon/paimon-spark-3.3/{{<
 version >}}/)                                   |
 | Spark 3.2        | [paimon-spark-3.2-{{< version 
>}}.jar](https://repository.apache.org/snapshots/org/apache/paimon/paimon-spark-3.2/{{<
 version >}}/)                                   |
@@ -72,6 +73,7 @@ This documentation is a guide for downloading Paimon Jars.
 | Flink 1.15       | [paimon-flink-1.15-{{< version 
>}}.jar](https://repo.maven.apache.org/maven2/org/apache/paimon/paimon-flink-1.15/{{<
 version >}}/paimon-flink-1.15-{{< version >}}.jar)                             
                    |
 | Flink 1.14       | [paimon-flink-1.14-{{< version 
>}}.jar](https://repo.maven.apache.org/maven2/org/apache/paimon/paimon-flink-1.14/{{<
 version >}}/paimon-flink-1.14-{{< version >}}.jar)                             
                    |
 | Flink Action     | [paimon-flink-action-{{< version 
>}}.jar](https://repo.maven.apache.org/maven2/org/apache/paimon/paimon-flink-action/{{<
 version >}}/paimon-flink-action-{{< version >}}.jar)                           
                |
+| Spark 3.5        | [paimon-spark-3.5-{{< version 
>}}.jar](https://repo.maven.apache.org/maven2/org/apache/paimon/paimon-spark-3.5/{{<
 version >}}/paimon-spark-3.5-{{< version >}}.jar)                              
                      |
 | Spark 3.4        | [paimon-spark-3.4-{{< version 
>}}.jar](https://repo.maven.apache.org/maven2/org/apache/paimon/paimon-spark-3.4/{{<
 version >}}/paimon-spark-3.4-{{< version >}}.jar)                              
                      |
 | Spark 3.3        | [paimon-spark-3.3-{{< version 
>}}.jar](https://repo.maven.apache.org/maven2/org/apache/paimon/paimon-spark-3.3/{{<
 version >}}/paimon-spark-3.3-{{< version >}}.jar)                              
                      |
 | Spark 3.2        | [paimon-spark-3.2-{{< version 
>}}.jar](https://repo.maven.apache.org/maven2/org/apache/paimon/paimon-spark-3.2/{{<
 version >}}/paimon-spark-3.2-{{< version >}}.jar)                              
                      |
diff --git a/paimon-spark/paimon-spark-3.2/pom.xml 
b/paimon-spark/paimon-spark-3.2/pom.xml
index 981cdf17d..df6281927 100644
--- a/paimon-spark/paimon-spark-3.2/pom.xml
+++ b/paimon-spark/paimon-spark-3.2/pom.xml
@@ -32,7 +32,7 @@ under the License.
     <name>Paimon : Spark : 3.2</name>
 
     <properties>
-        <spark.version>3.2.2</spark.version>
+        <spark.version>3.2.4</spark.version>
     </properties>
 
     <dependencies>
diff --git a/paimon-spark/paimon-spark-3.3/pom.xml 
b/paimon-spark/paimon-spark-3.3/pom.xml
index 04766914f..9cada28fe 100644
--- a/paimon-spark/paimon-spark-3.3/pom.xml
+++ b/paimon-spark/paimon-spark-3.3/pom.xml
@@ -32,7 +32,7 @@ under the License.
     <name>Paimon : Spark : 3.3</name>
 
     <properties>
-        <spark.version>3.3.2</spark.version>
+        <spark.version>3.3.3</spark.version>
     </properties>
 
     <dependencies>
diff --git a/paimon-spark/paimon-spark-3.4/pom.xml 
b/paimon-spark/paimon-spark-3.4/pom.xml
index b25a56273..c2932c418 100644
--- a/paimon-spark/paimon-spark-3.4/pom.xml
+++ b/paimon-spark/paimon-spark-3.4/pom.xml
@@ -32,7 +32,7 @@ under the License.
     <name>Paimon : Spark : 3.4</name>
 
     <properties>
-        <spark.version>3.4.0</spark.version>
+        <spark.version>3.4.1</spark.version>
     </properties>
 
     <dependencies>
@@ -55,11 +55,128 @@ under the License.
                     <groupId>org.slf4j</groupId>
                     <artifactId>slf4j-log4j12</artifactId>
                 </exclusion>
+                <!-- SPARK-40511 upgrades Spark to SLF4J 2.x, which is not compatible with SLF4J 1.x -->
+                <exclusion>
+                    <groupId>org.apache.logging.log4j</groupId>
+                    <artifactId>log4j-slf4j2-impl</artifactId>
+                </exclusion>
+            </exclusions>
+        </dependency>
+
+        <dependency>
+            <groupId>org.apache.spark</groupId>
+            <artifactId>spark-sql_2.12</artifactId>
+            <version>${spark.version}</version>
+            <classifier>tests</classifier>
+            <scope>test</scope>
+            <exclusions>
+                <exclusion>
+                    <groupId>log4j</groupId>
+                    <artifactId>log4j</artifactId>
+                </exclusion>
+                <exclusion>
+                    <groupId>org.slf4j</groupId>
+                    <artifactId>slf4j-log4j12</artifactId>
+                </exclusion>
+                <exclusion>
+                    <groupId>org.apache.orc</groupId>
+                    <artifactId>orc-core</artifactId>
+                </exclusion>
             </exclusions>
         </dependency>
+        <dependency>
+            <groupId>org.apache.spark</groupId>
+            <artifactId>spark-core_2.12</artifactId>
+            <version>${spark.version}</version>
+            <classifier>tests</classifier>
+            <scope>test</scope>
+            <exclusions>
+                <exclusion>
+                    <groupId>log4j</groupId>
+                    <artifactId>log4j</artifactId>
+                </exclusion>
+                <exclusion>
+                    <groupId>org.slf4j</groupId>
+                    <artifactId>slf4j-log4j12</artifactId>
+                </exclusion>
+                <exclusion>
+                    <groupId>org.apache.logging.log4j</groupId>
+                    <artifactId>log4j-slf4j2-impl</artifactId>
+                </exclusion>
+                <exclusion>
+                    <groupId>org.apache.orc</groupId>
+                    <artifactId>orc-core</artifactId>
+                </exclusion>
+            </exclusions>
+        </dependency>
+        <dependency>
+            <groupId>org.apache.spark</groupId>
+            <artifactId>spark-catalyst_2.12</artifactId>
+            <version>${spark.version}</version>
+            <classifier>tests</classifier>
+            <scope>test</scope>
+            <exclusions>
+                <exclusion>
+                    <groupId>log4j</groupId>
+                    <artifactId>log4j</artifactId>
+                </exclusion>
+                <exclusion>
+                    <groupId>org.slf4j</groupId>
+                    <artifactId>slf4j-log4j12</artifactId>
+                </exclusion>
+                <exclusion>
+                    <groupId>org.apache.orc</groupId>
+                    <artifactId>orc-core</artifactId>
+                </exclusion>
+            </exclusions>
+        </dependency>
+        <dependency>
+            <groupId>org.scala-lang</groupId>
+            <artifactId>scala-library</artifactId>
+            <version>${scala.version}</version>
+            <scope>test</scope>
+        </dependency>
+        <dependency>
+            <groupId>org.scala-lang</groupId>
+            <artifactId>scala-reflect</artifactId>
+            <version>${scala.version}</version>
+            <scope>test</scope>
+        </dependency>
+        <dependency>
+            <groupId>org.scala-lang</groupId>
+            <artifactId>scala-compiler</artifactId>
+            <version>${scala.version}</version>
+            <scope>test</scope>
+        </dependency>
+        <dependency>
+            <groupId>org.scalatest</groupId>
+            <artifactId>scalatest_${scala.binary.version}</artifactId>
+            <version>3.1.0</version>
+            <scope>test</scope>
+        </dependency>
     </dependencies>
 
     <build>
+        <pluginManagement>
+            <plugins>
+                <plugin>
+                    <groupId>net.alchim31.maven</groupId>
+                    <artifactId>scala-maven-plugin</artifactId>
+                    <version>${scala-maven-plugin.version}</version>
+                    <configuration>
+                        <args>
+                            <arg>-nobootcp</arg>
+                            <arg>-target:jvm-${target.java.version}</arg>
+                        </args>
+                        
<checkMultipleScalaVersions>false</checkMultipleScalaVersions>
+                    </configuration>
+                </plugin>
+                <plugin>
+                    <groupId>org.apache.maven.plugins</groupId>
+                    <artifactId>maven-compiler-plugin</artifactId>
+                </plugin>
+            </plugins>
+        </pluginManagement>
         <plugins>
             <plugin>
                 <groupId>org.apache.maven.plugins</groupId>
@@ -81,6 +198,75 @@ under the License.
                     </execution>
                 </executions>
             </plugin>
+            <plugin>
+                <groupId>net.alchim31.maven</groupId>
+                <artifactId>scala-maven-plugin</artifactId>
+                <version>${scala-maven-plugin.version}</version>
+                <executions>
+                    <!-- Run scala compiler in the process-resources phase, so 
that dependencies on
+                        scala classes can be resolved later in the (Java) 
compile phase -->
+                    <execution>
+                        <id>scala-compile-first</id>
+                        <phase>process-resources</phase>
+                        <goals>
+                            <goal>add-source</goal>
+                            <goal>compile</goal>
+                        </goals>
+                    </execution>
+
+                    <!-- Run scala compiler in the process-test-resources 
phase, so that dependencies on
+                         scala classes can be resolved later in the (Java) 
test-compile phase -->
+                    <execution>
+                        <id>scala-test-compile</id>
+                        <phase>process-test-resources</phase>
+                        <goals>
+                            <goal>testCompile</goal>
+                        </goals>
+                    </execution>
+                </executions>
+            </plugin>
+            <plugin>
+                <groupId>com.diffplug.spotless</groupId>
+                <artifactId>spotless-maven-plugin</artifactId>
+                <version>${spotless.version}</version>
+                <configuration>
+                    <scala>
+                        <scalafmt>
+                            <version>3.4.3</version>
+                            <!-- This file is in the root of the project to 
make sure IntelliJ picks it up automatically -->
+                            
<file>${project.basedir}/../../.scalafmt.conf</file>
+                        </scalafmt>
+                        <licenseHeader>
+                            <content>${spotless.license.header}</content>
+                            <delimiter>${spotless.delimiter}</delimiter>
+                        </licenseHeader>
+                    </scala>
+                </configuration>
+            </plugin>
+            <plugin>
+                <groupId>org.scalatest</groupId>
+                <artifactId>scalatest-maven-plugin</artifactId>
+                <version>${scalatest-maven-plugin.version}</version>
+                <configuration>
+                    
<reportsDirectory>${project.build.directory}/surefire-reports</reportsDirectory>
+                    <junitxml>.</junitxml>
+                    <argLine>-ea -Xmx4g -Xss4m -XX:MaxMetaspaceSize=2g 
-XX:ReservedCodeCacheSize=${CodeCacheSize} ${extraJavaTestArgs} 
-Dio.netty.tryReflectionSetAccessible=true</argLine>
+                    <filereports>PaimonTestSuite.txt</filereports>
+                    <!--
+                      The plugin does not support forkMode "always" yet, so suites share the JVM that is forked once; be careful with singletons.
+                      Tracked at: https://github.com/scalatest/scalatest-maven-plugin/issues/99
+                      -->
+                    <forkMode>once</forkMode>
+                </configuration>
+                <executions>
+                    <execution>
+                        <id>test</id>
+                        <goals>
+                            <goal>test</goal>
+                        </goals>
+                    </execution>
+                </executions>
+            </plugin>
         </plugins>
     </build>
 </project>
diff --git 
a/paimon-spark/paimon-spark-3.4/src/test/resources/log4j2-test.properties 
b/paimon-spark/paimon-spark-3.4/src/test/resources/log4j2-test.properties
new file mode 100644
index 000000000..6f324f586
--- /dev/null
+++ b/paimon-spark/paimon-spark-3.4/src/test/resources/log4j2-test.properties
@@ -0,0 +1,38 @@
+################################################################################
+#  Licensed to the Apache Software Foundation (ASF) under one
+#  or more contributor license agreements.  See the NOTICE file
+#  distributed with this work for additional information
+#  regarding copyright ownership.  The ASF licenses this file
+#  to you under the Apache License, Version 2.0 (the
+#  "License"); you may not use this file except in compliance
+#  with the License.  You may obtain a copy of the License at
+#
+#      http://www.apache.org/licenses/LICENSE-2.0
+#
+#  Unless required by applicable law or agreed to in writing, software
+#  distributed under the License is distributed on an "AS IS" BASIS,
+#  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+#  See the License for the specific language governing permissions and
+# limitations under the License.
+################################################################################
+
+# Set the root logger level to OFF to avoid flooding the build logs;
+# set it manually to INFO for debugging purposes.
+rootLogger.level = OFF
+rootLogger.appenderRef.test.ref = TestLogger
+
+appender.testlogger.name = TestLogger
+appender.testlogger.type = CONSOLE
+appender.testlogger.target = SYSTEM_ERR
+appender.testlogger.layout.type = PatternLayout
+appender.testlogger.layout.pattern = %-4r [%tid %t] %-5p %c %x - %m%n
+
+logger.kafka.name = kafka
+logger.kafka.level = OFF
+logger.kafka2.name = state.change
+logger.kafka2.level = OFF
+
+logger.zookeeper.name = org.apache.zookeeper
+logger.zookeeper.level = OFF
+logger.I0Itec.name = org.I0Itec
+logger.I0Itec.level = OFF
diff --git 
a/paimon-spark/paimon-spark-3.4/src/test/scala/org/apache/paimon/spark/PaimonSinkTest.scala
 
b/paimon-spark/paimon-spark-3.4/src/test/scala/org/apache/paimon/spark/PaimonSinkTest.scala
new file mode 100644
index 000000000..36df428a5
--- /dev/null
+++ 
b/paimon-spark/paimon-spark-3.4/src/test/scala/org/apache/paimon/spark/PaimonSinkTest.scala
@@ -0,0 +1,281 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.paimon.spark
+
+import org.apache.spark.sql.{Dataset, Row}
+import org.apache.spark.sql.execution.streaming.MemoryStream
+import org.apache.spark.sql.functions.{col, mean, window}
+import org.apache.spark.sql.streaming.StreamTest
+
+import java.sql.Date
+
+class PaimonSinkTest extends PaimonSparkTestBase with StreamTest {
+
+  import testImplicits._
+
+  test("Paimon Sink: forEachBatch") {
+    failAfter(streamingTimeout) {
+      withTempDir {
+        checkpointDir =>
+          // define a change-log table and test the `foreachBatch` API
+          spark.sql(s"""
+                       |CREATE TABLE T (a INT, b STRING)
+                       |TBLPROPERTIES ('primary-key'='a', 
'write-mode'='change-log', 'bucket'='3')
+                       |""".stripMargin)
+          val location = loadTable("T").location().getPath
+
+          val inputData = MemoryStream[(Int, String)]
+          val stream = inputData
+            .toDS()
+            .toDF("a", "b")
+            .writeStream
+            .option("checkpointLocation", checkpointDir.getCanonicalPath)
+            .foreachBatch {
+              (batch: Dataset[Row], id: Long) =>
+                batch.write.format("paimon").mode("append").save(location)
+            }
+            .start()
+
+          val query = () => spark.sql("SELECT * FROM T ORDER BY a")
+
+          try {
+            inputData.addData((1, "a"))
+            stream.processAllAvailable()
+            checkAnswer(query(), Row(1, "a") :: Nil)
+
+            inputData.addData((2, "b"))
+            stream.processAllAvailable()
+            checkAnswer(query(), Row(1, "a") :: Row(2, "b") :: Nil)
+
+            inputData.addData((2, "b2"))
+            stream.processAllAvailable()
+            checkAnswer(query(), Row(1, "a") :: Row(2, "b2") :: Nil)
+          } finally {
+            stream.stop()
+          }
+      }
+    }
+  }
+
+  test("Paimon Sink: append mode") {
+    failAfter(streamingTimeout) {
+      withTempDir {
+        checkpointDir =>
+          // define a change-log table and sink into it in append mode
+          spark.sql(s"""
+                       |CREATE TABLE T (a INT, b STRING)
+                       |TBLPROPERTIES ('primary-key'='a', 
'write-mode'='change-log', 'bucket'='3')
+                       |""".stripMargin)
+          val location = loadTable("T").location().getPath
+
+          val inputData = MemoryStream[(Int, String)]
+          val stream = inputData
+            .toDS()
+            .toDF("a", "b")
+            .writeStream
+            .option("checkpointLocation", checkpointDir.getCanonicalPath)
+            .format("paimon")
+            .start(location)
+
+          val query = () => spark.sql("SELECT * FROM T ORDER BY a")
+
+          try {
+            inputData.addData((1, "a"))
+            stream.processAllAvailable()
+            checkAnswer(query(), Row(1, "a") :: Nil)
+
+            inputData.addData((2, "b"))
+            stream.processAllAvailable()
+            checkAnswer(query(), Row(1, "a") :: Row(2, "b") :: Nil)
+
+            inputData.addData((2, "b2"))
+            stream.processAllAvailable()
+            checkAnswer(query(), Row(1, "a") :: Row(2, "b2") :: Nil)
+          } finally {
+            stream.stop()
+          }
+      }
+    }
+  }
+
+  test("Paimon Sink: complete mode") {
+    failAfter(streamingTimeout) {
+      withTempDir {
+        checkpointDir =>
+          // define an append-only table and sink into it in complete mode
+          spark.sql(s"""
+                       |CREATE TABLE T (city String, population Long)
+                       |TBLPROPERTIES ('write-mode'='append-only', 
'bucket'='3')
+                       |""".stripMargin)
+          val location = loadTable("T").location().getPath
+
+          val inputData = MemoryStream[(Int, String)]
+          val stream = inputData.toDS
+            .toDF("uid", "city")
+            .groupBy("city")
+            .count()
+            .toDF("city", "population")
+            .writeStream
+            .outputMode("complete")
+            .option("checkpointLocation", checkpointDir.getCanonicalPath)
+            .format("paimon")
+            .start(location)
+
+          val query = () => spark.sql("SELECT * FROM T ORDER BY city")
+
+          try {
+            inputData.addData((1, "HZ"), (2, "BJ"), (3, "BJ"))
+            stream.processAllAvailable()
+            checkAnswer(query(), Row("BJ", 2L) :: Row("HZ", 1L) :: Nil)
+
+            inputData.addData((4, "SH"), (5, "BJ"), (6, "HZ"))
+            stream.processAllAvailable()
+            checkAnswer(query(), Row("BJ", 3L) :: Row("HZ", 2L) :: Row("SH", 
1L) :: Nil)
+
+            inputData.addData((7, "HZ"), (8, "SH"))
+            stream.processAllAvailable()
+            checkAnswer(query(), Row("BJ", 3L) :: Row("HZ", 3L) :: Row("SH", 
2L) :: Nil)
+          } finally {
+            stream.stop()
+          }
+      }
+    }
+  }
+
+  test("Paimon Sink: update mode") {
+    failAfter(streamingTimeout) {
+      withTempDir {
+        checkpointDir =>
+          // define a change-log table and sink into it in update mode
+          spark.sql(s"""
+                       |CREATE TABLE T (a INT, b STRING)
+                       |TBLPROPERTIES ('primary-key'='a', 
'write-mode'='change-log', 'bucket'='3')
+                       |""".stripMargin)
+          val location = loadTable("T").location().getPath
+
+          val inputData = MemoryStream[(Int, String)]
+          intercept[RuntimeException] {
+            inputData
+              .toDF()
+              .writeStream
+              .option("checkpointLocation", checkpointDir.getCanonicalPath)
+              .outputMode("update")
+              .format("paimon")
+              .start(location)
+          }
+      }
+    }
+  }
+
+  test("Paimon Sink: aggregation and watermark") {
+    withTempDir {
+      checkpointDir =>
+        // define an append-only table and sink into it with aggregation and 
watermark in append mode
+        spark.sql(s"""
+                     |CREATE TABLE T (start Timestamp, stockId INT, avg_price 
DOUBLE)
+                     |TBLPROPERTIES ('write-mode'='append-only', 'bucket'='3')
+                     |""".stripMargin)
+        val location = loadTable("T").location().getPath
+
+        val inputData = MemoryStream[(Long, Int, Double)]
+        val data = inputData.toDS
+          .toDF("time", "stockId", "price")
+          .selectExpr("CAST(time AS timestamp) AS timestamp", "stockId", 
"price")
+          .withWatermark("timestamp", "10 seconds")
+          .groupBy(window($"timestamp", "5 seconds"), col("stockId"))
+          .agg(mean("price").as("avg_price"))
+          .select("window.start", "stockId", "avg_price")
+
+        val stream =
+          data.writeStream
+            .option("checkpointLocation", checkpointDir.getCanonicalPath)
+            .format("paimon")
+            .start(location)
+
+        val query = () =>
+          spark.sql(
+            "SELECT CAST(start as BIGINT) AS start, stockId, avg_price FROM T 
ORDER BY start, stockId")
+
+        try {
+          inputData.addData((101L, 1, 1.0d), (102, 1, 2.0d), (104, 2, 20.0d))
+          stream.processAllAvailable()
+          inputData.addData((105L, 2, 40.0d), (107, 2, 60.0d), (115, 3, 
300.0d))
+          stream.processAllAvailable()
+          inputData.addData((200L, 99, 99.9d))
+          stream.processAllAvailable()
+          checkAnswer(
+            query(),
+            Row(100L, 1, 1.5d) :: Row(100L, 2, 20.0d) :: Row(105L, 2, 50.0d) 
:: Row(
+              115L,
+              3,
+              300.0d) :: Nil)
+        } finally {
+          if (stream != null) {
+            stream.stop()
+          }
+        }
+    }
+  }
+
+  test("Paimon Sink: enable schema evolution") {
+    failAfter(streamingTimeout) {
+      withTempDir {
+        checkpointDir =>
+          // define a change-log table and sink into it with schema evolution 
in append mode
+          spark.sql(s"""
+                       |CREATE TABLE T (a INT, b STRING)
+                       |TBLPROPERTIES ('primary-key'='a', 
'write-mode'='change-log', 'bucket'='3')
+                       |""".stripMargin)
+          val location = loadTable("T").location().getPath
+
+          val date = Date.valueOf("2023-08-10")
+          spark.sql("INSERT INTO T VALUES (1, '2023-08-09'), (2, 
'2023-08-09')")
+          checkAnswer(
+            spark.sql("SELECT * FROM T ORDER BY a, b"),
+            Row(1, "2023-08-09") :: Row(2, "2023-08-09") :: Nil)
+
+          val inputData = MemoryStream[(Long, Date, Int)]
+          val stream = inputData
+            .toDS()
+            .toDF("a", "b", "c")
+            .writeStream
+            .option("checkpointLocation", checkpointDir.getCanonicalPath)
+            .option("write.merge-schema", "true")
+            .option("write.merge-schema.explicit-cast", "true")
+            .format("paimon")
+            .start(location)
+
+          val query = () => spark.sql("SELECT * FROM T ORDER BY a")
+
+          try {
+            inputData.addData((1L, date, 123), (3L, date, 456))
+            stream.processAllAvailable()
+
+            checkAnswer(
+              query(),
+              Row(1L, date, 123) :: Row(2L, Date.valueOf("2023-08-09"), null) 
:: Row(
+                3L,
+                date,
+                456) :: Nil)
+          } finally {
+            stream.stop()
+          }
+      }
+    }
+  }
+}
diff --git 
a/paimon-spark/paimon-spark-3.4/src/test/scala/org/apache/paimon/spark/PaimonSparkTestBase.scala
 
b/paimon-spark/paimon-spark-3.4/src/test/scala/org/apache/paimon/spark/PaimonSparkTestBase.scala
new file mode 100644
index 000000000..023a03b2f
--- /dev/null
+++ 
b/paimon-spark/paimon-spark-3.4/src/test/scala/org/apache/paimon/spark/PaimonSparkTestBase.scala
@@ -0,0 +1,92 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.paimon.spark
+
+import org.apache.paimon.catalog.{Catalog, CatalogContext, CatalogFactory, 
Identifier}
+import org.apache.paimon.options.Options
+import org.apache.paimon.spark.catalog.Catalogs
+import org.apache.paimon.spark.extensions.PaimonSparkSessionExtensions
+import org.apache.paimon.table.AbstractFileStoreTable
+
+import org.apache.spark.paimon.Utils
+import org.apache.spark.sql.QueryTest
+import org.apache.spark.sql.test.SharedSparkSession
+import org.scalactic.source.Position
+import org.scalatest.Tag
+
+import java.io.File
+
+class PaimonSparkTestBase extends QueryTest with SharedSparkSession {
+
+  protected lazy val tempDBDir: File = Utils.createTempDir
+
+  protected lazy val catalog: Catalog = initCatalog()
+
+  protected val dbName0: String = "test"
+
+  protected val tableName0: String = "T"
+
+  override protected def sparkConf = {
+    super.sparkConf
+      .set("spark.sql.catalog.paimon", classOf[SparkCatalog].getName)
+      .set("spark.sql.catalog.paimon.warehouse", tempDBDir.getCanonicalPath)
+      .set("spark.sql.extensions", 
classOf[PaimonSparkSessionExtensions].getName)
+  }
+
+  override protected def beforeAll(): Unit = {
+    super.beforeAll()
+    spark.sql(s"CREATE DATABASE paimon.$dbName0")
+    spark.sql(s"USE paimon.$dbName0")
+  }
+
+  override protected def afterAll(): Unit = {
+    try {
+      spark.sql("USE default")
+      spark.sql(s"DROP DATABASE paimon.$dbName0 CASCADE")
+    } finally {
+      super.afterAll()
+    }
+  }
+
+  override protected def beforeEach(): Unit = {
+    super.beforeEach() // run the parent's per-test setup before resetting the table
+    spark.sql(s"DROP TABLE IF EXISTS $tableName0")
+  }
+
+  protected def withTempDirs(f: (File, File) => Unit): Unit = {
+    withTempDir(file1 => withTempDir(file2 => f(file1, file2)))
+  }
+
+  override def test(testName: String, testTags: Tag*)(testFun: => Any)(implicit
+      pos: Position): Unit = {
+    println(testName)
+    super.test(testName, testTags: _*)(testFun)(pos)
+  }
+
+  private def initCatalog(): Catalog = {
+    val currentCatalog = 
spark.sessionState.catalogManager.currentCatalog.name()
+    val options = Catalogs.catalogOptions(currentCatalog, 
spark.sessionState.conf)
+    val catalogContext =
+      CatalogContext.create(Options.fromMap(options), 
spark.sessionState.newHadoopConf());
+    CatalogFactory.createCatalog(catalogContext);
+  }
+
+  def loadTable(tableName: String): AbstractFileStoreTable = {
+    catalog.getTable(Identifier.create(dbName0, 
tableName)).asInstanceOf[AbstractFileStoreTable]
+  }
+}
diff --git 
a/paimon-spark/paimon-spark-3.4/src/test/scala/org/apache/paimon/spark/sql/CreateAndDeleteTagProcedureTest.scala
 
b/paimon-spark/paimon-spark-3.4/src/test/scala/org/apache/paimon/spark/sql/CreateAndDeleteTagProcedureTest.scala
new file mode 100644
index 000000000..eae7a832d
--- /dev/null
+++ 
b/paimon-spark/paimon-spark-3.4/src/test/scala/org/apache/paimon/spark/sql/CreateAndDeleteTagProcedureTest.scala
@@ -0,0 +1,86 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.paimon.spark.sql
+
+import org.apache.paimon.spark.PaimonSparkTestBase
+
+import org.apache.spark.sql.{Dataset, Row}
+import org.apache.spark.sql.execution.streaming.MemoryStream
+import org.apache.spark.sql.streaming.StreamTest
+
+class CreateAndDeleteTagProcedureTest extends PaimonSparkTestBase with 
StreamTest {
+
+  import testImplicits._
+
+  test("Paimon Procedure: create and delete tag") {
+    failAfter(streamingTimeout) {
+      withTempDir {
+        checkpointDir =>
+          // define a change-log table and test the `foreachBatch` API
+          spark.sql(s"""
+                       |CREATE TABLE T (a INT, b STRING)
+                       |TBLPROPERTIES ('primary-key'='a', 
'write-mode'='change-log', 'bucket'='3')
+                       |""".stripMargin)
+          val location = loadTable("T").location().getPath
+
+          val inputData = MemoryStream[(Int, String)]
+          val stream = inputData
+            .toDS()
+            .toDF("a", "b")
+            .writeStream
+            .option("checkpointLocation", checkpointDir.getCanonicalPath)
+            .foreachBatch {
+              (batch: Dataset[Row], _: Long) =>
+                batch.write.format("paimon").mode("append").save(location)
+            }
+            .start()
+
+          val query = () => spark.sql("SELECT * FROM T ORDER BY a")
+
+          try {
+            // snapshot-1
+            inputData.addData((1, "a"))
+            stream.processAllAvailable()
+            checkAnswer(query(), Row(1, "a") :: Nil)
+
+            // snapshot-2
+            inputData.addData((2, "b"))
+            stream.processAllAvailable()
+            checkAnswer(query(), Row(1, "a") :: Row(2, "b") :: Nil)
+
+            // snapshot-3
+            inputData.addData((2, "b2"))
+            stream.processAllAvailable()
+            checkAnswer(query(), Row(1, "a") :: Row(2, "b2") :: Nil)
+            checkAnswer(
+              spark.sql("CALL create_tag(table => 'test.T', tag => 'test_tag', 
snapshot => 2)"),
+              Row(true) :: Nil)
+            checkAnswer(
+              spark.sql("SELECT tag_name FROM paimon.test.`T$tags`"),
+              Row("test_tag") :: Nil)
+            checkAnswer(
+              spark.sql("CALL delete_tag(table => 'test.T', tag => 
'test_tag')"),
+              Row(true) :: Nil)
+            checkAnswer(spark.sql("SELECT tag_name FROM 
paimon.test.`T$tags`"), Nil)
+          } finally {
+            stream.stop()
+          }
+      }
+    }
+  }
+}
diff --git 
a/paimon-spark/paimon-spark-3.4/src/test/scala/org/apache/paimon/spark/sql/RollbackProcedureTest.scala
 
b/paimon-spark/paimon-spark-3.4/src/test/scala/org/apache/paimon/spark/sql/RollbackProcedureTest.scala
new file mode 100644
index 000000000..ace04f894
--- /dev/null
+++ 
b/paimon-spark/paimon-spark-3.4/src/test/scala/org/apache/paimon/spark/sql/RollbackProcedureTest.scala
@@ -0,0 +1,94 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.paimon.spark.sql
+
+import org.apache.paimon.spark.PaimonSparkTestBase
+
+import org.apache.spark.sql.{Dataset, Row}
+import org.apache.spark.sql.execution.streaming.MemoryStream
+import org.apache.spark.sql.streaming.StreamTest
+
+class RollbackProcedureTest extends PaimonSparkTestBase with StreamTest {
+
+  import testImplicits._
+
+  test("Paimon Procedure: rollback to snapshot and tag") {
+    failAfter(streamingTimeout) {
+      withTempDir {
+        checkpointDir =>
+          // define a change-log table and test the `foreachBatch` API
+          spark.sql(s"""
+                       |CREATE TABLE T (a INT, b STRING)
+                       |TBLPROPERTIES ('primary-key'='a', 
'write-mode'='change-log', 'bucket'='3')
+                       |""".stripMargin)
+          val location = loadTable("T").location().getPath
+
+          val inputData = MemoryStream[(Int, String)]
+          val stream = inputData
+            .toDS()
+            .toDF("a", "b")
+            .writeStream
+            .option("checkpointLocation", checkpointDir.getCanonicalPath)
+            .foreachBatch {
+              (batch: Dataset[Row], _: Long) =>
+                batch.write.format("paimon").mode("append").save(location)
+            }
+            .start()
+
+          val query = () => spark.sql("SELECT * FROM T ORDER BY a")
+
+          try {
+            // snapshot-1
+            inputData.addData((1, "a"))
+            stream.processAllAvailable()
+            checkAnswer(query(), Row(1, "a") :: Nil)
+
+            checkAnswer(
+              spark.sql("CALL create_tag(table => 'test.T', tag => 'test_tag', 
snapshot => 1)"),
+              Row(true) :: Nil)
+
+            // snapshot-2
+            inputData.addData((2, "b"))
+            stream.processAllAvailable()
+            checkAnswer(query(), Row(1, "a") :: Row(2, "b") :: Nil)
+
+            // snapshot-3
+            inputData.addData((2, "b2"))
+            stream.processAllAvailable()
+            checkAnswer(query(), Row(1, "a") :: Row(2, "b2") :: Nil)
+            assertThrows[RuntimeException] {
+              spark.sql("CALL rollback(table => 'test.T_exception', version => 
 '2')")
+            }
+            // rollback to snapshot
+            checkAnswer(
+              spark.sql("CALL rollback(table => 'test.T', version => '2')"),
+              Row(true) :: Nil)
+            checkAnswer(query(), Row(1, "a") :: Row(2, "b") :: Nil)
+
+            // rollback to tag
+            checkAnswer(
+              spark.sql("CALL rollback(table => 'test.T', version => 
'test_tag')"),
+              Row(true) :: Nil)
+            checkAnswer(query(), Row(1, "a") :: Nil)
+          } finally {
+            stream.stop()
+          }
+      }
+    }
+  }
+}
diff --git 
a/paimon-spark/paimon-spark-common/src/main/scala/org/apache/spark/sql/catalyst/plans/logical/CallCommand.scala
 
b/paimon-spark/paimon-spark-3.4/src/test/scala/org/apache/spark/paimon/Utils.scala
similarity index 56%
copy from 
paimon-spark/paimon-spark-common/src/main/scala/org/apache/spark/sql/catalyst/plans/logical/CallCommand.scala
copy to 
paimon-spark/paimon-spark-3.4/src/test/scala/org/apache/spark/paimon/Utils.scala
index 1fade4a9f..974bbf0c7 100644
--- 
a/paimon-spark/paimon-spark-common/src/main/scala/org/apache/spark/sql/catalyst/plans/logical/CallCommand.scala
+++ 
b/paimon-spark/paimon-spark-3.4/src/test/scala/org/apache/spark/paimon/Utils.scala
@@ -15,19 +15,17 @@
  * See the License for the specific language governing permissions and
  * limitations under the License.
  */
-package org.apache.spark.sql.catalyst.plans.logical
+package org.apache.spark.paimon
 
-import org.apache.paimon.spark.procedure.Procedure
+import org.apache.spark.util.{Utils => SparkUtils}
 
-import org.apache.spark.sql.catalyst.expressions.{Attribute, Expression}
-import org.apache.spark.sql.catalyst.util.truncatedString
+import java.io.File
 
-/** A CALL command that resolves stored procedure from SQL. */
-case class CallCommand(procedure: Procedure, args: Seq[Expression]) extends 
LeafCommand {
+/**
+ * A wrapper exposing objects and classes that are only accessible within the 
[[org.apache.spark]] package.
+ */
+object Utils {
 
-  override lazy val output: Seq[Attribute] = procedure.outputType.toAttributes
+  def createTempDir: File = SparkUtils.createTempDir()
 
-  override def simpleString(maxFields: Int): String = {
-    s"Call${truncatedString(output, "[", ", ", "]", maxFields)} 
${procedure.description}"
-  }
 }
diff --git a/paimon-spark/paimon-spark-3.3/pom.xml 
b/paimon-spark/paimon-spark-3.5/pom.xml
similarity index 95%
copy from paimon-spark/paimon-spark-3.3/pom.xml
copy to paimon-spark/paimon-spark-3.5/pom.xml
index 04766914f..60687670a 100644
--- a/paimon-spark/paimon-spark-3.3/pom.xml
+++ b/paimon-spark/paimon-spark-3.5/pom.xml
@@ -28,11 +28,11 @@ under the License.
         <version>0.6-SNAPSHOT</version>
     </parent>
 
-    <artifactId>paimon-spark-3.3</artifactId>
-    <name>Paimon : Spark : 3.3</name>
+    <artifactId>paimon-spark-3.5</artifactId>
+    <name>Paimon : Spark : 3.5</name>
 
     <properties>
-        <spark.version>3.3.2</spark.version>
+        <spark.version>3.5.0</spark.version>
     </properties>
 
     <dependencies>
diff --git a/paimon-spark/paimon-spark-common/pom.xml 
b/paimon-spark/paimon-spark-common/pom.xml
index 05975ede9..a7491df29 100644
--- a/paimon-spark/paimon-spark-common/pom.xml
+++ b/paimon-spark/paimon-spark-common/pom.xml
@@ -34,7 +34,7 @@ under the License.
     <name>Paimon : Spark : Common</name>
 
     <properties>
-        <spark.version>3.4.0</spark.version>
+        <spark.version>3.5.0</spark.version>
     </properties>
 
     <dependencies>
@@ -299,6 +299,30 @@ under the License.
                     <sourceDirectory>src/main/antlr4</sourceDirectory>
                 </configuration>
             </plugin>
+            <plugin>
+                <groupId>org.scalatest</groupId>
+                <artifactId>scalatest-maven-plugin</artifactId>
+                <version>${scalatest-maven-plugin.version}</version>
+                <configuration>
+                    
<reportsDirectory>${project.build.directory}/surefire-reports</reportsDirectory>
+                    <junitxml>.</junitxml>
+                    <argLine>-ea -Xmx4g -Xss4m -XX:MaxMetaspaceSize=2g 
-XX:ReservedCodeCacheSize=${CodeCacheSize} ${extraJavaTestArgs} 
-Dio.netty.tryReflectionSetAccessible=true</argLine>
+                    <filereports>PaimonTestSuite.txt</filereports>
+                    <!--
+                      the plugin does not support always now, we must be 
careful w/ singletons.
+                      tracking at: 
https://github.com/scalatest/scalatest-maven-plugin/issues/99
+                      -->
+                    <forkMode>once</forkMode>
+                </configuration>
+                <executions>
+                    <execution>
+                        <id>test</id>
+                        <goals>
+                            <goal>test</goal>
+                        </goals>
+                    </execution>
+                </executions>
+            </plugin>
         </plugins>
     </build>
 </project>
diff --git 
a/paimon-spark/paimon-spark-common/src/main/scala/org/apache/paimon/spark/commands/WriteIntoPaimonTable.scala
 
b/paimon-spark/paimon-spark-common/src/main/scala/org/apache/paimon/spark/commands/WriteIntoPaimonTable.scala
index b04b6f021..35167c6af 100644
--- 
a/paimon-spark/paimon-spark-common/src/main/scala/org/apache/paimon/spark/commands/WriteIntoPaimonTable.scala
+++ 
b/paimon-spark/paimon-spark-common/src/main/scala/org/apache/paimon/spark/commands/WriteIntoPaimonTable.scala
@@ -23,6 +23,7 @@ import org.apache.paimon.index.PartitionIndex
 import org.apache.paimon.options.Options
 import org.apache.paimon.spark.{DynamicOverWrite, InsertInto, Overwrite, 
SaveMode, SparkConnectorOptions, SparkRow}
 import org.apache.paimon.spark.SparkUtils.createIOManager
+import org.apache.paimon.spark.util.EncoderUtils
 import org.apache.paimon.table.{BucketMode, FileStoreTable}
 import org.apache.paimon.table.sink.{BatchWriteBuilder, 
CommitMessageSerializer, DynamicBucketRow, RowPartitionKeyExtractor}
 import org.apache.paimon.types.RowType
@@ -30,7 +31,8 @@ import org.apache.paimon.types.RowType
 import org.apache.spark.TaskContext
 import org.apache.spark.internal.Logging
 import org.apache.spark.sql.{DataFrame, Dataset, Row, SparkSession}
-import org.apache.spark.sql.catalyst.encoders.{ExpressionEncoder, RowEncoder}
+import org.apache.spark.sql.catalyst.encoders.ExpressionEncoder
+import org.apache.spark.sql.catalyst.encoders.RowEncoder.encoderFor
 import org.apache.spark.sql.catalyst.plans.logical.LogicalPlan
 import org.apache.spark.sql.execution.command.RunnableCommand
 import org.apache.spark.sql.functions._
@@ -74,13 +76,13 @@ case class WriteIntoPaimonTable(
     val primaryKeyCols = tableSchema.trimmedPrimaryKeys().asScala.map(col)
     val partitionCols = tableSchema.partitionKeys().asScala.map(col)
 
-    val dataEncoder = RowEncoder.apply(data.schema).resolveAndBind()
+    val dataEncoder = EncoderUtils.encode(data.schema).resolveAndBind()
     val originFromRow = dataEncoder.createDeserializer()
 
     // append _bucket_ column as placeholder
     val withBucketCol = data.withColumn(BUCKET_COL, lit(-1))
     val bucketColIdx = withBucketCol.schema.size - 1
-    val withBucketDataEncoder = 
RowEncoder.apply(withBucketCol.schema).resolveAndBind()
+    val withBucketDataEncoder = 
EncoderUtils.encode(withBucketCol.schema).resolveAndBind()
     val toRow = withBucketDataEncoder.createSerializer()
     val fromRow = withBucketDataEncoder.createDeserializer()
 
diff --git 
a/paimon-spark/paimon-spark-common/src/main/scala/org/apache/paimon/spark/util/EncoderUtils.scala
 
b/paimon-spark/paimon-spark-common/src/main/scala/org/apache/paimon/spark/util/EncoderUtils.scala
new file mode 100644
index 000000000..001a14d38
--- /dev/null
+++ 
b/paimon-spark/paimon-spark-common/src/main/scala/org/apache/paimon/spark/util/EncoderUtils.scala
@@ -0,0 +1,66 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.paimon.spark.util
+
+import org.apache.spark.sql.Row
+import org.apache.spark.sql.catalyst.encoders.ExpressionEncoder
+import org.apache.spark.sql.types.StructType
+
+import scala.reflect.runtime.{universe => ru}
+
+object EncoderUtils {
+
+  private val mirror = ru.runtimeMirror {
+    getClass.getClassLoader
+  }
+
+  lazy val (module, method) = {
+    val expressionEncoder = mirror.reflectModule(
+      
mirror.staticModule("org.apache.spark.sql.catalyst.encoders.ExpressionEncoder"))
+    val term = ru.TermName("apply")
+    val structType = List(List("org.apache.spark.sql.types.StructType"));
+    val method = expressionEncoder.symbol.info
+      .decl(term)
+      .asTerm
+      .alternatives
+      .find(s => s.asMethod.paramLists.map(_.map(_.typeSignature.toString)) == 
structType)
+    if (method.isEmpty) {
+      val rowEncoder =
+        mirror.reflectModule(
+          
mirror.staticModule("org.apache.spark.sql.catalyst.encoders.RowEncoder"))
+      (
+        rowEncoder,
+        rowEncoder.symbol.info
+          .decl(term)
+          .asTerm
+          .alternatives
+          .find(s => 
s.asMethod.paramLists.map(_.map(_.typeSignature.toString)) == structType)
+          .get
+          .asMethod)
+    } else {
+      (expressionEncoder, method.get.asMethod)
+    }
+  }
+
+  def encode(schema: StructType): ExpressionEncoder[Row] = {
+    mirror
+      .reflect(module.instance)
+      .reflectMethod(method)(schema)
+      .asInstanceOf[ExpressionEncoder[Row]]
+  }
+}
diff --git 
a/paimon-spark/paimon-spark-common/src/main/scala/org/apache/spark/sql/catalyst/plans/logical/CallCommand.scala
 
b/paimon-spark/paimon-spark-common/src/main/scala/org/apache/spark/sql/catalyst/plans/logical/CallCommand.scala
index 1fade4a9f..94777d9be 100644
--- 
a/paimon-spark/paimon-spark-common/src/main/scala/org/apache/spark/sql/catalyst/plans/logical/CallCommand.scala
+++ 
b/paimon-spark/paimon-spark-common/src/main/scala/org/apache/spark/sql/catalyst/plans/logical/CallCommand.scala
@@ -19,13 +19,15 @@ package org.apache.spark.sql.catalyst.plans.logical
 
 import org.apache.paimon.spark.procedure.Procedure
 
-import org.apache.spark.sql.catalyst.expressions.{Attribute, Expression}
+import org.apache.spark.sql.catalyst.expressions.{Attribute, 
AttributeReference, Expression}
 import org.apache.spark.sql.catalyst.util.truncatedString
 
 /** A CALL command that resolves stored procedure from SQL. */
 case class CallCommand(procedure: Procedure, args: Seq[Expression]) extends 
LeafCommand {
 
-  override lazy val output: Seq[Attribute] = procedure.outputType.toAttributes
+  override lazy val output: Seq[Attribute] =
+    procedure.outputType.map(
+      field => AttributeReference(field.name, field.dataType, field.nullable, 
field.metadata)())
 
   override def simpleString(maxFields: Int): String = {
     s"Call${truncatedString(output, "[", ", ", "]", maxFields)} 
${procedure.description}"
diff --git 
a/paimon-spark/paimon-spark-common/src/test/java/org/apache/paimon/spark/SparkReadTestBase.java
 
b/paimon-spark/paimon-spark-common/src/test/java/org/apache/paimon/spark/SparkReadTestBase.java
index cd4425848..f566f2fb7 100644
--- 
a/paimon-spark/paimon-spark-common/src/test/java/org/apache/paimon/spark/SparkReadTestBase.java
+++ 
b/paimon-spark/paimon-spark-common/src/test/java/org/apache/paimon/spark/SparkReadTestBase.java
@@ -198,7 +198,7 @@ public abstract class SparkReadTestBase {
     // return of 'SHOW CREATE TABLE' excluding TBLPROPERTIES
     protected String showCreateString(String table, String... fieldSpec) {
         return String.format(
-                "CREATE TABLE %s (%s)\n",
+                "CREATE TABLE paimon.default.%s (%s)\n",
                 table,
                 Arrays.stream(fieldSpec).map(s -> "\n  " + 
s).collect(Collectors.joining(",")));
     }
diff --git a/paimon-spark/pom.xml b/paimon-spark/pom.xml
index e8e205ebd..4a948a801 100644
--- a/paimon-spark/pom.xml
+++ b/paimon-spark/pom.xml
@@ -40,6 +40,7 @@ under the License.
     <modules>
         <module>paimon-spark-common</module>
         <module>paimon-spark-2</module>
+        <module>paimon-spark-3.5</module>
         <module>paimon-spark-3.4</module>
         <module>paimon-spark-3.3</module>
         <module>paimon-spark-3.1</module>
diff --git a/pom.xml b/pom.xml
index 97258ec21..b2228085c 100644
--- a/pom.xml
+++ b/pom.xml
@@ -127,7 +127,27 @@ under the License.
         <avro.version>1.11.1</avro.version>
         <kafka.version>3.2.3</kafka.version>
         <scala-maven-plugin.version>3.2.2</scala-maven-plugin.version>
+        <scalatest-maven-plugin.version>2.2.0</scalatest-maven-plugin.version>
+        <antlr4-maven-plugin.version>4.8</antlr4-maven-plugin.version>
         <jsoup.version>1.15.3</jsoup.version>
+        <CodeCacheSize>128m</CodeCacheSize>
+        <extraJavaTestArgs>
+            -XX:+IgnoreUnrecognizedVMOptions
+            --add-opens=java.base/java.lang=ALL-UNNAMED
+            --add-opens=java.base/java.lang.invoke=ALL-UNNAMED
+            --add-opens=java.base/java.lang.reflect=ALL-UNNAMED
+            --add-opens=java.base/java.io=ALL-UNNAMED
+            --add-opens=java.base/java.net=ALL-UNNAMED
+            --add-opens=java.base/java.nio=ALL-UNNAMED
+            --add-opens=java.base/java.util=ALL-UNNAMED
+            --add-opens=java.base/java.util.concurrent=ALL-UNNAMED
+            --add-opens=java.base/java.util.concurrent.atomic=ALL-UNNAMED
+            --add-opens=java.base/sun.nio.ch=ALL-UNNAMED
+            --add-opens=java.base/sun.nio.cs=ALL-UNNAMED
+            --add-opens=java.base/sun.security.action=ALL-UNNAMED
+            --add-opens=java.base/sun.util.calendar=ALL-UNNAMED
+            -Djdk.reflect.useDirectMethodHandle=false
+        </extraJavaTestArgs>
     </properties>
 
     <dependencies>

Reply via email to