This is an automated email from the ASF dual-hosted git repository.
lzljs3620320 pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/flink-table-store.git
The following commit(s) were added to refs/heads/master by this push:
new 77fd8ecf [FLINK-31302] Split spark modules according to version
77fd8ecf is described below
commit 77fd8ecfd599c9d6c2b706bee4684558a8ba2f40
Author: yuzelin <[email protected]>
AuthorDate: Fri Mar 3 15:45:58 2023 +0800
[FLINK-31302] Split spark modules according to version
This closes #570
---
docs/content/docs/engines/spark2.md | 2 +-
docs/content/docs/engines/spark3.md | 4 +-
.../table/store/file/schema/SchemaManagerTest.java | 16 +-
flink-table-store-e2e-tests/pom.xml | 4 +-
.../flink/table/store/tests/SparkE2eTest.java | 6 +
.../flink-table-store-spark-2}/pom.xml | 21 +-
.../table/store/spark/SparkDataSourceReader.java | 0
.../table/store/spark/SparkInputPartition.java | 0
.../flink/table/store/spark/SparkSource.java | 0
...org.apache.spark.sql.sources.DataSourceRegister | 0
.../table/store/spark/SimpleTableTestHelper.java | 0
.../store/spark/SparkFilterConverterTest.java | 0
.../table/store/spark/SparkInternalRowTest.java | 0
.../flink/table/store/spark/SparkReadITCase.java | 0
.../flink/table/store/spark/SparkTypeTest.java | 0
.../src/test/resources/log4j2-test.properties | 0
.../flink-table-store-spark-3.1}/pom.xml | 42 +---
.../flink-table-store-spark-3.2}/pom.xml | 42 +---
.../flink-table-store-spark-3.3}/pom.xml | 42 +---
.../flink-table-store-spark-common}/pom.xml | 46 +---
.../flink/table/store/spark/SparkArrayData.java | 0
.../flink/table/store/spark/SparkCatalog.java | 2 +-
.../table/store/spark/SparkFilterConverter.java | 0
.../table/store/spark/SparkInputPartition.java | 0
.../flink/table/store/spark/SparkInternalRow.java | 3 +-
.../table/store/spark/SparkReaderFactory.java | 0
.../apache/flink/table/store/spark/SparkRow.java | 0
.../apache/flink/table/store/spark/SparkScan.java | 0
.../flink/table/store/spark/SparkScanBuilder.java | 0
.../flink/table/store/spark/SparkSource.java | 0
.../apache/flink/table/store/spark/SparkTable.java | 0
.../flink/table/store/spark/SparkTypeUtils.java | 0
.../apache/flink/table/store/spark/SparkWrite.java | 0
.../flink/table/store/spark/SparkWriteBuilder.java | 0
.../store/spark/SpecializedGettersReader.java | 7 +-
...org.apache.spark.sql.sources.DataSourceRegister | 0
.../table/store/spark/MinioTestContainer.java | 0
.../store/spark/SparkFilterConverterTest.java | 0
.../table/store/spark/SparkInternalRowTest.java | 0
.../flink/table/store/spark/SparkReadITCase.java | 0
.../flink/table/store/spark/SparkReadTestBase.java | 22 +-
.../flink/table/store/spark/SparkS3ITCase.java | 9 +
.../store/spark/SparkSchemaEvolutionITCase.java | 0
.../flink/table/store/spark/SparkTypeTest.java | 0
.../flink/table/store/spark/SparkWriteITCase.java | 17 +-
.../src/test/resources/log4j2-test.properties | 0
flink-table-store-spark/pom.xml | 94 +-------
.../flink/table/store/spark/SparkArrayData.java | 171 ---------------
.../table/store/spark/SparkFilterConverter.java | 128 -----------
.../flink/table/store/spark/SparkInternalRow.java | 241 ---------------------
.../flink/table/store/spark/SparkTypeUtils.java | 185 ----------------
pom.xml | 1 -
52 files changed, 114 insertions(+), 991 deletions(-)
diff --git a/docs/content/docs/engines/spark2.md
b/docs/content/docs/engines/spark2.md
index bce84516..318fbbbd 100644
--- a/docs/content/docs/engines/spark2.md
+++ b/docs/content/docs/engines/spark2.md
@@ -56,7 +56,7 @@ Build bundled jar with the following command.
mvn clean install -DskipTests
```
-You can find the bundled jar in
`./flink-table-store-spark2/target/flink-table-store-spark2-{{< version
>}}.jar`.
+You can find the bundled jar in
`./flink-table-store-spark/flink-table-store-spark2/target/flink-table-store-spark2-{{<
version >}}.jar`.
## Quick Start
diff --git a/docs/content/docs/engines/spark3.md
b/docs/content/docs/engines/spark3.md
index ce5fa814..a44263bd 100644
--- a/docs/content/docs/engines/spark3.md
+++ b/docs/content/docs/engines/spark3.md
@@ -32,6 +32,8 @@ This documentation is a guide for using Table Store in Spark3.
{{< stable >}}
+Table Store currently supports Spark 3.3, 3.2 and 3.1. We recommend the latest
Spark version for a better experience.
+
Download [flink-table-store-spark-{{< version
>}}.jar](https://www.apache.org/dyn/closer.lua/flink/flink-table-store-{{<
version >}}/flink-table-store-spark-{{< version >}}.jar).
You can also manually build bundled jar from the source code.
@@ -52,7 +54,7 @@ Build bundled jar with the following command.
mvn clean install -DskipTests
```
-You can find the bundled jar in
`./flink-table-store-spark/target/flink-table-store-spark-{{< version >}}.jar`.
+For Spark 3.3, you can find the bundled jar in
`./flink-table-store-spark/flink-table-store-spark-3.3/target/flink-table-store-spark-3.3-{{<
version >}}.jar`.
## Quick Start
diff --git
a/flink-table-store-core/src/test/java/org/apache/flink/table/store/file/schema/SchemaManagerTest.java
b/flink-table-store-core/src/test/java/org/apache/flink/table/store/file/schema/SchemaManagerTest.java
index e66bea70..6da278f5 100644
---
a/flink-table-store-core/src/test/java/org/apache/flink/table/store/file/schema/SchemaManagerTest.java
+++
b/flink-table-store-core/src/test/java/org/apache/flink/table/store/file/schema/SchemaManagerTest.java
@@ -147,13 +147,15 @@ public class SchemaManagerTest {
@Test
public void testConcurrentCommit() throws Exception {
- manager.createTable(
- new Schema(
- rowType.getFields(),
- partitionKeys,
- primaryKeys,
- Collections.singletonMap("id", "-1"),
- "my_comment_4"));
+ retryArtificialException(
+ () ->
+ manager.createTable(
+ new Schema(
+ rowType.getFields(),
+ partitionKeys,
+ primaryKeys,
+ Collections.singletonMap("id", "-1"),
+ "my_comment_4")));
int threadNumber = ThreadLocalRandom.current().nextInt(3) + 2;
List<Thread> threads = new ArrayList<>();
diff --git a/flink-table-store-e2e-tests/pom.xml
b/flink-table-store-e2e-tests/pom.xml
index 1580f006..112b4fda 100644
--- a/flink-table-store-e2e-tests/pom.xml
+++ b/flink-table-store-e2e-tests/pom.xml
@@ -62,7 +62,7 @@ under the License.
<dependency>
<groupId>org.apache.flink</groupId>
- <artifactId>flink-table-store-spark</artifactId>
+ <artifactId>flink-table-store-spark-3.2</artifactId>
<version>${project.version}</version>
<scope>runtime</scope>
</dependency>
@@ -132,7 +132,7 @@ under the License.
</artifactItem>
<artifactItem>
<groupId>org.apache.flink</groupId>
- <artifactId>flink-table-store-spark</artifactId>
+
<artifactId>flink-table-store-spark-3.2</artifactId>
<version>${project.version}</version>
<destFileName>flink-table-store-spark.jar</destFileName>
<type>jar</type>
diff --git
a/flink-table-store-e2e-tests/src/test/java/org/apache/flink/table/store/tests/SparkE2eTest.java
b/flink-table-store-e2e-tests/src/test/java/org/apache/flink/table/store/tests/SparkE2eTest.java
index f372d25b..3761b837 100644
---
a/flink-table-store-e2e-tests/src/test/java/org/apache/flink/table/store/tests/SparkE2eTest.java
+++
b/flink-table-store-e2e-tests/src/test/java/org/apache/flink/table/store/tests/SparkE2eTest.java
@@ -20,6 +20,8 @@ package org.apache.flink.table.store.tests;
import org.junit.jupiter.api.Test;
import org.junit.jupiter.api.condition.DisabledIfSystemProperty;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
import org.testcontainers.containers.Container;
import org.testcontainers.containers.ContainerState;
@@ -31,6 +33,8 @@ import java.util.stream.Collectors;
@DisabledIfSystemProperty(named = "test.flink.version", matches = "1.14.*")
public class SparkE2eTest extends E2eReaderTestBase {
+ private static final Logger LOG =
LoggerFactory.getLogger(SparkE2eTest.class);
+
public SparkE2eTest() {
super(false, false, true);
}
@@ -63,6 +67,8 @@ public class SparkE2eTest extends E2eReaderTestBase {
"-f",
TEST_DATA_DIR + "/" + sql);
if (execResult.getExitCode() != 0) {
+ LOG.info(execResult.getStdout());
+ LOG.info(execResult.getStderr());
throw new AssertionError("Failed when running spark
sql.");
}
return Arrays.stream(execResult.getStdout().split("\n"))
diff --git a/flink-table-store-spark2/pom.xml
b/flink-table-store-spark/flink-table-store-spark-2/pom.xml
similarity index 88%
copy from flink-table-store-spark2/pom.xml
copy to flink-table-store-spark/flink-table-store-spark-2/pom.xml
index e9d129d4..7207d74f 100644
--- a/flink-table-store-spark2/pom.xml
+++ b/flink-table-store-spark/flink-table-store-spark-2/pom.xml
@@ -23,15 +23,13 @@ under the License.
<modelVersion>4.0.0</modelVersion>
<parent>
- <artifactId>flink-table-store-parent</artifactId>
<groupId>org.apache.flink</groupId>
+ <artifactId>flink-table-store-spark</artifactId>
<version>0.4-SNAPSHOT</version>
</parent>
- <artifactId>flink-table-store-spark2</artifactId>
- <name>Flink Table Store : Spark2</name>
-
- <packaging>jar</packaging>
+ <artifactId>flink-table-store-spark-2</artifactId>
+ <name>Flink Table Store : Spark : 2</name>
<properties>
<spark2.version>2.4.8</spark2.version>
@@ -39,11 +37,16 @@ under the License.
</properties>
<dependencies>
- <!-- Flink All dependencies -->
<dependency>
<groupId>org.apache.flink</groupId>
- <artifactId>flink-table-store-shade</artifactId>
+ <artifactId>flink-table-store-spark-common</artifactId>
<version>${project.version}</version>
+ <exclusions>
+ <exclusion>
+ <groupId>*</groupId>
+ <artifactId>*</artifactId>
+ </exclusion>
+ </exclusions>
</dependency>
<dependency>
@@ -98,7 +101,7 @@ under the License.
<configuration>
<artifactSet>
<includes combine.children="append">
-
<include>org.apache.flink:flink-table-store-shade</include>
+
<include>org.apache.flink:flink-table-store-spark-common</include>
</includes>
</artifactSet>
</configuration>
@@ -107,4 +110,4 @@ under the License.
</plugin>
</plugins>
</build>
-</project>
+</project>
\ No newline at end of file
diff --git
a/flink-table-store-spark2/src/main/java/org/apache/flink/table/store/spark/SparkDataSourceReader.java
b/flink-table-store-spark/flink-table-store-spark-2/src/main/java/org/apache/flink/table/store/spark/SparkDataSourceReader.java
similarity index 100%
rename from
flink-table-store-spark2/src/main/java/org/apache/flink/table/store/spark/SparkDataSourceReader.java
rename to
flink-table-store-spark/flink-table-store-spark-2/src/main/java/org/apache/flink/table/store/spark/SparkDataSourceReader.java
diff --git
a/flink-table-store-spark2/src/main/java/org/apache/flink/table/store/spark/SparkInputPartition.java
b/flink-table-store-spark/flink-table-store-spark-2/src/main/java/org/apache/flink/table/store/spark/SparkInputPartition.java
similarity index 100%
rename from
flink-table-store-spark2/src/main/java/org/apache/flink/table/store/spark/SparkInputPartition.java
rename to
flink-table-store-spark/flink-table-store-spark-2/src/main/java/org/apache/flink/table/store/spark/SparkInputPartition.java
diff --git
a/flink-table-store-spark2/src/main/java/org/apache/flink/table/store/spark/SparkSource.java
b/flink-table-store-spark/flink-table-store-spark-2/src/main/java/org/apache/flink/table/store/spark/SparkSource.java
similarity index 100%
rename from
flink-table-store-spark2/src/main/java/org/apache/flink/table/store/spark/SparkSource.java
rename to
flink-table-store-spark/flink-table-store-spark-2/src/main/java/org/apache/flink/table/store/spark/SparkSource.java
diff --git
a/flink-table-store-spark/src/main/resources/META-INF/services/org.apache.spark.sql.sources.DataSourceRegister
b/flink-table-store-spark/flink-table-store-spark-2/src/main/resources/META-INF.services/org.apache.spark.sql.sources.DataSourceRegister
similarity index 100%
rename from
flink-table-store-spark/src/main/resources/META-INF/services/org.apache.spark.sql.sources.DataSourceRegister
rename to
flink-table-store-spark/flink-table-store-spark-2/src/main/resources/META-INF.services/org.apache.spark.sql.sources.DataSourceRegister
diff --git
a/flink-table-store-spark2/src/test/java/org/apache/flink/table/store/spark/SimpleTableTestHelper.java
b/flink-table-store-spark/flink-table-store-spark-2/src/test/java/org/apache/flink/table/store/spark/SimpleTableTestHelper.java
similarity index 100%
rename from
flink-table-store-spark2/src/test/java/org/apache/flink/table/store/spark/SimpleTableTestHelper.java
rename to
flink-table-store-spark/flink-table-store-spark-2/src/test/java/org/apache/flink/table/store/spark/SimpleTableTestHelper.java
diff --git
a/flink-table-store-spark/src/test/java/org/apache/flink/table/store/spark/SparkFilterConverterTest.java
b/flink-table-store-spark/flink-table-store-spark-2/src/test/java/org/apache/flink/table/store/spark/SparkFilterConverterTest.java
similarity index 100%
rename from
flink-table-store-spark/src/test/java/org/apache/flink/table/store/spark/SparkFilterConverterTest.java
rename to
flink-table-store-spark/flink-table-store-spark-2/src/test/java/org/apache/flink/table/store/spark/SparkFilterConverterTest.java
diff --git
a/flink-table-store-spark2/src/test/java/org/apache/flink/table/store/spark/SparkInternalRowTest.java
b/flink-table-store-spark/flink-table-store-spark-2/src/test/java/org/apache/flink/table/store/spark/SparkInternalRowTest.java
similarity index 100%
rename from
flink-table-store-spark2/src/test/java/org/apache/flink/table/store/spark/SparkInternalRowTest.java
rename to
flink-table-store-spark/flink-table-store-spark-2/src/test/java/org/apache/flink/table/store/spark/SparkInternalRowTest.java
diff --git
a/flink-table-store-spark2/src/test/java/org/apache/flink/table/store/spark/SparkReadITCase.java
b/flink-table-store-spark/flink-table-store-spark-2/src/test/java/org/apache/flink/table/store/spark/SparkReadITCase.java
similarity index 100%
rename from
flink-table-store-spark2/src/test/java/org/apache/flink/table/store/spark/SparkReadITCase.java
rename to
flink-table-store-spark/flink-table-store-spark-2/src/test/java/org/apache/flink/table/store/spark/SparkReadITCase.java
diff --git
a/flink-table-store-spark2/src/test/java/org/apache/flink/table/store/spark/SparkTypeTest.java
b/flink-table-store-spark/flink-table-store-spark-2/src/test/java/org/apache/flink/table/store/spark/SparkTypeTest.java
similarity index 100%
rename from
flink-table-store-spark2/src/test/java/org/apache/flink/table/store/spark/SparkTypeTest.java
rename to
flink-table-store-spark/flink-table-store-spark-2/src/test/java/org/apache/flink/table/store/spark/SparkTypeTest.java
diff --git a/flink-table-store-spark/src/test/resources/log4j2-test.properties
b/flink-table-store-spark/flink-table-store-spark-2/src/test/resources/log4j2-test.properties
similarity index 100%
rename from flink-table-store-spark/src/test/resources/log4j2-test.properties
rename to
flink-table-store-spark/flink-table-store-spark-2/src/test/resources/log4j2-test.properties
diff --git a/flink-table-store-spark2/pom.xml
b/flink-table-store-spark/flink-table-store-spark-3.1/pom.xml
similarity index 66%
copy from flink-table-store-spark2/pom.xml
copy to flink-table-store-spark/flink-table-store-spark-3.1/pom.xml
index e9d129d4..45fe23bf 100644
--- a/flink-table-store-spark2/pom.xml
+++ b/flink-table-store-spark/flink-table-store-spark-3.1/pom.xml
@@ -23,47 +23,30 @@ under the License.
<modelVersion>4.0.0</modelVersion>
<parent>
- <artifactId>flink-table-store-parent</artifactId>
<groupId>org.apache.flink</groupId>
+ <artifactId>flink-table-store-spark</artifactId>
<version>0.4-SNAPSHOT</version>
</parent>
- <artifactId>flink-table-store-spark2</artifactId>
- <name>Flink Table Store : Spark2</name>
-
- <packaging>jar</packaging>
+ <artifactId>flink-table-store-spark-3.1</artifactId>
+ <name>Flink Table Store : Spark : 3.1</name>
<properties>
- <spark2.version>2.4.8</spark2.version>
- <jackson.version>2.13.3</jackson.version>
+ <spark.version>3.1.3</spark.version>
</properties>
<dependencies>
- <!-- Flink All dependencies -->
<dependency>
<groupId>org.apache.flink</groupId>
- <artifactId>flink-table-store-shade</artifactId>
+ <artifactId>flink-table-store-spark-common</artifactId>
<version>${project.version}</version>
</dependency>
<dependency>
<groupId>org.apache.spark</groupId>
- <artifactId>spark-sql_2.11</artifactId>
- <version>${spark2.version}</version>
+ <artifactId>spark-sql_2.12</artifactId>
+ <version>${spark.version}</version>
<exclusions>
- <exclusion>
- <groupId>org.apache.avro</groupId>
- <artifactId>avro</artifactId>
- </exclusion>
- <!-- The Jackson dependencies of Spark 2.4.8 have
vulnerabilities. -->
- <exclusion>
- <groupId>com.fasterxml.jackson.core</groupId>
- <artifactId>*</artifactId>
- </exclusion>
- <exclusion>
- <groupId>com.fasterxml.jackson.module</groupId>
- <artifactId>*</artifactId>
- </exclusion>
<exclusion>
<groupId>log4j</groupId>
<artifactId>log4j</artifactId>
@@ -74,13 +57,6 @@ under the License.
</exclusion>
</exclusions>
</dependency>
-
- <dependency>
- <groupId>com.fasterxml.jackson.module</groupId>
- <artifactId>jackson-module-scala_2.11</artifactId>
- <version>${jackson.version}</version>
- <scope>test</scope>
- </dependency>
</dependencies>
<build>
@@ -98,7 +74,7 @@ under the License.
<configuration>
<artifactSet>
<includes combine.children="append">
-
<include>org.apache.flink:flink-table-store-shade</include>
+
<include>org.apache.flink:flink-table-store-spark-common</include>
</includes>
</artifactSet>
</configuration>
@@ -107,4 +83,4 @@ under the License.
</plugin>
</plugins>
</build>
-</project>
+</project>
\ No newline at end of file
diff --git a/flink-table-store-spark2/pom.xml
b/flink-table-store-spark/flink-table-store-spark-3.2/pom.xml
similarity index 66%
copy from flink-table-store-spark2/pom.xml
copy to flink-table-store-spark/flink-table-store-spark-3.2/pom.xml
index e9d129d4..4f2610c1 100644
--- a/flink-table-store-spark2/pom.xml
+++ b/flink-table-store-spark/flink-table-store-spark-3.2/pom.xml
@@ -23,47 +23,30 @@ under the License.
<modelVersion>4.0.0</modelVersion>
<parent>
- <artifactId>flink-table-store-parent</artifactId>
<groupId>org.apache.flink</groupId>
+ <artifactId>flink-table-store-spark</artifactId>
<version>0.4-SNAPSHOT</version>
</parent>
- <artifactId>flink-table-store-spark2</artifactId>
- <name>Flink Table Store : Spark2</name>
-
- <packaging>jar</packaging>
+ <artifactId>flink-table-store-spark-3.2</artifactId>
+ <name>Flink Table Store : Spark : 3.2</name>
<properties>
- <spark2.version>2.4.8</spark2.version>
- <jackson.version>2.13.3</jackson.version>
+ <spark.version>3.2.2</spark.version>
</properties>
<dependencies>
- <!-- Flink All dependencies -->
<dependency>
<groupId>org.apache.flink</groupId>
- <artifactId>flink-table-store-shade</artifactId>
+ <artifactId>flink-table-store-spark-common</artifactId>
<version>${project.version}</version>
</dependency>
<dependency>
<groupId>org.apache.spark</groupId>
- <artifactId>spark-sql_2.11</artifactId>
- <version>${spark2.version}</version>
+ <artifactId>spark-sql_2.12</artifactId>
+ <version>${spark.version}</version>
<exclusions>
- <exclusion>
- <groupId>org.apache.avro</groupId>
- <artifactId>avro</artifactId>
- </exclusion>
- <!-- The Jackson dependencies of Spark 2.4.8 have
vulnerabilities. -->
- <exclusion>
- <groupId>com.fasterxml.jackson.core</groupId>
- <artifactId>*</artifactId>
- </exclusion>
- <exclusion>
- <groupId>com.fasterxml.jackson.module</groupId>
- <artifactId>*</artifactId>
- </exclusion>
<exclusion>
<groupId>log4j</groupId>
<artifactId>log4j</artifactId>
@@ -74,13 +57,6 @@ under the License.
</exclusion>
</exclusions>
</dependency>
-
- <dependency>
- <groupId>com.fasterxml.jackson.module</groupId>
- <artifactId>jackson-module-scala_2.11</artifactId>
- <version>${jackson.version}</version>
- <scope>test</scope>
- </dependency>
</dependencies>
<build>
@@ -98,7 +74,7 @@ under the License.
<configuration>
<artifactSet>
<includes combine.children="append">
-
<include>org.apache.flink:flink-table-store-shade</include>
+
<include>org.apache.flink:flink-table-store-spark-common</include>
</includes>
</artifactSet>
</configuration>
@@ -107,4 +83,4 @@ under the License.
</plugin>
</plugins>
</build>
-</project>
+</project>
\ No newline at end of file
diff --git a/flink-table-store-spark2/pom.xml
b/flink-table-store-spark/flink-table-store-spark-3.3/pom.xml
similarity index 66%
copy from flink-table-store-spark2/pom.xml
copy to flink-table-store-spark/flink-table-store-spark-3.3/pom.xml
index e9d129d4..a03ebd1d 100644
--- a/flink-table-store-spark2/pom.xml
+++ b/flink-table-store-spark/flink-table-store-spark-3.3/pom.xml
@@ -23,47 +23,30 @@ under the License.
<modelVersion>4.0.0</modelVersion>
<parent>
- <artifactId>flink-table-store-parent</artifactId>
<groupId>org.apache.flink</groupId>
+ <artifactId>flink-table-store-spark</artifactId>
<version>0.4-SNAPSHOT</version>
</parent>
- <artifactId>flink-table-store-spark2</artifactId>
- <name>Flink Table Store : Spark2</name>
-
- <packaging>jar</packaging>
+ <artifactId>flink-table-store-spark-3.3</artifactId>
+ <name>Flink Table Store : Spark : 3.3</name>
<properties>
- <spark2.version>2.4.8</spark2.version>
- <jackson.version>2.13.3</jackson.version>
+ <spark.version>3.3.0</spark.version>
</properties>
<dependencies>
- <!-- Flink All dependencies -->
<dependency>
<groupId>org.apache.flink</groupId>
- <artifactId>flink-table-store-shade</artifactId>
+ <artifactId>flink-table-store-spark-common</artifactId>
<version>${project.version}</version>
</dependency>
<dependency>
<groupId>org.apache.spark</groupId>
- <artifactId>spark-sql_2.11</artifactId>
- <version>${spark2.version}</version>
+ <artifactId>spark-sql_2.12</artifactId>
+ <version>${spark.version}</version>
<exclusions>
- <exclusion>
- <groupId>org.apache.avro</groupId>
- <artifactId>avro</artifactId>
- </exclusion>
- <!-- The Jackson dependencies of Spark 2.4.8 have
vulnerabilities. -->
- <exclusion>
- <groupId>com.fasterxml.jackson.core</groupId>
- <artifactId>*</artifactId>
- </exclusion>
- <exclusion>
- <groupId>com.fasterxml.jackson.module</groupId>
- <artifactId>*</artifactId>
- </exclusion>
<exclusion>
<groupId>log4j</groupId>
<artifactId>log4j</artifactId>
@@ -74,13 +57,6 @@ under the License.
</exclusion>
</exclusions>
</dependency>
-
- <dependency>
- <groupId>com.fasterxml.jackson.module</groupId>
- <artifactId>jackson-module-scala_2.11</artifactId>
- <version>${jackson.version}</version>
- <scope>test</scope>
- </dependency>
</dependencies>
<build>
@@ -98,7 +74,7 @@ under the License.
<configuration>
<artifactSet>
<includes combine.children="append">
-
<include>org.apache.flink:flink-table-store-shade</include>
+
<include>org.apache.flink:flink-table-store-spark-common</include>
</includes>
</artifactSet>
</configuration>
@@ -107,4 +83,4 @@ under the License.
</plugin>
</plugins>
</build>
-</project>
+</project>
\ No newline at end of file
diff --git a/flink-table-store-spark2/pom.xml
b/flink-table-store-spark/flink-table-store-spark-common/pom.xml
similarity index 66%
rename from flink-table-store-spark2/pom.xml
rename to flink-table-store-spark/flink-table-store-spark-common/pom.xml
index e9d129d4..1303a81c 100644
--- a/flink-table-store-spark2/pom.xml
+++ b/flink-table-store-spark/flink-table-store-spark-common/pom.xml
@@ -23,47 +23,26 @@ under the License.
<modelVersion>4.0.0</modelVersion>
<parent>
- <artifactId>flink-table-store-parent</artifactId>
+ <artifactId>flink-table-store-spark</artifactId>
<groupId>org.apache.flink</groupId>
<version>0.4-SNAPSHOT</version>
</parent>
- <artifactId>flink-table-store-spark2</artifactId>
- <name>Flink Table Store : Spark2</name>
-
<packaging>jar</packaging>
+ <artifactId>flink-table-store-spark-common</artifactId>
+ <name>Flink Table Store : Spark : Common</name>
+
<properties>
- <spark2.version>2.4.8</spark2.version>
- <jackson.version>2.13.3</jackson.version>
+ <spark.version>3.2.2</spark.version>
</properties>
<dependencies>
- <!-- Flink All dependencies -->
- <dependency>
- <groupId>org.apache.flink</groupId>
- <artifactId>flink-table-store-shade</artifactId>
- <version>${project.version}</version>
- </dependency>
-
<dependency>
<groupId>org.apache.spark</groupId>
- <artifactId>spark-sql_2.11</artifactId>
- <version>${spark2.version}</version>
+ <artifactId>spark-sql_2.12</artifactId>
+ <version>${spark.version}</version>
<exclusions>
- <exclusion>
- <groupId>org.apache.avro</groupId>
- <artifactId>avro</artifactId>
- </exclusion>
- <!-- The Jackson dependencies of Spark 2.4.8 have
vulnerabilities. -->
- <exclusion>
- <groupId>com.fasterxml.jackson.core</groupId>
- <artifactId>*</artifactId>
- </exclusion>
- <exclusion>
- <groupId>com.fasterxml.jackson.module</groupId>
- <artifactId>*</artifactId>
- </exclusion>
<exclusion>
<groupId>log4j</groupId>
<artifactId>log4j</artifactId>
@@ -72,15 +51,12 @@ under the License.
<groupId>org.slf4j</groupId>
<artifactId>slf4j-log4j12</artifactId>
</exclusion>
+ <exclusion>
+ <groupId>org.apache.orc</groupId>
+ <artifactId>orc-core</artifactId>
+ </exclusion>
</exclusions>
</dependency>
-
- <dependency>
- <groupId>com.fasterxml.jackson.module</groupId>
- <artifactId>jackson-module-scala_2.11</artifactId>
- <version>${jackson.version}</version>
- <scope>test</scope>
- </dependency>
</dependencies>
<build>
diff --git
a/flink-table-store-spark/src/main/java/org/apache/flink/table/store/spark/SparkArrayData.java
b/flink-table-store-spark/flink-table-store-spark-common/src/main/java/org/apache/flink/table/store/spark/SparkArrayData.java
similarity index 100%
rename from
flink-table-store-spark/src/main/java/org/apache/flink/table/store/spark/SparkArrayData.java
rename to
flink-table-store-spark/flink-table-store-spark-common/src/main/java/org/apache/flink/table/store/spark/SparkArrayData.java
diff --git
a/flink-table-store-spark/src/main/java/org/apache/flink/table/store/spark/SparkCatalog.java
b/flink-table-store-spark/flink-table-store-spark-common/src/main/java/org/apache/flink/table/store/spark/SparkCatalog.java
similarity index 99%
rename from
flink-table-store-spark/src/main/java/org/apache/flink/table/store/spark/SparkCatalog.java
rename to
flink-table-store-spark/flink-table-store-spark-common/src/main/java/org/apache/flink/table/store/spark/SparkCatalog.java
index 42986585..86a3b343 100644
---
a/flink-table-store-spark/src/main/java/org/apache/flink/table/store/spark/SparkCatalog.java
+++
b/flink-table-store-spark/flink-table-store-spark-common/src/main/java/org/apache/flink/table/store/spark/SparkCatalog.java
@@ -224,7 +224,7 @@ public class SparkCatalog implements TableCatalog,
SupportsNamespaces {
}
@Override
- public Table createTable(
+ public SparkTable createTable(
Identifier ident,
StructType schema,
Transform[] partitions,
diff --git
a/flink-table-store-spark/src/main/java/org/apache/flink/table/store/spark/SparkFilterConverter.java
b/flink-table-store-spark/flink-table-store-spark-common/src/main/java/org/apache/flink/table/store/spark/SparkFilterConverter.java
similarity index 100%
rename from
flink-table-store-spark/src/main/java/org/apache/flink/table/store/spark/SparkFilterConverter.java
rename to
flink-table-store-spark/flink-table-store-spark-common/src/main/java/org/apache/flink/table/store/spark/SparkFilterConverter.java
diff --git
a/flink-table-store-spark/src/main/java/org/apache/flink/table/store/spark/SparkInputPartition.java
b/flink-table-store-spark/flink-table-store-spark-common/src/main/java/org/apache/flink/table/store/spark/SparkInputPartition.java
similarity index 100%
rename from
flink-table-store-spark/src/main/java/org/apache/flink/table/store/spark/SparkInputPartition.java
rename to
flink-table-store-spark/flink-table-store-spark-common/src/main/java/org/apache/flink/table/store/spark/SparkInputPartition.java
diff --git
a/flink-table-store-spark/src/main/java/org/apache/flink/table/store/spark/SparkInternalRow.java
b/flink-table-store-spark/flink-table-store-spark-common/src/main/java/org/apache/flink/table/store/spark/SparkInternalRow.java
similarity index 98%
rename from
flink-table-store-spark/src/main/java/org/apache/flink/table/store/spark/SparkInternalRow.java
rename to
flink-table-store-spark/flink-table-store-spark-common/src/main/java/org/apache/flink/table/store/spark/SparkInternalRow.java
index c6480e1d..d3da03b2 100644
---
a/flink-table-store-spark/src/main/java/org/apache/flink/table/store/spark/SparkInternalRow.java
+++
b/flink-table-store-spark/flink-table-store-spark-common/src/main/java/org/apache/flink/table/store/spark/SparkInternalRow.java
@@ -31,7 +31,6 @@ import org.apache.flink.table.store.types.MapType;
import org.apache.flink.table.store.types.MultisetType;
import org.apache.flink.table.store.types.RowType;
-import org.apache.spark.sql.catalyst.expressions.SpecializedGettersReader;
import org.apache.spark.sql.catalyst.util.ArrayBasedMapData;
import org.apache.spark.sql.catalyst.util.ArrayData;
import org.apache.spark.sql.catalyst.util.DateTimeUtils;
@@ -167,7 +166,7 @@ public class SparkInternalRow extends
org.apache.spark.sql.catalyst.InternalRow
@Override
public Object get(int ordinal, org.apache.spark.sql.types.DataType
dataType) {
- return SpecializedGettersReader.read(this, ordinal, dataType, true,
true);
+ return SpecializedGettersReader.read(this, ordinal, dataType);
}
public static Object fromFlink(Object o, DataType type) {
diff --git
a/flink-table-store-spark/src/main/java/org/apache/flink/table/store/spark/SparkReaderFactory.java
b/flink-table-store-spark/flink-table-store-spark-common/src/main/java/org/apache/flink/table/store/spark/SparkReaderFactory.java
similarity index 100%
rename from
flink-table-store-spark/src/main/java/org/apache/flink/table/store/spark/SparkReaderFactory.java
rename to
flink-table-store-spark/flink-table-store-spark-common/src/main/java/org/apache/flink/table/store/spark/SparkReaderFactory.java
diff --git
a/flink-table-store-spark/src/main/java/org/apache/flink/table/store/spark/SparkRow.java
b/flink-table-store-spark/flink-table-store-spark-common/src/main/java/org/apache/flink/table/store/spark/SparkRow.java
similarity index 100%
rename from
flink-table-store-spark/src/main/java/org/apache/flink/table/store/spark/SparkRow.java
rename to
flink-table-store-spark/flink-table-store-spark-common/src/main/java/org/apache/flink/table/store/spark/SparkRow.java
diff --git
a/flink-table-store-spark/src/main/java/org/apache/flink/table/store/spark/SparkScan.java
b/flink-table-store-spark/flink-table-store-spark-common/src/main/java/org/apache/flink/table/store/spark/SparkScan.java
similarity index 100%
rename from
flink-table-store-spark/src/main/java/org/apache/flink/table/store/spark/SparkScan.java
rename to
flink-table-store-spark/flink-table-store-spark-common/src/main/java/org/apache/flink/table/store/spark/SparkScan.java
diff --git
a/flink-table-store-spark/src/main/java/org/apache/flink/table/store/spark/SparkScanBuilder.java
b/flink-table-store-spark/flink-table-store-spark-common/src/main/java/org/apache/flink/table/store/spark/SparkScanBuilder.java
similarity index 100%
rename from
flink-table-store-spark/src/main/java/org/apache/flink/table/store/spark/SparkScanBuilder.java
rename to
flink-table-store-spark/flink-table-store-spark-common/src/main/java/org/apache/flink/table/store/spark/SparkScanBuilder.java
diff --git
a/flink-table-store-spark/src/main/java/org/apache/flink/table/store/spark/SparkSource.java
b/flink-table-store-spark/flink-table-store-spark-common/src/main/java/org/apache/flink/table/store/spark/SparkSource.java
similarity index 100%
rename from
flink-table-store-spark/src/main/java/org/apache/flink/table/store/spark/SparkSource.java
rename to
flink-table-store-spark/flink-table-store-spark-common/src/main/java/org/apache/flink/table/store/spark/SparkSource.java
diff --git
a/flink-table-store-spark/src/main/java/org/apache/flink/table/store/spark/SparkTable.java
b/flink-table-store-spark/flink-table-store-spark-common/src/main/java/org/apache/flink/table/store/spark/SparkTable.java
similarity index 100%
rename from
flink-table-store-spark/src/main/java/org/apache/flink/table/store/spark/SparkTable.java
rename to
flink-table-store-spark/flink-table-store-spark-common/src/main/java/org/apache/flink/table/store/spark/SparkTable.java
diff --git
a/flink-table-store-spark/src/main/java/org/apache/flink/table/store/spark/SparkTypeUtils.java
b/flink-table-store-spark/flink-table-store-spark-common/src/main/java/org/apache/flink/table/store/spark/SparkTypeUtils.java
similarity index 100%
rename from
flink-table-store-spark/src/main/java/org/apache/flink/table/store/spark/SparkTypeUtils.java
rename to
flink-table-store-spark/flink-table-store-spark-common/src/main/java/org/apache/flink/table/store/spark/SparkTypeUtils.java
diff --git
a/flink-table-store-spark/src/main/java/org/apache/flink/table/store/spark/SparkWrite.java
b/flink-table-store-spark/flink-table-store-spark-common/src/main/java/org/apache/flink/table/store/spark/SparkWrite.java
similarity index 100%
rename from
flink-table-store-spark/src/main/java/org/apache/flink/table/store/spark/SparkWrite.java
rename to
flink-table-store-spark/flink-table-store-spark-common/src/main/java/org/apache/flink/table/store/spark/SparkWrite.java
diff --git
a/flink-table-store-spark/src/main/java/org/apache/flink/table/store/spark/SparkWriteBuilder.java
b/flink-table-store-spark/flink-table-store-spark-common/src/main/java/org/apache/flink/table/store/spark/SparkWriteBuilder.java
similarity index 100%
rename from
flink-table-store-spark/src/main/java/org/apache/flink/table/store/spark/SparkWriteBuilder.java
rename to
flink-table-store-spark/flink-table-store-spark-common/src/main/java/org/apache/flink/table/store/spark/SparkWriteBuilder.java
diff --git
a/flink-table-store-spark2/src/main/java/org/apache/flink/table/store/spark/SpecializedGettersReader.java
b/flink-table-store-spark/flink-table-store-spark-common/src/main/java/org/apache/flink/table/store/spark/SpecializedGettersReader.java
similarity index 95%
rename from
flink-table-store-spark2/src/main/java/org/apache/flink/table/store/spark/SpecializedGettersReader.java
rename to
flink-table-store-spark/flink-table-store-spark-common/src/main/java/org/apache/flink/table/store/spark/SpecializedGettersReader.java
index 5eaf1738..7ed3633a 100644
---
a/flink-table-store-spark2/src/main/java/org/apache/flink/table/store/spark/SpecializedGettersReader.java
+++
b/flink-table-store-spark/flink-table-store-spark-common/src/main/java/org/apache/flink/table/store/spark/SpecializedGettersReader.java
@@ -7,7 +7,7 @@
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
- * http://www.apache.org/licenses/LICENSE-2.0
+ * http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
@@ -39,7 +39,10 @@ import org.apache.spark.sql.types.StructType;
import org.apache.spark.sql.types.TimestampType;
import org.apache.spark.sql.types.UserDefinedType;
-/** Reader of Spark {@link SpecializedGetters}. */
+/**
+ * Reader of Spark {@link SpecializedGetters}. Copied from Spark to avoid
conflict between Spark2
+ * and Spark3 .
+ */
public final class SpecializedGettersReader {
private SpecializedGettersReader() {}
diff --git
a/flink-table-store-spark2/src/main/resources/META-INF/services/org.apache.spark.sql.sources.DataSourceRegister
b/flink-table-store-spark/flink-table-store-spark-common/src/main/resources/META-INF/services/org.apache.spark.sql.sources.DataSourceRegister
similarity index 100%
rename from
flink-table-store-spark2/src/main/resources/META-INF/services/org.apache.spark.sql.sources.DataSourceRegister
rename to
flink-table-store-spark/flink-table-store-spark-common/src/main/resources/META-INF/services/org.apache.spark.sql.sources.DataSourceRegister
diff --git
a/flink-table-store-spark/src/test/java/org/apache/flink/table/store/spark/MinioTestContainer.java
b/flink-table-store-spark/flink-table-store-spark-common/src/test/java/org/apache/flink/table/store/spark/MinioTestContainer.java
similarity index 100%
rename from
flink-table-store-spark/src/test/java/org/apache/flink/table/store/spark/MinioTestContainer.java
rename to
flink-table-store-spark/flink-table-store-spark-common/src/test/java/org/apache/flink/table/store/spark/MinioTestContainer.java
diff --git
a/flink-table-store-spark2/src/test/java/org/apache/flink/table/store/spark/SparkFilterConverterTest.java
b/flink-table-store-spark/flink-table-store-spark-common/src/test/java/org/apache/flink/table/store/spark/SparkFilterConverterTest.java
similarity index 100%
rename from
flink-table-store-spark2/src/test/java/org/apache/flink/table/store/spark/SparkFilterConverterTest.java
rename to
flink-table-store-spark/flink-table-store-spark-common/src/test/java/org/apache/flink/table/store/spark/SparkFilterConverterTest.java
diff --git
a/flink-table-store-spark/src/test/java/org/apache/flink/table/store/spark/SparkInternalRowTest.java
b/flink-table-store-spark/flink-table-store-spark-common/src/test/java/org/apache/flink/table/store/spark/SparkInternalRowTest.java
similarity index 100%
rename from
flink-table-store-spark/src/test/java/org/apache/flink/table/store/spark/SparkInternalRowTest.java
rename to
flink-table-store-spark/flink-table-store-spark-common/src/test/java/org/apache/flink/table/store/spark/SparkInternalRowTest.java
diff --git
a/flink-table-store-spark/src/test/java/org/apache/flink/table/store/spark/SparkReadITCase.java
b/flink-table-store-spark/flink-table-store-spark-common/src/test/java/org/apache/flink/table/store/spark/SparkReadITCase.java
similarity index 100%
rename from
flink-table-store-spark/src/test/java/org/apache/flink/table/store/spark/SparkReadITCase.java
rename to
flink-table-store-spark/flink-table-store-spark-common/src/test/java/org/apache/flink/table/store/spark/SparkReadITCase.java
diff --git
a/flink-table-store-spark/src/test/java/org/apache/flink/table/store/spark/SparkReadTestBase.java
b/flink-table-store-spark/flink-table-store-spark-common/src/test/java/org/apache/flink/table/store/spark/SparkReadTestBase.java
similarity index 91%
rename from
flink-table-store-spark/src/test/java/org/apache/flink/table/store/spark/SparkReadTestBase.java
rename to
flink-table-store-spark/flink-table-store-spark-common/src/test/java/org/apache/flink/table/store/spark/SparkReadTestBase.java
index de365c31..fb6dc4f5 100644
---
a/flink-table-store-spark/src/test/java/org/apache/flink/table/store/spark/SparkReadTestBase.java
+++
b/flink-table-store-spark/flink-table-store-spark-common/src/test/java/org/apache/flink/table/store/spark/SparkReadTestBase.java
@@ -30,7 +30,6 @@ import
org.apache.flink.table.store.table.sink.StreamTableWrite;
import org.apache.flink.table.store.types.DataField;
import org.apache.flink.table.store.types.RowKind;
-import org.apache.commons.io.FileUtils;
import org.apache.commons.lang3.StringUtils;
import org.apache.spark.sql.Dataset;
import org.apache.spark.sql.Row;
@@ -39,10 +38,8 @@ import org.junit.jupiter.api.AfterAll;
import org.junit.jupiter.api.AfterEach;
import org.junit.jupiter.api.BeforeAll;
import org.junit.jupiter.api.BeforeEach;
+import org.junit.jupiter.api.io.TempDir;
-import java.io.File;
-import java.io.IOException;
-import java.nio.file.Files;
import java.util.List;
import java.util.concurrent.atomic.AtomicLong;
@@ -53,8 +50,6 @@ public abstract class SparkReadTestBase {
private static final String COMMIT_USER = "user";
private static final AtomicLong COMMIT_IDENTIFIER = new AtomicLong(0);
- private static File warehouse = null;
-
protected static SparkSession spark = null;
protected static Path warehousePath = null;
@@ -64,10 +59,8 @@ public abstract class SparkReadTestBase {
protected static Path tablePath2;
@BeforeAll
- public static void startMetastoreAndSpark() throws Exception {
- warehouse = Files.createTempFile("warehouse", null).toFile();
- assertThat(warehouse.delete()).isTrue();
- warehousePath = new Path("file:" + warehouse);
+ public static void startMetastoreAndSpark(@TempDir java.nio.file.Path
tempDir) {
+ warehousePath = new Path("file:" + tempDir.toString());
spark = SparkSession.builder().master("local[2]").getOrCreate();
spark.conf().set("spark.sql.catalog.tablestore",
SparkCatalog.class.getName());
spark.conf().set("spark.sql.catalog.tablestore.warehouse",
warehousePath.toString());
@@ -76,10 +69,7 @@ public abstract class SparkReadTestBase {
}
@AfterAll
- public static void stopMetastoreAndSpark() throws IOException {
- if (warehouse != null && warehouse.exists()) {
- FileUtils.deleteDirectory(warehouse);
- }
+ public static void stopMetastoreAndSpark() {
if (spark != null) {
spark.stop();
spark = null;
@@ -124,9 +114,7 @@ public abstract class SparkReadTestBase {
public void afterEach() {
List<Row> tables = spark.sql("show tables").collectAsList();
tables.forEach(
- table -> {
- spark.sql("DROP TABLE " + table.getString(0) + "." +
table.getString(1));
- });
+ table -> spark.sql("DROP TABLE " + table.getString(0) + "." +
table.getString(1)));
}
protected void innerTestSimpleType(Dataset<Row> dataset) {
diff --git
a/flink-table-store-spark/src/test/java/org/apache/flink/table/store/spark/SparkS3ITCase.java
b/flink-table-store-spark/flink-table-store-spark-common/src/test/java/org/apache/flink/table/store/spark/SparkS3ITCase.java
similarity index 94%
rename from
flink-table-store-spark/src/test/java/org/apache/flink/table/store/spark/SparkS3ITCase.java
rename to
flink-table-store-spark/flink-table-store-spark-common/src/test/java/org/apache/flink/table/store/spark/SparkS3ITCase.java
index a8f06b80..313cba74 100644
---
a/flink-table-store-spark/src/test/java/org/apache/flink/table/store/spark/SparkS3ITCase.java
+++
b/flink-table-store-spark/flink-table-store-spark-common/src/test/java/org/apache/flink/table/store/spark/SparkS3ITCase.java
@@ -24,6 +24,7 @@ import
org.apache.flink.table.store.testutils.junit.parameterized.Parameters;
import org.apache.spark.sql.Row;
import org.apache.spark.sql.SparkSession;
+import org.junit.jupiter.api.AfterAll;
import org.junit.jupiter.api.AfterEach;
import org.junit.jupiter.api.BeforeAll;
import org.junit.jupiter.api.TestTemplate;
@@ -60,6 +61,14 @@ public class SparkS3ITCase {
spark.sql("USE tablestore.db");
}
+ @AfterAll
+ public static void stopMetastoreAndSpark() {
+ if (spark != null) {
+ spark.stop();
+ spark = null;
+ }
+ }
+
@Parameters(name = "{0}")
public static Collection<String> parameters() {
return Arrays.asList("avro", "orc", "parquet");
diff --git
a/flink-table-store-spark/src/test/java/org/apache/flink/table/store/spark/SparkSchemaEvolutionITCase.java
b/flink-table-store-spark/flink-table-store-spark-common/src/test/java/org/apache/flink/table/store/spark/SparkSchemaEvolutionITCase.java
similarity index 100%
rename from
flink-table-store-spark/src/test/java/org/apache/flink/table/store/spark/SparkSchemaEvolutionITCase.java
rename to
flink-table-store-spark/flink-table-store-spark-common/src/test/java/org/apache/flink/table/store/spark/SparkSchemaEvolutionITCase.java
diff --git
a/flink-table-store-spark/src/test/java/org/apache/flink/table/store/spark/SparkTypeTest.java
b/flink-table-store-spark/flink-table-store-spark-common/src/test/java/org/apache/flink/table/store/spark/SparkTypeTest.java
similarity index 100%
rename from
flink-table-store-spark/src/test/java/org/apache/flink/table/store/spark/SparkTypeTest.java
rename to
flink-table-store-spark/flink-table-store-spark-common/src/test/java/org/apache/flink/table/store/spark/SparkTypeTest.java
diff --git
a/flink-table-store-spark/src/test/java/org/apache/flink/table/store/spark/SparkWriteITCase.java
b/flink-table-store-spark/flink-table-store-spark-common/src/test/java/org/apache/flink/table/store/spark/SparkWriteITCase.java
similarity index 90%
rename from
flink-table-store-spark/src/test/java/org/apache/flink/table/store/spark/SparkWriteITCase.java
rename to
flink-table-store-spark/flink-table-store-spark-common/src/test/java/org/apache/flink/table/store/spark/SparkWriteITCase.java
index 045e0306..0307841a 100644
---
a/flink-table-store-spark/src/test/java/org/apache/flink/table/store/spark/SparkWriteITCase.java
+++
b/flink-table-store-spark/flink-table-store-spark-common/src/test/java/org/apache/flink/table/store/spark/SparkWriteITCase.java
@@ -22,11 +22,12 @@ import org.apache.flink.table.store.fs.Path;
import org.apache.spark.sql.Row;
import org.apache.spark.sql.SparkSession;
+import org.junit.jupiter.api.AfterAll;
import org.junit.jupiter.api.AfterEach;
import org.junit.jupiter.api.BeforeAll;
import org.junit.jupiter.api.Test;
+import org.junit.jupiter.api.io.TempDir;
-import java.io.File;
import java.util.Comparator;
import java.util.List;
@@ -38,10 +39,8 @@ public class SparkWriteITCase {
private static SparkSession spark = null;
@BeforeAll
- public static void startMetastoreAndSpark() throws Exception {
- File warehouse = File.createTempFile("warehouse", null);
- assertThat(warehouse.delete()).isTrue();
- Path warehousePath = new Path("file:" + warehouse);
+ public static void startMetastoreAndSpark(@TempDir java.nio.file.Path
tempDir) {
+ Path warehousePath = new Path("file:" + tempDir.toString());
spark = SparkSession.builder().master("local[2]").getOrCreate();
spark.conf().set("spark.sql.catalog.tablestore",
SparkCatalog.class.getName());
spark.conf().set("spark.sql.catalog.tablestore.warehouse",
warehousePath.toString());
@@ -49,6 +48,14 @@ public class SparkWriteITCase {
spark.sql("USE tablestore.db");
}
+ @AfterAll
+ public static void stopMetastoreAndSpark() {
+ if (spark != null) {
+ spark.stop();
+ spark = null;
+ }
+ }
+
@AfterEach
public void afterEach() {
spark.sql("DROP TABLE T");
diff --git a/flink-table-store-spark2/src/test/resources/log4j2-test.properties
b/flink-table-store-spark/flink-table-store-spark-common/src/test/resources/log4j2-test.properties
similarity index 100%
rename from flink-table-store-spark2/src/test/resources/log4j2-test.properties
rename to
flink-table-store-spark/flink-table-store-spark-common/src/test/resources/log4j2-test.properties
diff --git a/flink-table-store-spark/pom.xml b/flink-table-store-spark/pom.xml
index 53e13d27..bd00d1b9 100644
--- a/flink-table-store-spark/pom.xml
+++ b/flink-table-store-spark/pom.xml
@@ -31,37 +31,29 @@ under the License.
<artifactId>flink-table-store-spark</artifactId>
<name>Flink Table Store : Spark</name>
- <packaging>jar</packaging>
+ <packaging>pom</packaging>
<properties>
- <spark.version>3.2.2</spark.version>
<aws.version>1.12.319</aws.version>
</properties>
+ <modules>
+ <module>flink-table-store-spark-common</module>
+ <module>flink-table-store-spark-2</module>
+ <module>flink-table-store-spark-3.3</module>
+ <module>flink-table-store-spark-3.1</module>
+ <module>flink-table-store-spark-3.2</module>
+ </modules>
+
<dependencies>
<!-- Flink All dependencies -->
+
<dependency>
<groupId>org.apache.flink</groupId>
<artifactId>flink-table-store-shade</artifactId>
<version>${project.version}</version>
</dependency>
- <dependency>
- <groupId>org.apache.spark</groupId>
- <artifactId>spark-sql_2.12</artifactId>
- <version>${spark.version}</version>
- <exclusions>
- <exclusion>
- <groupId>log4j</groupId>
- <artifactId>log4j</artifactId>
- </exclusion>
- <exclusion>
- <groupId>org.slf4j</groupId>
- <artifactId>slf4j-log4j12</artifactId>
- </exclusion>
- </exclusions>
- </dependency>
-
<!-- Test -->
<dependency>
@@ -92,70 +84,4 @@ under the License.
<scope>test</scope>
</dependency>
</dependencies>
-
- <!-- Activate these profiles with -Pspark-x.x to build and test against
different Spark versions -->
- <profiles>
- <profile>
- <id>spark-3.3</id>
- <properties>
- <spark.version>3.3.0</spark.version>
- </properties>
- </profile>
- <profile>
- <id>spark-3.2</id>
- <properties>
- <spark.version>3.2.2</spark.version>
- </properties>
- </profile>
- <profile>
- <id>spark-3.1</id>
- <properties>
- <spark.version>3.1.3</spark.version>
- </properties>
- </profile>
- </profiles>
-
- <build>
- <plugins>
- <plugin>
- <groupId>org.codehaus.mojo</groupId>
- <artifactId>build-helper-maven-plugin</artifactId>
- <executions>
- <execution>
- <id>add-sources</id>
- <phase>generate-sources</phase>
- <goals>
- <goal>add-source</goal>
- </goals>
- <configuration>
- <sources>
- <source>src/main/${spark.version}</source>
- </sources>
- </configuration>
- </execution>
- </executions>
- </plugin>
-
- <plugin>
- <groupId>org.apache.maven.plugins</groupId>
- <artifactId>maven-shade-plugin</artifactId>
- <executions>
- <execution>
- <id>shade-flink</id>
- <phase>package</phase>
- <goals>
- <goal>shade</goal>
- </goals>
- <configuration>
- <artifactSet>
- <includes combine.children="append">
-
<include>org.apache.flink:flink-table-store-shade</include>
- </includes>
- </artifactSet>
- </configuration>
- </execution>
- </executions>
- </plugin>
- </plugins>
- </build>
</project>
diff --git
a/flink-table-store-spark2/src/main/java/org/apache/flink/table/store/spark/SparkArrayData.java
b/flink-table-store-spark2/src/main/java/org/apache/flink/table/store/spark/SparkArrayData.java
deleted file mode 100644
index eab56117..00000000
---
a/flink-table-store-spark2/src/main/java/org/apache/flink/table/store/spark/SparkArrayData.java
+++ /dev/null
@@ -1,171 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.flink.table.store.spark;
-
-import org.apache.flink.table.store.data.InternalArray;
-import org.apache.flink.table.store.types.ArrayType;
-import org.apache.flink.table.store.types.BigIntType;
-import org.apache.flink.table.store.types.RowType;
-import org.apache.flink.table.store.utils.RowDataUtils;
-
-import org.apache.spark.sql.catalyst.InternalRow;
-import org.apache.spark.sql.catalyst.util.ArrayData;
-import org.apache.spark.sql.catalyst.util.MapData;
-import org.apache.spark.sql.types.DataType;
-import org.apache.spark.sql.types.Decimal;
-import org.apache.spark.unsafe.types.CalendarInterval;
-import org.apache.spark.unsafe.types.UTF8String;
-
-import static org.apache.flink.table.store.spark.SparkInternalRow.fromFlink;
-import static org.apache.flink.table.store.utils.RowDataUtils.copyArray;
-import static org.apache.flink.table.store.utils.TypeUtils.timestampPrecision;
-
-/** Spark {@link ArrayData} to wrap Flink {@code ArrayData}. */
-public class SparkArrayData extends ArrayData {
-
- private final org.apache.flink.table.store.types.DataType elementType;
-
- private InternalArray array;
-
- public SparkArrayData(org.apache.flink.table.store.types.DataType
elementType) {
- this.elementType = elementType;
- }
-
- public SparkArrayData replace(InternalArray array) {
- this.array = array;
- return this;
- }
-
- @Override
- public int numElements() {
- return array.size();
- }
-
- @Override
- public ArrayData copy() {
- return new SparkArrayData(elementType).replace(copyArray(array,
elementType));
- }
-
- @Override
- public Object[] array() {
- Object[] objects = new Object[numElements()];
- for (int i = 0; i < objects.length; i++) {
- objects[i] = fromFlink(RowDataUtils.get(array, i, elementType),
elementType);
- }
- return objects;
- }
-
- @Override
- public void setNullAt(int i) {
- throw new UnsupportedOperationException();
- }
-
- @Override
- public void update(int i, Object value) {
- throw new UnsupportedOperationException();
- }
-
- @Override
- public boolean isNullAt(int ordinal) {
- return array.isNullAt(ordinal);
- }
-
- @Override
- public boolean getBoolean(int ordinal) {
- return array.getBoolean(ordinal);
- }
-
- @Override
- public byte getByte(int ordinal) {
- return array.getByte(ordinal);
- }
-
- @Override
- public short getShort(int ordinal) {
- return array.getShort(ordinal);
- }
-
- @Override
- public int getInt(int ordinal) {
- return array.getInt(ordinal);
- }
-
- @Override
- public long getLong(int ordinal) {
- if (elementType instanceof BigIntType) {
- return array.getLong(ordinal);
- }
-
- return getTimestampMicros(ordinal);
- }
-
- private long getTimestampMicros(int ordinal) {
- return fromFlink(array.getTimestamp(ordinal,
timestampPrecision(elementType)));
- }
-
- @Override
- public float getFloat(int ordinal) {
- return array.getFloat(ordinal);
- }
-
- @Override
- public double getDouble(int ordinal) {
- return array.getDouble(ordinal);
- }
-
- @Override
- public Decimal getDecimal(int ordinal, int precision, int scale) {
- return fromFlink(array.getDecimal(ordinal, precision, scale));
- }
-
- @Override
- public UTF8String getUTF8String(int ordinal) {
- return fromFlink(array.getString(ordinal));
- }
-
- @Override
- public byte[] getBinary(int ordinal) {
- return array.getBinary(ordinal);
- }
-
- @Override
- public CalendarInterval getInterval(int ordinal) {
- throw new UnsupportedOperationException();
- }
-
- @Override
- public InternalRow getStruct(int ordinal, int numFields) {
- return fromFlink(array.getRow(ordinal, numFields), (RowType)
elementType);
- }
-
- @Override
- public ArrayData getArray(int ordinal) {
- return fromFlink(array.getArray(ordinal), (ArrayType) elementType);
- }
-
- @Override
- public MapData getMap(int ordinal) {
- return fromFlink(array.getMap(ordinal), elementType);
- }
-
- @Override
- public Object get(int ordinal, DataType dataType) {
- return SpecializedGettersReader.read(this, ordinal, dataType);
- }
-}
diff --git
a/flink-table-store-spark2/src/main/java/org/apache/flink/table/store/spark/SparkFilterConverter.java
b/flink-table-store-spark2/src/main/java/org/apache/flink/table/store/spark/SparkFilterConverter.java
deleted file mode 100644
index cb12bcd4..00000000
---
a/flink-table-store-spark2/src/main/java/org/apache/flink/table/store/spark/SparkFilterConverter.java
+++ /dev/null
@@ -1,128 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.flink.table.store.spark;
-
-import org.apache.flink.table.store.file.predicate.Predicate;
-import org.apache.flink.table.store.file.predicate.PredicateBuilder;
-import org.apache.flink.table.store.types.DataType;
-import org.apache.flink.table.store.types.RowType;
-
-import org.apache.spark.sql.sources.And;
-import org.apache.spark.sql.sources.EqualTo;
-import org.apache.spark.sql.sources.Filter;
-import org.apache.spark.sql.sources.GreaterThan;
-import org.apache.spark.sql.sources.GreaterThanOrEqual;
-import org.apache.spark.sql.sources.In;
-import org.apache.spark.sql.sources.IsNotNull;
-import org.apache.spark.sql.sources.IsNull;
-import org.apache.spark.sql.sources.LessThan;
-import org.apache.spark.sql.sources.LessThanOrEqual;
-import org.apache.spark.sql.sources.Not;
-import org.apache.spark.sql.sources.Or;
-import org.apache.spark.sql.sources.StringStartsWith;
-
-import java.util.Arrays;
-import java.util.stream.Collectors;
-
-import static
org.apache.flink.table.store.file.predicate.PredicateBuilder.convertJavaObject;
-
-/** Conversion from {@link Filter} to {@link Predicate}. */
-public class SparkFilterConverter {
-
- private final RowType rowType;
- private final PredicateBuilder builder;
-
- public SparkFilterConverter(RowType rowType) {
- this.rowType = rowType;
- this.builder = new PredicateBuilder(rowType);
- }
-
- public Predicate convert(Filter filter) {
- if (filter instanceof EqualTo) {
- EqualTo eq = (EqualTo) filter;
- // TODO deal with isNaN
- int index = fieldIndex(eq.attribute());
- Object literal = convertLiteral(index, eq.value());
- return builder.equal(index, literal);
- } else if (filter instanceof GreaterThan) {
- GreaterThan gt = (GreaterThan) filter;
- int index = fieldIndex(gt.attribute());
- Object literal = convertLiteral(index, gt.value());
- return builder.greaterThan(index, literal);
- } else if (filter instanceof GreaterThanOrEqual) {
- GreaterThanOrEqual gt = (GreaterThanOrEqual) filter;
- int index = fieldIndex(gt.attribute());
- Object literal = convertLiteral(index, gt.value());
- return builder.greaterOrEqual(index, literal);
- } else if (filter instanceof LessThan) {
- LessThan lt = (LessThan) filter;
- int index = fieldIndex(lt.attribute());
- Object literal = convertLiteral(index, lt.value());
- return builder.lessThan(index, literal);
- } else if (filter instanceof LessThanOrEqual) {
- LessThanOrEqual lt = (LessThanOrEqual) filter;
- int index = fieldIndex(lt.attribute());
- Object literal = convertLiteral(index, lt.value());
- return builder.lessOrEqual(index, literal);
- } else if (filter instanceof In) {
- In in = (In) filter;
- int index = fieldIndex(in.attribute());
- return builder.in(
- index,
- Arrays.stream(in.values())
- .map(v -> convertLiteral(index, v))
- .collect(Collectors.toList()));
- } else if (filter instanceof IsNull) {
- return builder.isNull(fieldIndex(((IsNull) filter).attribute()));
- } else if (filter instanceof IsNotNull) {
- return builder.isNotNull(fieldIndex(((IsNotNull)
filter).attribute()));
- } else if (filter instanceof And) {
- And and = (And) filter;
- return PredicateBuilder.and(convert(and.left()),
convert(and.right()));
- } else if (filter instanceof Or) {
- Or or = (Or) filter;
- return PredicateBuilder.or(convert(or.left()),
convert(or.right()));
- } else if (filter instanceof Not) {
- Not not = (Not) filter;
- return
convert(not.child()).negate().orElseThrow(UnsupportedOperationException::new);
- } else if (filter instanceof StringStartsWith) {
- StringStartsWith startsWith = (StringStartsWith) filter;
- int index = fieldIndex(startsWith.attribute());
- Object literal = convertLiteral(index, startsWith.value());
- return builder.startsWith(index, literal);
- }
-
- // TODO: In, NotIn, AlwaysTrue, AlwaysFalse, EqualNullSafe
- throw new UnsupportedOperationException();
- }
-
- private int fieldIndex(String field) {
- int index = rowType.getFieldIndex(field);
- // TODO: support nested field
- if (index == -1) {
- throw new UnsupportedOperationException();
- }
- return index;
- }
-
- private Object convertLiteral(int index, Object value) {
- DataType type = rowType.getTypeAt(index);
- return convertJavaObject(type, value);
- }
-}
diff --git
a/flink-table-store-spark2/src/main/java/org/apache/flink/table/store/spark/SparkInternalRow.java
b/flink-table-store-spark2/src/main/java/org/apache/flink/table/store/spark/SparkInternalRow.java
deleted file mode 100644
index cac523c1..00000000
---
a/flink-table-store-spark2/src/main/java/org/apache/flink/table/store/spark/SparkInternalRow.java
+++ /dev/null
@@ -1,241 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.flink.table.store.spark;
-
-import org.apache.flink.table.store.data.BinaryString;
-import org.apache.flink.table.store.data.InternalArray;
-import org.apache.flink.table.store.data.InternalMap;
-import org.apache.flink.table.store.data.InternalRow;
-import org.apache.flink.table.store.data.Timestamp;
-import org.apache.flink.table.store.types.ArrayType;
-import org.apache.flink.table.store.types.BigIntType;
-import org.apache.flink.table.store.types.IntType;
-import org.apache.flink.table.store.types.MapType;
-import org.apache.flink.table.store.types.MultisetType;
-import org.apache.flink.table.store.types.RowType;
-
-import org.apache.spark.sql.catalyst.util.ArrayBasedMapData;
-import org.apache.spark.sql.catalyst.util.ArrayData;
-import org.apache.spark.sql.catalyst.util.DateTimeUtils;
-import org.apache.spark.sql.catalyst.util.MapData;
-import org.apache.spark.sql.types.DataType;
-import org.apache.spark.sql.types.Decimal;
-import org.apache.spark.unsafe.types.CalendarInterval;
-import org.apache.spark.unsafe.types.UTF8String;
-
-import static org.apache.flink.table.store.utils.RowDataUtils.copyRowData;
-import static org.apache.flink.table.store.utils.TypeUtils.timestampPrecision;
-
-/** Spark {@link org.apache.spark.sql.catalyst.InternalRow} to wrap {@link
InternalRow}. */
-public class SparkInternalRow extends
org.apache.spark.sql.catalyst.InternalRow {
-
- private final RowType rowType;
-
- private InternalRow row;
-
- public SparkInternalRow(RowType rowType) {
- this.rowType = rowType;
- }
-
- public SparkInternalRow replace(InternalRow row) {
- this.row = row;
- return this;
- }
-
- @Override
- public int numFields() {
- return row.getFieldCount();
- }
-
- @Override
- public void setNullAt(int i) {
- throw new UnsupportedOperationException();
- }
-
- @Override
- public void update(int i, Object value) {
- throw new UnsupportedOperationException();
- }
-
- @Override
- public org.apache.spark.sql.catalyst.InternalRow copy() {
- return new SparkInternalRow(rowType).replace(copyRowData(row,
rowType));
- }
-
- @Override
- public boolean isNullAt(int ordinal) {
- return row.isNullAt(ordinal);
- }
-
- @Override
- public boolean getBoolean(int ordinal) {
- return row.getBoolean(ordinal);
- }
-
- @Override
- public byte getByte(int ordinal) {
- return row.getByte(ordinal);
- }
-
- @Override
- public short getShort(int ordinal) {
- return row.getShort(ordinal);
- }
-
- @Override
- public int getInt(int ordinal) {
- return row.getInt(ordinal);
- }
-
- @Override
- public long getLong(int ordinal) {
- if (rowType.getTypeAt(ordinal) instanceof BigIntType) {
- return row.getLong(ordinal);
- }
-
- return getTimestampMicros(ordinal);
- }
-
- private long getTimestampMicros(int ordinal) {
- org.apache.flink.table.store.types.DataType type =
rowType.getTypeAt(ordinal);
- return fromFlink(row.getTimestamp(ordinal, timestampPrecision(type)));
- }
-
- @Override
- public float getFloat(int ordinal) {
- return row.getFloat(ordinal);
- }
-
- @Override
- public double getDouble(int ordinal) {
- return row.getDouble(ordinal);
- }
-
- @Override
- public Decimal getDecimal(int ordinal, int precision, int scale) {
- org.apache.flink.table.store.data.Decimal decimal =
- row.getDecimal(ordinal, precision, scale);
- return fromFlink(decimal);
- }
-
- @Override
- public UTF8String getUTF8String(int ordinal) {
- return fromFlink(row.getString(ordinal));
- }
-
- @Override
- public byte[] getBinary(int ordinal) {
- return row.getBinary(ordinal);
- }
-
- @Override
- public CalendarInterval getInterval(int ordinal) {
- throw new UnsupportedOperationException();
- }
-
- @Override
- public org.apache.spark.sql.catalyst.InternalRow getStruct(int ordinal,
int numFields) {
- return fromFlink(row.getRow(ordinal, numFields), (RowType)
rowType.getTypeAt(ordinal));
- }
-
- @Override
- public ArrayData getArray(int ordinal) {
- return fromFlink(row.getArray(ordinal), (ArrayType)
rowType.getTypeAt(ordinal));
- }
-
- @Override
- public MapData getMap(int ordinal) {
- return fromFlink(row.getMap(ordinal), rowType.getTypeAt(ordinal));
- }
-
- @Override
- public Object get(int ordinal, DataType dataType) {
- return SpecializedGettersReader.read(this, ordinal, dataType);
- }
-
- public static Object fromFlink(Object o,
org.apache.flink.table.store.types.DataType type) {
- if (o == null) {
- return null;
- }
- switch (type.getTypeRoot()) {
- case TIMESTAMP_WITHOUT_TIME_ZONE:
- case TIMESTAMP_WITH_LOCAL_TIME_ZONE:
- return fromFlink((Timestamp) o);
- case CHAR:
- case VARCHAR:
- return fromFlink((BinaryString) o);
- case DECIMAL:
- return fromFlink((org.apache.flink.table.store.data.Decimal)
o);
- case ARRAY:
- return fromFlink((InternalArray) o, (ArrayType) type);
- case MAP:
- case MULTISET:
- return fromFlink((InternalMap) o, type);
- case ROW:
- return fromFlink((InternalRow) o, (RowType) type);
- default:
- return o;
- }
- }
-
- public static UTF8String fromFlink(BinaryString string) {
- return UTF8String.fromBytes(string.toBytes());
- }
-
- public static Decimal fromFlink(org.apache.flink.table.store.data.Decimal
decimal) {
- return Decimal.apply(decimal.toBigDecimal());
- }
-
- public static org.apache.spark.sql.catalyst.InternalRow fromFlink(
- InternalRow row, RowType rowType) {
- return new SparkInternalRow(rowType).replace(row);
- }
-
- public static long fromFlink(Timestamp timestamp) {
- return DateTimeUtils.fromJavaTimestamp(timestamp.toSQLTimestamp());
- }
-
- public static ArrayData fromFlink(InternalArray array, ArrayType
arrayType) {
- return fromFlinkArrayElementType(array, arrayType.getElementType());
- }
-
- private static ArrayData fromFlinkArrayElementType(
- InternalArray array, org.apache.flink.table.store.types.DataType
elementType) {
- return new SparkArrayData(elementType).replace(array);
- }
-
- public static MapData fromFlink(
- InternalMap map, org.apache.flink.table.store.types.DataType
mapType) {
- org.apache.flink.table.store.types.DataType keyType;
- org.apache.flink.table.store.types.DataType valueType;
- if (mapType instanceof MapType) {
- keyType = ((MapType) mapType).getKeyType();
- valueType = ((MapType) mapType).getValueType();
- } else if (mapType instanceof MultisetType) {
- keyType = ((MultisetType) mapType).getElementType();
- valueType = new IntType();
- } else {
- throw new UnsupportedOperationException("Unsupported type: " +
mapType);
- }
-
- return new ArrayBasedMapData(
- fromFlinkArrayElementType(map.keyArray(), keyType),
- fromFlinkArrayElementType(map.valueArray(), valueType));
- }
-}
diff --git
a/flink-table-store-spark2/src/main/java/org/apache/flink/table/store/spark/SparkTypeUtils.java
b/flink-table-store-spark2/src/main/java/org/apache/flink/table/store/spark/SparkTypeUtils.java
deleted file mode 100644
index eaffc44d..00000000
---
a/flink-table-store-spark2/src/main/java/org/apache/flink/table/store/spark/SparkTypeUtils.java
+++ /dev/null
@@ -1,185 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.flink.table.store.spark;
-
-import org.apache.flink.table.store.types.ArrayType;
-import org.apache.flink.table.store.types.BigIntType;
-import org.apache.flink.table.store.types.BinaryType;
-import org.apache.flink.table.store.types.BooleanType;
-import org.apache.flink.table.store.types.CharType;
-import org.apache.flink.table.store.types.DataField;
-import org.apache.flink.table.store.types.DataTypeDefaultVisitor;
-import org.apache.flink.table.store.types.DateType;
-import org.apache.flink.table.store.types.DecimalType;
-import org.apache.flink.table.store.types.DoubleType;
-import org.apache.flink.table.store.types.FloatType;
-import org.apache.flink.table.store.types.IntType;
-import org.apache.flink.table.store.types.LocalZonedTimestampType;
-import org.apache.flink.table.store.types.MapType;
-import org.apache.flink.table.store.types.MultisetType;
-import org.apache.flink.table.store.types.RowType;
-import org.apache.flink.table.store.types.SmallIntType;
-import org.apache.flink.table.store.types.TimestampType;
-import org.apache.flink.table.store.types.TinyIntType;
-import org.apache.flink.table.store.types.VarBinaryType;
-import org.apache.flink.table.store.types.VarCharType;
-
-import org.apache.spark.sql.types.DataType;
-import org.apache.spark.sql.types.DataTypes;
-import org.apache.spark.sql.types.StructField;
-import org.apache.spark.sql.types.StructType;
-
-import java.util.ArrayList;
-import java.util.List;
-import java.util.Optional;
-
-/** Utils for Spark {@link DataType}. */
-public class SparkTypeUtils {
-
- private SparkTypeUtils() {}
-
- public static StructType fromFlinkRowType(RowType type) {
- return (StructType) fromFlinkType(type);
- }
-
- public static DataType
fromFlinkType(org.apache.flink.table.store.types.DataType type) {
- return type.accept(FlinkToSparkTypeVisitor.INSTANCE);
- }
-
- private static class FlinkToSparkTypeVisitor extends
DataTypeDefaultVisitor<DataType> {
-
- private static final FlinkToSparkTypeVisitor INSTANCE = new
FlinkToSparkTypeVisitor();
-
- @Override
- public DataType visit(CharType charType) {
- return DataTypes.StringType;
- }
-
- @Override
- public DataType visit(VarCharType varCharType) {
- return DataTypes.StringType;
- }
-
- @Override
- public DataType visit(BooleanType booleanType) {
- return DataTypes.BooleanType;
- }
-
- @Override
- public DataType visit(BinaryType binaryType) {
- return DataTypes.BinaryType;
- }
-
- @Override
- public DataType visit(VarBinaryType varBinaryType) {
- return DataTypes.BinaryType;
- }
-
- @Override
- public DataType visit(DecimalType decimalType) {
- return DataTypes.createDecimalType(decimalType.getPrecision(),
decimalType.getScale());
- }
-
- @Override
- public DataType visit(TinyIntType tinyIntType) {
- return DataTypes.ByteType;
- }
-
- @Override
- public DataType visit(SmallIntType smallIntType) {
- return DataTypes.ShortType;
- }
-
- @Override
- public DataType visit(IntType intType) {
- return DataTypes.IntegerType;
- }
-
- @Override
- public DataType visit(BigIntType bigIntType) {
- return DataTypes.LongType;
- }
-
- @Override
- public DataType visit(FloatType floatType) {
- return DataTypes.FloatType;
- }
-
- @Override
- public DataType visit(DoubleType doubleType) {
- return DataTypes.DoubleType;
- }
-
- @Override
- public DataType visit(DateType dateType) {
- return DataTypes.DateType;
- }
-
- @Override
- public DataType visit(TimestampType timestampType) {
- return DataTypes.TimestampType;
- }
-
- @Override
- public DataType visit(LocalZonedTimestampType localZonedTimestampType)
{
- return DataTypes.TimestampType;
- }
-
- @Override
- public DataType visit(ArrayType arrayType) {
- org.apache.flink.table.store.types.DataType elementType =
arrayType.getElementType();
- return DataTypes.createArrayType(elementType.accept(this),
elementType.isNullable());
- }
-
- @Override
- public DataType visit(MultisetType multisetType) {
- return DataTypes.createMapType(
- multisetType.getElementType().accept(this),
DataTypes.IntegerType, false);
- }
-
- @Override
- public DataType visit(MapType mapType) {
- return DataTypes.createMapType(
- mapType.getKeyType().accept(this),
- mapType.getValueType().accept(this),
- mapType.getValueType().isNullable());
- }
-
- @Override
- public DataType visit(RowType rowType) {
- List<StructField> fields = new
ArrayList<>(rowType.getFieldCount());
- for (DataField field : rowType.getFields()) {
- StructField structField =
- DataTypes.createStructField(
- field.name(), field.type().accept(this),
field.type().isNullable());
- structField =
- Optional.ofNullable(field.description())
- .map(structField::withComment)
- .orElse(structField);
- fields.add(structField);
- }
- return DataTypes.createStructType(fields);
- }
-
- @Override
- protected DataType
defaultMethod(org.apache.flink.table.store.types.DataType logicalType) {
- throw new UnsupportedOperationException("Unsupported type: " +
logicalType);
- }
- }
-}
diff --git a/pom.xml b/pom.xml
index 94ac4b2c..e9dd516e 100644
--- a/pom.xml
+++ b/pom.xml
@@ -65,7 +65,6 @@ under the License.
<module>flink-table-store-shade</module>
<module>flink-table-store-hive</module>
<module>flink-table-store-spark</module>
- <module>flink-table-store-spark2</module>
<module>flink-table-store-test-utils</module>
</modules>