This is an automated email from the ASF dual-hosted git repository.
lzljs3620320 pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/incubator-paimon.git
The following commit(s) were added to refs/heads/master by this push:
new 9c594cf71 [spark] Support Spark 3.5 (#2019)
9c594cf71 is described below
commit 9c594cf713ad06df5d33c012908a946e4aa6d482
Author: Nicholas Jiang <[email protected]>
AuthorDate: Wed Oct 25 13:38:42 2023 +0800
[spark] Support Spark 3.5 (#2019)
---
docs/content/engines/overview.md | 2 +-
docs/content/engines/spark3.md | 4 +-
docs/content/project/download.md | 2 +
paimon-spark/paimon-spark-3.2/pom.xml | 2 +-
paimon-spark/paimon-spark-3.3/pom.xml | 2 +-
paimon-spark/paimon-spark-3.4/pom.xml | 188 +++++++++++++-
.../src/test/resources/log4j2-test.properties | 38 +++
.../org/apache/paimon/spark/PaimonSinkTest.scala | 281 +++++++++++++++++++++
.../apache/paimon/spark/PaimonSparkTestBase.scala | 92 +++++++
.../sql/CreateAndDeleteTagProcedureTest.scala | 86 +++++++
.../paimon/spark/sql/RollbackProcedureTest.scala | 94 +++++++
.../scala/org/apache/spark/paimon/Utils.scala} | 18 +-
.../{paimon-spark-3.3 => paimon-spark-3.5}/pom.xml | 6 +-
paimon-spark/paimon-spark-common/pom.xml | 26 +-
.../spark/commands/WriteIntoPaimonTable.scala | 8 +-
.../apache/paimon/spark/util/EncoderUtils.scala | 66 +++++
.../sql/catalyst/plans/logical/CallCommand.scala | 6 +-
.../org/apache/paimon/spark/SparkReadTestBase.java | 2 +-
paimon-spark/pom.xml | 1 +
pom.xml | 20 ++
20 files changed, 919 insertions(+), 25 deletions(-)
diff --git a/docs/content/engines/overview.md b/docs/content/engines/overview.md
index 3d2cd9e77..30d48c3fc 100644
--- a/docs/content/engines/overview.md
+++ b/docs/content/engines/overview.md
@@ -35,7 +35,7 @@ Apache Spark and Apache Hive.
| Engine | Version | Batch Read | Batch Write | Create Table | Alter
Table | Streaming Write | Streaming Read | Batch Overwrite |
|:------:|:-------------:|:----------:|:-----------:|:------------:|:-----------:|:---------------:|:--------------:|:---------------:|
| Flink | 1.14 - 1.17 | ✅ | ✅ | ✅ | ✅(1.17+)
| ✅ | ✅ | ✅ |
-| Spark | 3.1 - 3.4 | ✅ | ✅ | ✅ | ✅
| ✅ | ✅(3.3+) | ❌ |
+| Spark | 3.1 - 3.5 | ✅ | ✅ | ✅ | ✅
| ✅ | ✅(3.3+) | ❌ |
| Hive | 2.1 - 3.1 | ✅ | ✅ | ✅ | ❌
| ❌ | ❌ | ❌ |
| Spark | 2.4 | ✅ | ❌ | ❌ | ❌
| ❌ | ❌ | ❌ |
| Trino | 358 - 422 | ✅ | ❌ | ✅ | ✅
| ❌ | ❌ | ❌ |
diff --git a/docs/content/engines/spark3.md b/docs/content/engines/spark3.md
index 38320e36c..414679bb7 100644
--- a/docs/content/engines/spark3.md
+++ b/docs/content/engines/spark3.md
@@ -30,7 +30,7 @@ This documentation is a guide for using Paimon in Spark3.
## Preparation
-Paimon currently supports Spark 3.4, 3.3, 3.2 and 3.1. We recommend the latest
Spark version for a better experience.
+Paimon currently supports Spark 3.5, 3.4, 3.3, 3.2 and 3.1. We recommend the
latest Spark version for a better experience.
Download the jar file with corresponding version.
@@ -38,6 +38,7 @@ Download the jar file with corresponding version.
| Version | Jar
|
|-----------|----------------------------------------------------------------------------------------------------------------------------------------------------------------------|
+| Spark 3.5 | [paimon-spark-3.5-{{< version
>}}.jar](https://repo.maven.apache.org/maven2/org/apache/paimon/paimon-spark-3.5/{{<
version >}}/paimon-spark-3.5-{{< version >}}.jar) |
| Spark 3.4 | [paimon-spark-3.4-{{< version
>}}.jar](https://repo.maven.apache.org/maven2/org/apache/paimon/paimon-spark-3.4/{{<
version >}}/paimon-spark-3.4-{{< version >}}.jar) |
| Spark 3.3 | [paimon-spark-3.3-{{< version
>}}.jar](https://repo.maven.apache.org/maven2/org/apache/paimon/paimon-spark-3.3/{{<
version >}}/paimon-spark-3.3-{{< version >}}.jar) |
| Spark 3.2 | [paimon-spark-3.2-{{< version
>}}.jar](https://repo.maven.apache.org/maven2/org/apache/paimon/paimon-spark-3.2/{{<
version >}}/paimon-spark-3.2-{{< version >}}.jar) |
@@ -49,6 +50,7 @@ Download the jar file with corresponding version.
| Version | Jar
|
|-----------|-------------------------------------------------------------------------------------------------------------------------------------|
+| Spark 3.5 | [paimon-spark-3.5-{{< version
>}}.jar](https://repository.apache.org/snapshots/org/apache/paimon/paimon-spark-3.5/{{<
version >}}/) |
| Spark 3.4 | [paimon-spark-3.4-{{< version
>}}.jar](https://repository.apache.org/snapshots/org/apache/paimon/paimon-spark-3.4/{{<
version >}}/) |
| Spark 3.3 | [paimon-spark-3.3-{{< version
>}}.jar](https://repository.apache.org/snapshots/org/apache/paimon/paimon-spark-3.3/{{<
version >}}/) |
| Spark 3.2 | [paimon-spark-3.2-{{< version
>}}.jar](https://repository.apache.org/snapshots/org/apache/paimon/paimon-spark-3.2/{{<
version >}}/) |
diff --git a/docs/content/project/download.md b/docs/content/project/download.md
index 5fc9145eb..0ca56cc91 100644
--- a/docs/content/project/download.md
+++ b/docs/content/project/download.md
@@ -39,6 +39,7 @@ This documentation is a guide for downloading Paimon Jars.
| Flink 1.15 | [paimon-flink-1.15-{{< version
>}}.jar](https://repository.apache.org/snapshots/org/apache/paimon/paimon-flink-1.15/{{<
version >}}/) |
| Flink 1.14 | [paimon-flink-1.14-{{< version
>}}.jar](https://repository.apache.org/snapshots/org/apache/paimon/paimon-flink-1.14/{{<
version >}}/) |
| Flink Action | [paimon-flink-action-{{< version
>}}.jar](https://repository.apache.org/snapshots/org/apache/paimon/paimon-flink-action/{{<
version >}}/) |
+| Spark 3.5 | [paimon-spark-3.5-{{< version
>}}.jar](https://repository.apache.org/snapshots/org/apache/paimon/paimon-spark-3.5/{{<
version >}}/) |
| Spark 3.4 | [paimon-spark-3.4-{{< version
>}}.jar](https://repository.apache.org/snapshots/org/apache/paimon/paimon-spark-3.4/{{<
version >}}/) |
| Spark 3.3 | [paimon-spark-3.3-{{< version
>}}.jar](https://repository.apache.org/snapshots/org/apache/paimon/paimon-spark-3.3/{{<
version >}}/) |
| Spark 3.2 | [paimon-spark-3.2-{{< version
>}}.jar](https://repository.apache.org/snapshots/org/apache/paimon/paimon-spark-3.2/{{<
version >}}/) |
@@ -72,6 +73,7 @@ This documentation is a guide for downloading Paimon Jars.
| Flink 1.15 | [paimon-flink-1.15-{{< version
>}}.jar](https://repo.maven.apache.org/maven2/org/apache/paimon/paimon-flink-1.15/{{<
version >}}/paimon-flink-1.15-{{< version >}}.jar)
|
| Flink 1.14 | [paimon-flink-1.14-{{< version
>}}.jar](https://repo.maven.apache.org/maven2/org/apache/paimon/paimon-flink-1.14/{{<
version >}}/paimon-flink-1.14-{{< version >}}.jar)
|
| Flink Action | [paimon-flink-action-{{< version
>}}.jar](https://repo.maven.apache.org/maven2/org/apache/paimon/paimon-flink-action/{{<
version >}}/paimon-flink-action-{{< version >}}.jar)
|
+| Spark 3.5 | [paimon-spark-3.5-{{< version
>}}.jar](https://repo.maven.apache.org/maven2/org/apache/paimon/paimon-spark-3.5/{{<
version >}}/paimon-spark-3.5-{{< version >}}.jar)
|
| Spark 3.4 | [paimon-spark-3.4-{{< version
>}}.jar](https://repo.maven.apache.org/maven2/org/apache/paimon/paimon-spark-3.4/{{<
version >}}/paimon-spark-3.4-{{< version >}}.jar)
|
| Spark 3.3 | [paimon-spark-3.3-{{< version
>}}.jar](https://repo.maven.apache.org/maven2/org/apache/paimon/paimon-spark-3.3/{{<
version >}}/paimon-spark-3.3-{{< version >}}.jar)
|
| Spark 3.2 | [paimon-spark-3.2-{{< version
>}}.jar](https://repo.maven.apache.org/maven2/org/apache/paimon/paimon-spark-3.2/{{<
version >}}/paimon-spark-3.2-{{< version >}}.jar)
|
diff --git a/paimon-spark/paimon-spark-3.2/pom.xml
b/paimon-spark/paimon-spark-3.2/pom.xml
index 981cdf17d..df6281927 100644
--- a/paimon-spark/paimon-spark-3.2/pom.xml
+++ b/paimon-spark/paimon-spark-3.2/pom.xml
@@ -32,7 +32,7 @@ under the License.
<name>Paimon : Spark : 3.2</name>
<properties>
- <spark.version>3.2.2</spark.version>
+ <spark.version>3.2.4</spark.version>
</properties>
<dependencies>
diff --git a/paimon-spark/paimon-spark-3.3/pom.xml
b/paimon-spark/paimon-spark-3.3/pom.xml
index 04766914f..9cada28fe 100644
--- a/paimon-spark/paimon-spark-3.3/pom.xml
+++ b/paimon-spark/paimon-spark-3.3/pom.xml
@@ -32,7 +32,7 @@ under the License.
<name>Paimon : Spark : 3.3</name>
<properties>
- <spark.version>3.3.2</spark.version>
+ <spark.version>3.3.3</spark.version>
</properties>
<dependencies>
diff --git a/paimon-spark/paimon-spark-3.4/pom.xml
b/paimon-spark/paimon-spark-3.4/pom.xml
index b25a56273..c2932c418 100644
--- a/paimon-spark/paimon-spark-3.4/pom.xml
+++ b/paimon-spark/paimon-spark-3.4/pom.xml
@@ -32,7 +32,7 @@ under the License.
<name>Paimon : Spark : 3.4</name>
<properties>
- <spark.version>3.4.0</spark.version>
+ <spark.version>3.4.1</spark.version>
</properties>
<dependencies>
@@ -55,11 +55,128 @@ under the License.
<groupId>org.slf4j</groupId>
<artifactId>slf4j-log4j12</artifactId>
</exclusion>
+ <!-- SPARK-40511 upgrades SLF4J2, which is not compatible w/
SLF4J1 -->
+ <exclusion>
+ <groupId>org.apache.logging.log4j</groupId>
+ <artifactId>log4j-slf4j2-impl</artifactId>
+ </exclusion>
+ </exclusions>
+ </dependency>
+
+ <dependency>
+ <groupId>org.apache.spark</groupId>
+ <artifactId>spark-sql_2.12</artifactId>
+ <version>${spark.version}</version>
+ <classifier>tests</classifier>
+ <scope>test</scope>
+ <exclusions>
+ <exclusion>
+ <groupId>log4j</groupId>
+ <artifactId>log4j</artifactId>
+ </exclusion>
+ <exclusion>
+ <groupId>org.slf4j</groupId>
+ <artifactId>slf4j-log4j12</artifactId>
+ </exclusion>
+ <!-- SPARK-40511 upgrades SLF4J2, which is not compatible w/ SLF4J1 -->
+ <exclusion>
+ <groupId>org.apache.logging.log4j</groupId>
+ <artifactId>log4j-slf4j2-impl</artifactId>
+ </exclusion>
+ <exclusion>
+ <groupId>org.apache.orc</groupId>
+ <artifactId>orc-core</artifactId>
+ </exclusion>
</exclusions>
</dependency>
+ <dependency>
+ <groupId>org.apache.spark</groupId>
+ <artifactId>spark-core_2.12</artifactId>
+ <version>${spark.version}</version>
+ <classifier>tests</classifier>
+ <scope>test</scope>
+ <exclusions>
+ <exclusion>
+ <groupId>log4j</groupId>
+ <artifactId>log4j</artifactId>
+ </exclusion>
+ <exclusion>
+ <groupId>org.slf4j</groupId>
+ <artifactId>slf4j-log4j12</artifactId>
+ </exclusion>
+ <exclusion>
+ <groupId>org.apache.logging.log4j</groupId>
+ <artifactId>log4j-slf4j2-impl</artifactId>
+ </exclusion>
+ <exclusion>
+ <groupId>org.apache.orc</groupId>
+ <artifactId>orc-core</artifactId>
+ </exclusion>
+ </exclusions>
+ </dependency>
+ <dependency>
+ <groupId>org.apache.spark</groupId>
+ <artifactId>spark-catalyst_2.12</artifactId>
+ <version>${spark.version}</version>
+ <classifier>tests</classifier>
+ <scope>test</scope>
+ <exclusions>
+ <exclusion>
+ <groupId>log4j</groupId>
+ <artifactId>log4j</artifactId>
+ </exclusion>
+ <exclusion>
+ <groupId>org.slf4j</groupId>
+ <artifactId>slf4j-log4j12</artifactId>
+ </exclusion>
+ <!-- SPARK-40511 upgrades SLF4J2, which is not compatible w/ SLF4J1 -->
+ <exclusion>
+ <groupId>org.apache.logging.log4j</groupId>
+ <artifactId>log4j-slf4j2-impl</artifactId>
+ </exclusion>
+ <exclusion>
+ <groupId>org.apache.orc</groupId>
+ <artifactId>orc-core</artifactId>
+ </exclusion>
+ </exclusions>
+ </dependency>
+ <dependency>
+ <groupId>org.scala-lang</groupId>
+ <artifactId>scala-library</artifactId>
+ <version>${scala.version}</version>
+ <scope>test</scope>
+ </dependency>
+ <dependency>
+ <groupId>org.scala-lang</groupId>
+ <artifactId>scala-reflect</artifactId>
+ <version>${scala.version}</version>
+ <scope>test</scope>
+ </dependency>
+ <dependency>
+ <groupId>org.scala-lang</groupId>
+ <artifactId>scala-compiler</artifactId>
+ <version>${scala.version}</version>
+ <scope>test</scope>
+ </dependency>
+ <dependency>
+ <groupId>org.scalatest</groupId>
+ <artifactId>scalatest_${scala.binary.version}</artifactId>
+ <version>3.1.0</version>
+ <scope>test</scope>
+ </dependency>
</dependencies>
<build>
+ <pluginManagement>
+ <plugins>
+ <plugin>
+ <groupId>net.alchim31.maven</groupId>
+ <artifactId>scala-maven-plugin</artifactId>
+ <version>${scala-maven-plugin.version}</version>
+ <configuration>
+ <args>
+ <arg>-nobootcp</arg>
+ <arg>-target:jvm-${target.java.version}</arg>
+ </args>
+
<checkMultipleScalaVersions>false</checkMultipleScalaVersions>
+ </configuration>
+ </plugin>
+ <plugin>
+ <groupId>org.apache.maven.plugins</groupId>
+ <artifactId>maven-compiler-plugin</artifactId>
+ </plugin>
+ </plugins>
+ </pluginManagement>
<plugins>
<plugin>
<groupId>org.apache.maven.plugins</groupId>
@@ -81,6 +198,75 @@ under the License.
</execution>
</executions>
</plugin>
+ <plugin>
+ <groupId>net.alchim31.maven</groupId>
+ <artifactId>scala-maven-plugin</artifactId>
+ <version>${scala-maven-plugin.version}</version>
+ <executions>
+ <!-- Run scala compiler in the process-resources phase, so
that dependencies on
+ scala classes can be resolved later in the (Java)
compile phase -->
+ <execution>
+ <id>scala-compile-first</id>
+ <phase>process-resources</phase>
+ <goals>
+ <goal>add-source</goal>
+ <goal>compile</goal>
+ </goals>
+ </execution>
+
+ <!-- Run scala compiler in the process-test-resources
phase, so that dependencies on
+ scala classes can be resolved later in the (Java)
test-compile phase -->
+ <execution>
+ <id>scala-test-compile</id>
+ <phase>process-test-resources</phase>
+ <goals>
+ <goal>testCompile</goal>
+ </goals>
+ </execution>
+ </executions>
+ </plugin>
+ <plugin>
+ <groupId>com.diffplug.spotless</groupId>
+ <artifactId>spotless-maven-plugin</artifactId>
+ <version>${spotless.version}</version>
+ <configuration>
+ <scala>
+ <scalafmt>
+ <version>3.4.3</version>
+ <!-- This file is in the root of the project to
make sure IntelliJ picks it up automatically -->
+
<file>${project.basedir}/../../.scalafmt.conf</file>
+ </scalafmt>
+ <licenseHeader>
+ <content>${spotless.license.header}</content>
+ <delimiter>${spotless.delimiter}</delimiter>
+ </licenseHeader>
+ </scala>
+ </configuration>
+ </plugin>
+ <plugin>
+ <groupId>org.scalatest</groupId>
+ <artifactId>scalatest-maven-plugin</artifactId>
+ <version>${scalatest-maven-plugin.version}</version>
+ <configuration>
+
<reportsDirectory>${project.build.directory}/surefire-reports</reportsDirectory>
+ <junitxml>.</junitxml>
+ <argLine>-ea -Xmx4g -Xss4m -XX:MaxMetaspaceSize=2g
-XX:ReservedCodeCacheSize=${CodeCacheSize} ${extraJavaTestArgs}
-Dio.netty.tryReflectionSetAccessible=true</argLine>
+ <filereports>PaimonTestSuite.txt</filereports>
+ <!--
+ the plugin does not support always now, we must be
careful w/ singletons.
+ tracking at:
https://github.com/scalatest/scalatest-maven-plugin/issues/99
+ -->
+ <forkMode>once</forkMode>
+ </configuration>
+ <executions>
+ <execution>
+ <id>test</id>
+ <goals>
+ <goal>test</goal>
+ </goals>
+ </execution>
+ </executions>
+ </plugin>
</plugins>
</build>
</project>
diff --git
a/paimon-spark/paimon-spark-3.4/src/test/resources/log4j2-test.properties
b/paimon-spark/paimon-spark-3.4/src/test/resources/log4j2-test.properties
new file mode 100644
index 000000000..6f324f586
--- /dev/null
+++ b/paimon-spark/paimon-spark-3.4/src/test/resources/log4j2-test.properties
@@ -0,0 +1,38 @@
+################################################################################
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements. See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership. The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License. You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+################################################################################
+
+# Set root logger level to OFF to not flood build logs
+# set manually to INFO for debugging purposes
+rootLogger.level = OFF
+rootLogger.appenderRef.test.ref = TestLogger
+
+appender.testlogger.name = TestLogger
+appender.testlogger.type = CONSOLE
+appender.testlogger.target = SYSTEM_ERR
+appender.testlogger.layout.type = PatternLayout
+appender.testlogger.layout.pattern = %-4r [%tid %t] %-5p %c %x - %m%n
+
+logger.kafka.name = kafka
+logger.kafka.level = OFF
+logger.kafka2.name = state.change
+logger.kafka2.level = OFF
+
+logger.zookeeper.name = org.apache.zookeeper
+logger.zookeeper.level = OFF
+logger.I0Itec.name = org.I0Itec
+logger.I0Itec.level = OFF
diff --git
a/paimon-spark/paimon-spark-3.4/src/test/scala/org/apache/paimon/spark/PaimonSinkTest.scala
b/paimon-spark/paimon-spark-3.4/src/test/scala/org/apache/paimon/spark/PaimonSinkTest.scala
new file mode 100644
index 000000000..36df428a5
--- /dev/null
+++
b/paimon-spark/paimon-spark-3.4/src/test/scala/org/apache/paimon/spark/PaimonSinkTest.scala
@@ -0,0 +1,281 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.paimon.spark
+
+import org.apache.spark.sql.{Dataset, Row}
+import org.apache.spark.sql.execution.streaming.MemoryStream
+import org.apache.spark.sql.functions.{col, mean, window}
+import org.apache.spark.sql.streaming.StreamTest
+
+import java.sql.Date
+
+/**
+ * Tests for the Paimon Structured Streaming sink: `foreachBatch`, append and complete output
+ * modes, rejection of the update output mode, windowed aggregation with a watermark, and schema
+ * evolution on write.
+ *
+ * Every streaming test is bounded by `failAfter(streamingTimeout)` so that a stuck query fails
+ * the test instead of hanging the build.
+ */
+class PaimonSinkTest extends PaimonSparkTestBase with StreamTest {
+
+  import testImplicits._
+
+  test("Paimon Sink: forEachBatch") {
+    failAfter(streamingTimeout) {
+      withTempDir {
+        checkpointDir =>
+          // define a change-log table and test `forEachBatch` api
+          spark.sql(s"""
+                       |CREATE TABLE T (a INT, b STRING)
+                       |TBLPROPERTIES ('primary-key'='a', 'write-mode'='change-log', 'bucket'='3')
+                       |""".stripMargin)
+          val location = loadTable("T").location().getPath
+
+          val inputData = MemoryStream[(Int, String)]
+          val stream = inputData
+            .toDS()
+            .toDF("a", "b")
+            .writeStream
+            .option("checkpointLocation", checkpointDir.getCanonicalPath)
+            .foreachBatch {
+              (batch: Dataset[Row], _: Long) =>
+                batch.write.format("paimon").mode("append").save(location)
+            }
+            .start()
+
+          val query = () => spark.sql("SELECT * FROM T ORDER BY a")
+
+          try {
+            inputData.addData((1, "a"))
+            stream.processAllAvailable()
+            checkAnswer(query(), Row(1, "a") :: Nil)
+
+            inputData.addData((2, "b"))
+            stream.processAllAvailable()
+            checkAnswer(query(), Row(1, "a") :: Row(2, "b") :: Nil)
+
+            // same primary key again: the table is expected to keep only the latest value
+            inputData.addData((2, "b2"))
+            stream.processAllAvailable()
+            checkAnswer(query(), Row(1, "a") :: Row(2, "b2") :: Nil)
+          } finally {
+            stream.stop()
+          }
+      }
+    }
+  }
+
+  test("Paimon Sink: append mode") {
+    failAfter(streamingTimeout) {
+      withTempDir {
+        checkpointDir =>
+          // define a change-log table and sink into it in append mode
+          spark.sql(s"""
+                       |CREATE TABLE T (a INT, b STRING)
+                       |TBLPROPERTIES ('primary-key'='a', 'write-mode'='change-log', 'bucket'='3')
+                       |""".stripMargin)
+          val location = loadTable("T").location().getPath
+
+          val inputData = MemoryStream[(Int, String)]
+          val stream = inputData
+            .toDS()
+            .toDF("a", "b")
+            .writeStream
+            .option("checkpointLocation", checkpointDir.getCanonicalPath)
+            .format("paimon")
+            .start(location)
+
+          val query = () => spark.sql("SELECT * FROM T ORDER BY a")
+
+          try {
+            inputData.addData((1, "a"))
+            stream.processAllAvailable()
+            checkAnswer(query(), Row(1, "a") :: Nil)
+
+            inputData.addData((2, "b"))
+            stream.processAllAvailable()
+            checkAnswer(query(), Row(1, "a") :: Row(2, "b") :: Nil)
+
+            // same primary key again: the table is expected to keep only the latest value
+            inputData.addData((2, "b2"))
+            stream.processAllAvailable()
+            checkAnswer(query(), Row(1, "a") :: Row(2, "b2") :: Nil)
+          } finally {
+            stream.stop()
+          }
+      }
+    }
+  }
+
+  test("Paimon Sink: complete mode") {
+    failAfter(streamingTimeout) {
+      withTempDir {
+        checkpointDir =>
+          // define an append-only table and sink into it in complete mode
+          spark.sql(s"""
+                       |CREATE TABLE T (city String, population Long)
+                       |TBLPROPERTIES ('write-mode'='append-only', 'bucket'='3')
+                       |""".stripMargin)
+          val location = loadTable("T").location().getPath
+
+          val inputData = MemoryStream[(Int, String)]
+          val stream = inputData.toDS
+            .toDF("uid", "city")
+            .groupBy("city")
+            .count()
+            .toDF("city", "population")
+            .writeStream
+            .outputMode("complete")
+            .option("checkpointLocation", checkpointDir.getCanonicalPath)
+            .format("paimon")
+            .start(location)
+
+          val query = () => spark.sql("SELECT * FROM T ORDER BY city")
+
+          try {
+            // each batch's result is the full, cumulative per-city count
+            inputData.addData((1, "HZ"), (2, "BJ"), (3, "BJ"))
+            stream.processAllAvailable()
+            checkAnswer(query(), Row("BJ", 2L) :: Row("HZ", 1L) :: Nil)
+
+            inputData.addData((4, "SH"), (5, "BJ"), (6, "HZ"))
+            stream.processAllAvailable()
+            checkAnswer(query(), Row("BJ", 3L) :: Row("HZ", 2L) :: Row("SH", 1L) :: Nil)
+
+            inputData.addData((7, "HZ"), (8, "SH"))
+            stream.processAllAvailable()
+            checkAnswer(query(), Row("BJ", 3L) :: Row("HZ", 3L) :: Row("SH", 2L) :: Nil)
+          } finally {
+            stream.stop()
+          }
+      }
+    }
+  }
+
+  test("Paimon Sink: update mode") {
+    failAfter(streamingTimeout) {
+      withTempDir {
+        checkpointDir =>
+          // define a change-log table and sink into it in update mode
+          spark.sql(s"""
+                       |CREATE TABLE T (a INT, b STRING)
+                       |TBLPROPERTIES ('primary-key'='a', 'write-mode'='change-log', 'bucket'='3')
+                       |""".stripMargin)
+          val location = loadTable("T").location().getPath
+
+          val inputData = MemoryStream[(Int, String)]
+          // starting the query with the "update" output mode is expected to throw
+          intercept[RuntimeException] {
+            inputData
+              .toDF()
+              .writeStream
+              .option("checkpointLocation", checkpointDir.getCanonicalPath)
+              .outputMode("update")
+              .format("paimon")
+              .start(location)
+          }
+      }
+    }
+  }
+
+  test("Paimon Sink: aggregation and watermark") {
+    failAfter(streamingTimeout) {
+      withTempDir {
+        checkpointDir =>
+          // define an append-only table and sink into it with aggregation and watermark in
+          // append mode
+          spark.sql(s"""
+                       |CREATE TABLE T (start Timestamp, stockId INT, avg_price DOUBLE)
+                       |TBLPROPERTIES ('write-mode'='append-only', 'bucket'='3')
+                       |""".stripMargin)
+          val location = loadTable("T").location().getPath
+
+          val inputData = MemoryStream[(Long, Int, Double)]
+          val data = inputData.toDS
+            .toDF("time", "stockId", "price")
+            .selectExpr("CAST(time AS timestamp) AS timestamp", "stockId", "price")
+            .withWatermark("timestamp", "10 seconds")
+            .groupBy(window($"timestamp", "5 seconds"), col("stockId"))
+            .agg(mean("price").as("avg_price"))
+            .select("window.start", "stockId", "avg_price")
+
+          val stream =
+            data.writeStream
+              .option("checkpointLocation", checkpointDir.getCanonicalPath)
+              .format("paimon")
+              .start(location)
+
+          val query = () =>
+            spark.sql(
+              "SELECT CAST(start as BIGINT) AS start, stockId, avg_price FROM T ORDER BY start, stockId")
+
+          try {
+            inputData.addData((101L, 1, 1.0d), (102L, 1, 2.0d), (104L, 2, 20.0d))
+            stream.processAllAvailable()
+            inputData.addData((105L, 2, 40.0d), (107L, 2, 60.0d), (115L, 3, 300.0d))
+            stream.processAllAvailable()
+            // the event at t=200s advances the watermark so the earlier 5-second windows are
+            // finalized and written in append mode; the window holding t=200s stays open and
+            // therefore does not appear in the expected answer
+            inputData.addData((200L, 99, 99.9d))
+            stream.processAllAvailable()
+            checkAnswer(
+              query(),
+              Row(100L, 1, 1.5d) :: Row(100L, 2, 20.0d) :: Row(105L, 2, 50.0d) ::
+                Row(115L, 3, 300.0d) :: Nil)
+          } finally {
+            stream.stop()
+          }
+      }
+    }
+  }
+
+  test("Paimon Sink: enable schema evolution") {
+    failAfter(streamingTimeout) {
+      withTempDir {
+        checkpointDir =>
+          // define a change-log table and sink into it with schema evolution in append mode
+          spark.sql(s"""
+                       |CREATE TABLE T (a INT, b STRING)
+                       |TBLPROPERTIES ('primary-key'='a', 'write-mode'='change-log', 'bucket'='3')
+                       |""".stripMargin)
+          val location = loadTable("T").location().getPath
+
+          val date = Date.valueOf("2023-08-10")
+          spark.sql("INSERT INTO T VALUES (1, '2023-08-09'), (2, '2023-08-09')")
+          checkAnswer(
+            spark.sql("SELECT * FROM T ORDER BY a, b"),
+            Row(1, "2023-08-09") :: Row(2, "2023-08-09") :: Nil)
+
+          // the stream's schema differs from the table's: `a` is BIGINT, `b` is DATE and `c`
+          // is a new column; merge-schema with explicit-cast lets the write evolve the table
+          val inputData = MemoryStream[(Long, Date, Int)]
+          val stream = inputData
+            .toDS()
+            .toDF("a", "b", "c")
+            .writeStream
+            .option("checkpointLocation", checkpointDir.getCanonicalPath)
+            .option("write.merge-schema", "true")
+            .option("write.merge-schema.explicit-cast", "true")
+            .format("paimon")
+            .start(location)
+
+          val query = () => spark.sql("SELECT * FROM T ORDER BY a")
+
+          try {
+            inputData.addData((1L, date, 123), (3L, date, 456))
+            stream.processAllAvailable()
+
+            // row 2 predates the new column, so its `c` is null
+            checkAnswer(
+              query(),
+              Row(1L, date, 123) :: Row(2L, Date.valueOf("2023-08-09"), null) ::
+                Row(3L, date, 456) :: Nil)
+          } finally {
+            stream.stop()
+          }
+      }
+    }
+  }
+}
diff --git
a/paimon-spark/paimon-spark-3.4/src/test/scala/org/apache/paimon/spark/PaimonSparkTestBase.scala
b/paimon-spark/paimon-spark-3.4/src/test/scala/org/apache/paimon/spark/PaimonSparkTestBase.scala
new file mode 100644
index 000000000..023a03b2f
--- /dev/null
+++
b/paimon-spark/paimon-spark-3.4/src/test/scala/org/apache/paimon/spark/PaimonSparkTestBase.scala
@@ -0,0 +1,92 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.paimon.spark
+
+import org.apache.paimon.catalog.{Catalog, CatalogContext, CatalogFactory,
Identifier}
+import org.apache.paimon.options.Options
+import org.apache.paimon.spark.catalog.Catalogs
+import org.apache.paimon.spark.extensions.PaimonSparkSessionExtensions
+import org.apache.paimon.table.AbstractFileStoreTable
+
+import org.apache.spark.paimon.Utils
+import org.apache.spark.sql.QueryTest
+import org.apache.spark.sql.test.SharedSparkSession
+import org.scalactic.source.Position
+import org.scalatest.Tag
+
+import java.io.File
+
+/**
+ * Base class for Paimon Spark SQL tests. It registers a Paimon `SparkCatalog` named `paimon`
+ * backed by a temporary warehouse, creates and switches to database [[dbName0]] for the suite,
+ * and drops table [[tableName0]] before every test.
+ */
+class PaimonSparkTestBase extends QueryTest with SharedSparkSession {
+
+  /** Warehouse directory for the `paimon` catalog; created lazily on first access. */
+  protected lazy val tempDBDir: File = Utils.createTempDir
+
+  /** Paimon catalog built from the session's current catalog options (see [[initCatalog]]). */
+  protected lazy val catalog: Catalog = initCatalog()
+
+  protected val dbName0: String = "test"
+
+  protected val tableName0: String = "T"
+
+  /** Registers the Paimon `SparkCatalog` as `paimon` and enables the Paimon SQL extensions. */
+  override protected def sparkConf = {
+    super.sparkConf
+      .set("spark.sql.catalog.paimon", classOf[SparkCatalog].getName)
+      .set("spark.sql.catalog.paimon.warehouse", tempDBDir.getCanonicalPath)
+      .set("spark.sql.extensions", classOf[PaimonSparkSessionExtensions].getName)
+  }
+
+  /** Creates the suite database in the `paimon` catalog and makes it current. */
+  override protected def beforeAll(): Unit = {
+    super.beforeAll()
+    spark.sql(s"CREATE DATABASE paimon.$dbName0")
+    spark.sql(s"USE paimon.$dbName0")
+  }
+
+  /** Switches back to `default` and drops the suite database; parent teardown always runs. */
+  override protected def afterAll(): Unit = {
+    try {
+      spark.sql("USE default")
+      spark.sql(s"DROP DATABASE paimon.$dbName0 CASCADE")
+    } finally {
+      super.afterAll()
+    }
+  }
+
+  /**
+   * Drops the shared test table before every test.
+   *
+   * Delegates to `super.beforeEach()`. Calling `super.beforeAll()` here would re-run the
+   * parents' suite-level setup before each test and skip their per-test setup.
+   */
+  override protected def beforeEach(): Unit = {
+    super.beforeEach()
+    spark.sql(s"DROP TABLE IF EXISTS $tableName0")
+  }
+
+  /** Runs `f` with two temporary directories, each provided by `withTempDir`. */
+  protected def withTempDirs(f: (File, File) => Unit): Unit = {
+    withTempDir(file1 => withTempDir(file2 => f(file1, file2)))
+  }
+
+  /**
+   * Registers a test. The name is printed to stdout when the test is registered, which happens
+   * while the suite is being constructed, not when the test runs.
+   */
+  override def test(testName: String, testTags: Tag*)(testFun: => Any)(implicit
+      pos: Position): Unit = {
+    println(testName)
+    super.test(testName, testTags: _*)(testFun)(pos)
+  }
+
+  /**
+   * Creates a Paimon [[Catalog]] from the options of the session's current Spark catalog.
+   * NOTE(review): this relies on the current catalog being `paimon` (set by `beforeAll`'s `USE`)
+   * when the lazy [[catalog]] is first accessed.
+   */
+  private def initCatalog(): Catalog = {
+    val currentCatalog = spark.sessionState.catalogManager.currentCatalog.name()
+    val options = Catalogs.catalogOptions(currentCatalog, spark.sessionState.conf)
+    val catalogContext =
+      CatalogContext.create(Options.fromMap(options), spark.sessionState.newHadoopConf())
+    CatalogFactory.createCatalog(catalogContext)
+  }
+
+  /** Loads `tableName` from database [[dbName0]] as an [[AbstractFileStoreTable]]. */
+  def loadTable(tableName: String): AbstractFileStoreTable = {
+    catalog.getTable(Identifier.create(dbName0, tableName)).asInstanceOf[AbstractFileStoreTable]
+  }
+}
diff --git
a/paimon-spark/paimon-spark-3.4/src/test/scala/org/apache/paimon/spark/sql/CreateAndDeleteTagProcedureTest.scala
b/paimon-spark/paimon-spark-3.4/src/test/scala/org/apache/paimon/spark/sql/CreateAndDeleteTagProcedureTest.scala
new file mode 100644
index 000000000..eae7a832d
--- /dev/null
+++
b/paimon-spark/paimon-spark-3.4/src/test/scala/org/apache/paimon/spark/sql/CreateAndDeleteTagProcedureTest.scala
@@ -0,0 +1,86 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.paimon.spark.sql
+
+import org.apache.paimon.spark.PaimonSparkTestBase
+
+import org.apache.spark.sql.{Dataset, Row}
+import org.apache.spark.sql.execution.streaming.MemoryStream
+import org.apache.spark.sql.streaming.StreamTest
+
+/**
+ * Tests the `create_tag` and `delete_tag` procedures. Three streaming micro-batches are written
+ * (snapshot-1 .. snapshot-3 below), a tag is created on snapshot 2 and checked through the
+ * `T$tags` system table, and then the tag is deleted.
+ */
+class CreateAndDeleteTagProcedureTest extends PaimonSparkTestBase with
StreamTest {
+
+ import testImplicits._
+
+ test("Paimon Procedure: create and delete tag") {
+ failAfter(streamingTimeout) {
+ withTempDir {
+ checkpointDir =>
+ // define a change-log table; each micro-batch is written through `foreachBatch`,
+ // and the snapshot-N comments below track the resulting snapshot ids
+ spark.sql(s"""
+ |CREATE TABLE T (a INT, b STRING)
+ |TBLPROPERTIES ('primary-key'='a',
'write-mode'='change-log', 'bucket'='3')
+ |""".stripMargin)
+ val location = loadTable("T").location().getPath
+
+ val inputData = MemoryStream[(Int, String)]
+ val stream = inputData
+ .toDS()
+ .toDF("a", "b")
+ .writeStream
+ .option("checkpointLocation", checkpointDir.getCanonicalPath)
+ .foreachBatch {
+ (batch: Dataset[Row], _: Long) =>
+ batch.write.format("paimon").mode("append").save(location)
+ }
+ .start()
+
+ val query = () => spark.sql("SELECT * FROM T ORDER BY a")
+
+ try {
+ // snapshot-1
+ inputData.addData((1, "a"))
+ stream.processAllAvailable()
+ checkAnswer(query(), Row(1, "a") :: Nil)
+
+ // snapshot-2
+ inputData.addData((2, "b"))
+ stream.processAllAvailable()
+ checkAnswer(query(), Row(1, "a") :: Row(2, "b") :: Nil)
+
+ // snapshot-3
+ inputData.addData((2, "b2"))
+ stream.processAllAvailable()
+ checkAnswer(query(), Row(1, "a") :: Row(2, "b2") :: Nil)
+ // tag snapshot 2, then expect the tag to be listed in the `T$tags` system table
+ checkAnswer(
+ spark.sql("CALL create_tag(table => 'test.T', tag => 'test_tag',
snapshot => 2)"),
+ Row(true) :: Nil)
+ checkAnswer(
+ spark.sql("SELECT tag_name FROM paimon.test.`T$tags`"),
+ Row("test_tag") :: Nil)
+ // deleting the only tag is expected to leave `T$tags` empty
+ checkAnswer(
+ spark.sql("CALL delete_tag(table => 'test.T', tag =>
'test_tag')"),
+ Row(true) :: Nil)
+ checkAnswer(spark.sql("SELECT tag_name FROM
paimon.test.`T$tags`"), Nil)
+ } finally {
+ stream.stop()
+ }
+ }
+ }
+ }
+}
diff --git
a/paimon-spark/paimon-spark-3.4/src/test/scala/org/apache/paimon/spark/sql/RollbackProcedureTest.scala
b/paimon-spark/paimon-spark-3.4/src/test/scala/org/apache/paimon/spark/sql/RollbackProcedureTest.scala
new file mode 100644
index 000000000..ace04f894
--- /dev/null
+++
b/paimon-spark/paimon-spark-3.4/src/test/scala/org/apache/paimon/spark/sql/RollbackProcedureTest.scala
@@ -0,0 +1,94 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.paimon.spark.sql
+
+import org.apache.paimon.spark.PaimonSparkTestBase
+
+import org.apache.spark.sql.{Dataset, Row}
+import org.apache.spark.sql.execution.streaming.MemoryStream
+import org.apache.spark.sql.streaming.StreamTest
+
+/** Tests for the `rollback` procedure: rolling a table back to a snapshot id and to a tag. */
+class RollbackProcedureTest extends PaimonSparkTestBase with StreamTest {
+
+  import testImplicits._
+
+  test("Paimon Procedure: rollback to snapshot and tag") {
+    failAfter(streamingTimeout) {
+      withTempDir {
+        checkpointDir =>
+          // define a change-log table and test `foreachBatch` api
+          spark.sql("""
+                      |CREATE TABLE T (a INT, b STRING)
+                      |TBLPROPERTIES ('primary-key'='a', 'write-mode'='change-log', 'bucket'='3')
+                      |""".stripMargin)
+          val location = loadTable("T").location().getPath
+
+          // every micro-batch is appended to the Paimon table at `location`
+          val inputData = MemoryStream[(Int, String)]
+          val stream = inputData
+            .toDS()
+            .toDF("a", "b")
+            .writeStream
+            .option("checkpointLocation", checkpointDir.getCanonicalPath)
+            .foreachBatch {
+              (batch: Dataset[Row], _: Long) =>
+                batch.write.format("paimon").mode("append").save(location)
+            }
+            .start()
+
+          val query = () => spark.sql("SELECT * FROM T ORDER BY a")
+
+          try {
+            // snapshot-1
+            inputData.addData((1, "a"))
+            stream.processAllAvailable()
+            checkAnswer(query(), Row(1, "a") :: Nil)
+
+            // tag snapshot-1 so it can be used as a rollback target by name below
+            checkAnswer(
+              spark.sql("CALL create_tag(table => 'test.T', tag => 'test_tag', snapshot => 1)"),
+              Row(true) :: Nil)
+
+            // snapshot-2
+            inputData.addData((2, "b"))
+            stream.processAllAvailable()
+            checkAnswer(query(), Row(1, "a") :: Row(2, "b") :: Nil)
+
+            // snapshot-3
+            inputData.addData((2, "b2"))
+            stream.processAllAvailable()
+            checkAnswer(query(), Row(1, "a") :: Row(2, "b2") :: Nil)
+
+            // rolling back a table that was never created must fail
+            assertThrows[RuntimeException] {
+              spark.sql("CALL rollback(table => 'test.T_exception', version => '2')")
+            }
+
+            // rollback to snapshot
+            checkAnswer(
+              spark.sql("CALL rollback(table => 'test.T', version => '2')"),
+              Row(true) :: Nil)
+            checkAnswer(query(), Row(1, "a") :: Row(2, "b") :: Nil)
+
+            // rollback to tag
+            checkAnswer(
+              spark.sql("CALL rollback(table => 'test.T', version => 'test_tag')"),
+              Row(true) :: Nil)
+            checkAnswer(query(), Row(1, "a") :: Nil)
+          } finally {
+            stream.stop()
+          }
+      }
+    }
+  }
+}
diff --git
a/paimon-spark/paimon-spark-common/src/main/scala/org/apache/spark/sql/catalyst/plans/logical/CallCommand.scala
b/paimon-spark/paimon-spark-3.4/src/test/scala/org/apache/spark/paimon/Utils.scala
similarity index 56%
copy from
paimon-spark/paimon-spark-common/src/main/scala/org/apache/spark/sql/catalyst/plans/logical/CallCommand.scala
copy to
paimon-spark/paimon-spark-3.4/src/test/scala/org/apache/spark/paimon/Utils.scala
index 1fade4a9f..974bbf0c7 100644
---
a/paimon-spark/paimon-spark-common/src/main/scala/org/apache/spark/sql/catalyst/plans/logical/CallCommand.scala
+++
b/paimon-spark/paimon-spark-3.4/src/test/scala/org/apache/spark/paimon/Utils.scala
@@ -15,19 +15,17 @@
* See the License for the specific language governing permissions and
* limitations under the License.
*/
-package org.apache.spark.sql.catalyst.plans.logical
+package org.apache.spark.paimon
-import org.apache.paimon.spark.procedure.Procedure
+import org.apache.spark.util.{Utils => SparkUtils}
-import org.apache.spark.sql.catalyst.expressions.{Attribute, Expression}
-import org.apache.spark.sql.catalyst.util.truncatedString
+import java.io.File
-/** A CALL command that resolves stored procedure from SQL. */
-case class CallCommand(procedure: Procedure, args: Seq[Expression]) extends
LeafCommand {
+/**
+ * Exposes Spark objects/classes whose access is restricted to the [[org.apache.spark]] package to test code outside it.
+ */
+object Utils {
- override lazy val output: Seq[Attribute] = procedure.outputType.toAttributes
+ /** Creates a temporary directory by delegating to Spark's own `Utils.createTempDir()`. */
+ def createTempDir: File = SparkUtils.createTempDir()
- override def simpleString(maxFields: Int): String = {
- s"Call${truncatedString(output, "[", ", ", "]", maxFields)}
${procedure.description}"
- }
}
diff --git a/paimon-spark/paimon-spark-3.3/pom.xml
b/paimon-spark/paimon-spark-3.5/pom.xml
similarity index 95%
copy from paimon-spark/paimon-spark-3.3/pom.xml
copy to paimon-spark/paimon-spark-3.5/pom.xml
index 04766914f..60687670a 100644
--- a/paimon-spark/paimon-spark-3.3/pom.xml
+++ b/paimon-spark/paimon-spark-3.5/pom.xml
@@ -28,11 +28,11 @@ under the License.
<version>0.6-SNAPSHOT</version>
</parent>
- <artifactId>paimon-spark-3.3</artifactId>
- <name>Paimon : Spark : 3.3</name>
+ <artifactId>paimon-spark-3.5</artifactId>
+ <name>Paimon : Spark : 3.5</name>
<properties>
- <spark.version>3.3.2</spark.version>
+ <spark.version>3.5.0</spark.version>
</properties>
<dependencies>
diff --git a/paimon-spark/paimon-spark-common/pom.xml
b/paimon-spark/paimon-spark-common/pom.xml
index 05975ede9..a7491df29 100644
--- a/paimon-spark/paimon-spark-common/pom.xml
+++ b/paimon-spark/paimon-spark-common/pom.xml
@@ -34,7 +34,7 @@ under the License.
<name>Paimon : Spark : Common</name>
<properties>
- <spark.version>3.4.0</spark.version>
+ <spark.version>3.5.0</spark.version>
</properties>
<dependencies>
@@ -299,6 +299,30 @@ under the License.
<sourceDirectory>src/main/antlr4</sourceDirectory>
</configuration>
</plugin>
+ <plugin>
+ <groupId>org.scalatest</groupId>
+ <artifactId>scalatest-maven-plugin</artifactId>
+ <version>${scalatest-maven-plugin.version}</version>
+ <configuration>
+ <reportsDirectory>${project.build.directory}/surefire-reports</reportsDirectory>
+ <junitxml>.</junitxml>
+ <argLine>-ea -Xmx4g -Xss4m -XX:MaxMetaspaceSize=2g -XX:ReservedCodeCacheSize=${CodeCacheSize} ${extraJavaTestArgs} -Dio.netty.tryReflectionSetAccessible=true</argLine>
+ <filereports>PaimonTestSuite.txt</filereports>
+ <!--
+ The plugin does not support forkMode "always" yet, so all suites run in the same
+ forked JVM; be careful with singletons.
+ Tracking at: https://github.com/scalatest/scalatest-maven-plugin/issues/99
+ -->
+ <forkMode>once</forkMode>
+ </configuration>
+ <executions>
+ <execution>
+ <id>test</id>
+ <goals>
+ <goal>test</goal>
+ </goals>
+ </execution>
+ </executions>
+ </plugin>
</plugins>
</build>
</project>
diff --git
a/paimon-spark/paimon-spark-common/src/main/scala/org/apache/paimon/spark/commands/WriteIntoPaimonTable.scala
b/paimon-spark/paimon-spark-common/src/main/scala/org/apache/paimon/spark/commands/WriteIntoPaimonTable.scala
index b04b6f021..35167c6af 100644
---
a/paimon-spark/paimon-spark-common/src/main/scala/org/apache/paimon/spark/commands/WriteIntoPaimonTable.scala
+++
b/paimon-spark/paimon-spark-common/src/main/scala/org/apache/paimon/spark/commands/WriteIntoPaimonTable.scala
@@ -23,6 +23,7 @@ import org.apache.paimon.index.PartitionIndex
import org.apache.paimon.options.Options
import org.apache.paimon.spark.{DynamicOverWrite, InsertInto, Overwrite,
SaveMode, SparkConnectorOptions, SparkRow}
import org.apache.paimon.spark.SparkUtils.createIOManager
+import org.apache.paimon.spark.util.EncoderUtils
import org.apache.paimon.table.{BucketMode, FileStoreTable}
import org.apache.paimon.table.sink.{BatchWriteBuilder,
CommitMessageSerializer, DynamicBucketRow, RowPartitionKeyExtractor}
import org.apache.paimon.types.RowType
@@ -30,7 +31,8 @@ import org.apache.paimon.types.RowType
import org.apache.spark.TaskContext
import org.apache.spark.internal.Logging
import org.apache.spark.sql.{DataFrame, Dataset, Row, SparkSession}
-import org.apache.spark.sql.catalyst.encoders.{ExpressionEncoder, RowEncoder}
+import org.apache.spark.sql.catalyst.encoders.ExpressionEncoder
+import org.apache.spark.sql.catalyst.encoders.RowEncoder.encoderFor
import org.apache.spark.sql.catalyst.plans.logical.LogicalPlan
import org.apache.spark.sql.execution.command.RunnableCommand
import org.apache.spark.sql.functions._
@@ -74,13 +76,13 @@ case class WriteIntoPaimonTable(
val primaryKeyCols = tableSchema.trimmedPrimaryKeys().asScala.map(col)
val partitionCols = tableSchema.partitionKeys().asScala.map(col)
- val dataEncoder = RowEncoder.apply(data.schema).resolveAndBind()
+ val dataEncoder = EncoderUtils.encode(data.schema).resolveAndBind()
val originFromRow = dataEncoder.createDeserializer()
// append _bucket_ column as placeholder
val withBucketCol = data.withColumn(BUCKET_COL, lit(-1))
val bucketColIdx = withBucketCol.schema.size - 1
- val withBucketDataEncoder =
RowEncoder.apply(withBucketCol.schema).resolveAndBind()
+ val withBucketDataEncoder =
EncoderUtils.encode(withBucketCol.schema).resolveAndBind()
val toRow = withBucketDataEncoder.createSerializer()
val fromRow = withBucketDataEncoder.createDeserializer()
diff --git
a/paimon-spark/paimon-spark-common/src/main/scala/org/apache/paimon/spark/util/EncoderUtils.scala
b/paimon-spark/paimon-spark-common/src/main/scala/org/apache/paimon/spark/util/EncoderUtils.scala
new file mode 100644
index 000000000..001a14d38
--- /dev/null
+++
b/paimon-spark/paimon-spark-common/src/main/scala/org/apache/paimon/spark/util/EncoderUtils.scala
@@ -0,0 +1,66 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.paimon.spark.util
+
+import org.apache.spark.sql.Row
+import org.apache.spark.sql.catalyst.encoders.ExpressionEncoder
+import org.apache.spark.sql.types.StructType
+
+import scala.reflect.runtime.{universe => ru}
+
+/**
+ * Builds an `ExpressionEncoder[Row]` for a schema without linking at compile time against a
+ * Spark-version-specific factory.
+ *
+ * The factory is resolved once via Scala runtime reflection. `ExpressionEncoder.apply(StructType)`
+ * is preferred, and `RowEncoder.apply(StructType)` is the fallback for Spark versions that only
+ * provide the latter.
+ */
+object EncoderUtils {
+
+  private val mirror = ru.runtimeMirror(getClass.getClassLoader)
+
+  private val ApplyName = ru.TermName("apply")
+
+  /** Parameter lists of the wanted factory, rendered as type names: a single `(StructType)`. */
+  private val StructTypeParams = List(List(classOf[StructType].getName))
+
+  /** The encoder companion module and its `apply(StructType)` method, resolved on first access. */
+  lazy val (module, method) = {
+    val expressionEncoder =
+      reflectModule("org.apache.spark.sql.catalyst.encoders.ExpressionEncoder")
+    findStructTypeApply(expressionEncoder) match {
+      case Some(apply) => (expressionEncoder, apply)
+      case None =>
+        val rowEncoder = reflectModule("org.apache.spark.sql.catalyst.encoders.RowEncoder")
+        // Same exception type as the previous bare `.get`, but with an actionable message.
+        val apply = findStructTypeApply(rowEncoder).getOrElse(
+          throw new NoSuchElementException(
+            "Neither ExpressionEncoder nor RowEncoder declares apply(StructType); " +
+              "this Spark version is not supported."))
+        (rowEncoder, apply)
+    }
+  }
+
+  /** Reflective handle on the resolved factory, built once and reused by every `encode` call. */
+  private lazy val factory: ru.MethodMirror =
+    mirror.reflect(module.instance).reflectMethod(method)
+
+  /** Creates a `Row` encoder for `schema` through the resolved factory. */
+  def encode(schema: StructType): ExpressionEncoder[Row] =
+    factory(schema).asInstanceOf[ExpressionEncoder[Row]]
+
+  /** Returns a mirror of the Scala `object` with the given fully-qualified name. */
+  private def reflectModule(fullName: String): ru.ModuleMirror =
+    mirror.reflectModule(mirror.staticModule(fullName))
+
+  /**
+   * Finds an `apply` overload on `companion` whose only parameter list is `(StructType)`.
+   * Returns `None` if there is no such overload, or no `apply` at all.
+   */
+  private def findStructTypeApply(companion: ru.ModuleMirror): Option[ru.MethodSymbol] = {
+    val applySymbol = companion.symbol.info.decl(ApplyName)
+    if (applySymbol == ru.NoSymbol) {
+      None
+    } else {
+      applySymbol.asTerm.alternatives
+        .map(_.asMethod)
+        .find(_.paramLists.map(_.map(_.typeSignature.toString)) == StructTypeParams)
+    }
+  }
+}
diff --git
a/paimon-spark/paimon-spark-common/src/main/scala/org/apache/spark/sql/catalyst/plans/logical/CallCommand.scala
b/paimon-spark/paimon-spark-common/src/main/scala/org/apache/spark/sql/catalyst/plans/logical/CallCommand.scala
index 1fade4a9f..94777d9be 100644
---
a/paimon-spark/paimon-spark-common/src/main/scala/org/apache/spark/sql/catalyst/plans/logical/CallCommand.scala
+++
b/paimon-spark/paimon-spark-common/src/main/scala/org/apache/spark/sql/catalyst/plans/logical/CallCommand.scala
@@ -19,13 +19,15 @@ package org.apache.spark.sql.catalyst.plans.logical
import org.apache.paimon.spark.procedure.Procedure
-import org.apache.spark.sql.catalyst.expressions.{Attribute, Expression}
+import org.apache.spark.sql.catalyst.expressions.{Attribute,
AttributeReference, Expression}
import org.apache.spark.sql.catalyst.util.truncatedString
/** A CALL command that resolves stored procedure from SQL. */
case class CallCommand(procedure: Procedure, args: Seq[Expression]) extends
LeafCommand {
- override lazy val output: Seq[Attribute] = procedure.outputType.toAttributes
+ override lazy val output: Seq[Attribute] =
+ procedure.outputType.map(
+ field => AttributeReference(field.name, field.dataType, field.nullable,
field.metadata)())
override def simpleString(maxFields: Int): String = {
s"Call${truncatedString(output, "[", ", ", "]", maxFields)}
${procedure.description}"
diff --git
a/paimon-spark/paimon-spark-common/src/test/java/org/apache/paimon/spark/SparkReadTestBase.java
b/paimon-spark/paimon-spark-common/src/test/java/org/apache/paimon/spark/SparkReadTestBase.java
index cd4425848..f566f2fb7 100644
---
a/paimon-spark/paimon-spark-common/src/test/java/org/apache/paimon/spark/SparkReadTestBase.java
+++
b/paimon-spark/paimon-spark-common/src/test/java/org/apache/paimon/spark/SparkReadTestBase.java
@@ -198,7 +198,7 @@ public abstract class SparkReadTestBase {
// return of 'SHOW CREATE TABLE' excluding TBLPROPERTIES
protected String showCreateString(String table, String... fieldSpec) {
return String.format(
- "CREATE TABLE %s (%s)\n",
+ "CREATE TABLE paimon.default.%s (%s)\n",
table,
Arrays.stream(fieldSpec).map(s -> "\n " +
s).collect(Collectors.joining(",")));
}
diff --git a/paimon-spark/pom.xml b/paimon-spark/pom.xml
index e8e205ebd..4a948a801 100644
--- a/paimon-spark/pom.xml
+++ b/paimon-spark/pom.xml
@@ -40,6 +40,7 @@ under the License.
<modules>
<module>paimon-spark-common</module>
<module>paimon-spark-2</module>
+ <module>paimon-spark-3.5</module>
<module>paimon-spark-3.4</module>
<module>paimon-spark-3.3</module>
<module>paimon-spark-3.1</module>
diff --git a/pom.xml b/pom.xml
index 97258ec21..b2228085c 100644
--- a/pom.xml
+++ b/pom.xml
@@ -127,7 +127,27 @@ under the License.
<avro.version>1.11.1</avro.version>
<kafka.version>3.2.3</kafka.version>
<scala-maven-plugin.version>3.2.2</scala-maven-plugin.version>
+ <scalatest-maven-plugin.version>2.2.0</scalatest-maven-plugin.version>
+ <antlr4-maven-plugin.version>4.8</antlr4-maven-plugin.version>
<jsoup.version>1.15.3</jsoup.version>
+ <CodeCacheSize>128m</CodeCacheSize>
+ <extraJavaTestArgs>
+ -XX:+IgnoreUnrecognizedVMOptions
+ --add-opens=java.base/java.lang=ALL-UNNAMED
+ --add-opens=java.base/java.lang.invoke=ALL-UNNAMED
+ --add-opens=java.base/java.lang.reflect=ALL-UNNAMED
+ --add-opens=java.base/java.io=ALL-UNNAMED
+ --add-opens=java.base/java.net=ALL-UNNAMED
+ --add-opens=java.base/java.nio=ALL-UNNAMED
+ --add-opens=java.base/java.util=ALL-UNNAMED
+ --add-opens=java.base/java.util.concurrent=ALL-UNNAMED
+ --add-opens=java.base/java.util.concurrent.atomic=ALL-UNNAMED
+ --add-opens=java.base/sun.nio.ch=ALL-UNNAMED
+ --add-opens=java.base/sun.nio.cs=ALL-UNNAMED
+ --add-opens=java.base/sun.security.action=ALL-UNNAMED
+ --add-opens=java.base/sun.util.calendar=ALL-UNNAMED
+ -Djdk.reflect.useDirectMethodHandle=false
+ </extraJavaTestArgs>
</properties>
<dependencies>