This is an automated email from the ASF dual-hosted git repository.

lzljs3620320 pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/flink-table-store.git


The following commit(s) were added to refs/heads/master by this push:
     new 77fd8ecf [FLINK-31302] Split spark modules according to version
77fd8ecf is described below

commit 77fd8ecfd599c9d6c2b706bee4684558a8ba2f40
Author: yuzelin <[email protected]>
AuthorDate: Fri Mar 3 15:45:58 2023 +0800

    [FLINK-31302] Split spark modules according to version
    
    This closes #570
---
 docs/content/docs/engines/spark2.md                |   2 +-
 docs/content/docs/engines/spark3.md                |   4 +-
 .../table/store/file/schema/SchemaManagerTest.java |  16 +-
 flink-table-store-e2e-tests/pom.xml                |   4 +-
 .../flink/table/store/tests/SparkE2eTest.java      |   6 +
 .../flink-table-store-spark-2}/pom.xml             |  21 +-
 .../table/store/spark/SparkDataSourceReader.java   |   0
 .../table/store/spark/SparkInputPartition.java     |   0
 .../flink/table/store/spark/SparkSource.java       |   0
 ...org.apache.spark.sql.sources.DataSourceRegister |   0
 .../table/store/spark/SimpleTableTestHelper.java   |   0
 .../store/spark/SparkFilterConverterTest.java      |   0
 .../table/store/spark/SparkInternalRowTest.java    |   0
 .../flink/table/store/spark/SparkReadITCase.java   |   0
 .../flink/table/store/spark/SparkTypeTest.java     |   0
 .../src/test/resources/log4j2-test.properties      |   0
 .../flink-table-store-spark-3.1}/pom.xml           |  42 +---
 .../flink-table-store-spark-3.2}/pom.xml           |  42 +---
 .../flink-table-store-spark-3.3}/pom.xml           |  42 +---
 .../flink-table-store-spark-common}/pom.xml        |  46 +---
 .../flink/table/store/spark/SparkArrayData.java    |   0
 .../flink/table/store/spark/SparkCatalog.java      |   2 +-
 .../table/store/spark/SparkFilterConverter.java    |   0
 .../table/store/spark/SparkInputPartition.java     |   0
 .../flink/table/store/spark/SparkInternalRow.java  |   3 +-
 .../table/store/spark/SparkReaderFactory.java      |   0
 .../apache/flink/table/store/spark/SparkRow.java   |   0
 .../apache/flink/table/store/spark/SparkScan.java  |   0
 .../flink/table/store/spark/SparkScanBuilder.java  |   0
 .../flink/table/store/spark/SparkSource.java       |   0
 .../apache/flink/table/store/spark/SparkTable.java |   0
 .../flink/table/store/spark/SparkTypeUtils.java    |   0
 .../apache/flink/table/store/spark/SparkWrite.java |   0
 .../flink/table/store/spark/SparkWriteBuilder.java |   0
 .../store/spark/SpecializedGettersReader.java      |   7 +-
 ...org.apache.spark.sql.sources.DataSourceRegister |   0
 .../table/store/spark/MinioTestContainer.java      |   0
 .../store/spark/SparkFilterConverterTest.java      |   0
 .../table/store/spark/SparkInternalRowTest.java    |   0
 .../flink/table/store/spark/SparkReadITCase.java   |   0
 .../flink/table/store/spark/SparkReadTestBase.java |  22 +-
 .../flink/table/store/spark/SparkS3ITCase.java     |   9 +
 .../store/spark/SparkSchemaEvolutionITCase.java    |   0
 .../flink/table/store/spark/SparkTypeTest.java     |   0
 .../flink/table/store/spark/SparkWriteITCase.java  |  17 +-
 .../src/test/resources/log4j2-test.properties      |   0
 flink-table-store-spark/pom.xml                    |  94 +-------
 .../flink/table/store/spark/SparkArrayData.java    | 171 ---------------
 .../table/store/spark/SparkFilterConverter.java    | 128 -----------
 .../flink/table/store/spark/SparkInternalRow.java  | 241 ---------------------
 .../flink/table/store/spark/SparkTypeUtils.java    | 185 ----------------
 pom.xml                                            |   1 -
 52 files changed, 114 insertions(+), 991 deletions(-)

diff --git a/docs/content/docs/engines/spark2.md 
b/docs/content/docs/engines/spark2.md
index bce84516..318fbbbd 100644
--- a/docs/content/docs/engines/spark2.md
+++ b/docs/content/docs/engines/spark2.md
@@ -56,7 +56,7 @@ Build bundled jar with the following command.
 mvn clean install -DskipTests
 ```
 
-You can find the bundled jar in 
`./flink-table-store-spark2/target/flink-table-store-spark2-{{< version 
>}}.jar`.
+You can find the bundled jar in 
`./flink-table-store-spark/flink-table-store-spark-2/target/flink-table-store-spark-2-{{<
 version >}}.jar`.
 
 ## Quick Start
 
diff --git a/docs/content/docs/engines/spark3.md 
b/docs/content/docs/engines/spark3.md
index ce5fa814..a44263bd 100644
--- a/docs/content/docs/engines/spark3.md
+++ b/docs/content/docs/engines/spark3.md
@@ -32,6 +32,8 @@ This documentation is a guide for using Table Store in Spark3.
 
 {{< stable >}}
 
+Table Store currently supports Spark 3.3, 3.2 and 3.1. We recommend the latest 
Spark version for a better experience.
+
 Download [flink-table-store-spark-{{< version 
>}}.jar](https://www.apache.org/dyn/closer.lua/flink/flink-table-store-{{< 
version >}}/flink-table-store-spark-{{< version >}}.jar).
 
 You can also manually build bundled jar from the source code.
@@ -52,7 +54,7 @@ Build bundled jar with the following command.
 mvn clean install -DskipTests
 ```
 
-You can find the bundled jar in 
`./flink-table-store-spark/target/flink-table-store-spark-{{< version >}}.jar`.
+For Spark 3.3, you can find the bundled jar in 
`./flink-table-store-spark/flink-table-store-spark-3.3/target/flink-table-store-spark-3.3-{{<
 version >}}.jar`.
 
 ## Quick Start
 
diff --git 
a/flink-table-store-core/src/test/java/org/apache/flink/table/store/file/schema/SchemaManagerTest.java
 
b/flink-table-store-core/src/test/java/org/apache/flink/table/store/file/schema/SchemaManagerTest.java
index e66bea70..6da278f5 100644
--- 
a/flink-table-store-core/src/test/java/org/apache/flink/table/store/file/schema/SchemaManagerTest.java
+++ 
b/flink-table-store-core/src/test/java/org/apache/flink/table/store/file/schema/SchemaManagerTest.java
@@ -147,13 +147,15 @@ public class SchemaManagerTest {
 
     @Test
     public void testConcurrentCommit() throws Exception {
-        manager.createTable(
-                new Schema(
-                        rowType.getFields(),
-                        partitionKeys,
-                        primaryKeys,
-                        Collections.singletonMap("id", "-1"),
-                        "my_comment_4"));
+        retryArtificialException(
+                () ->
+                        manager.createTable(
+                                new Schema(
+                                        rowType.getFields(),
+                                        partitionKeys,
+                                        primaryKeys,
+                                        Collections.singletonMap("id", "-1"),
+                                        "my_comment_4")));
 
         int threadNumber = ThreadLocalRandom.current().nextInt(3) + 2;
         List<Thread> threads = new ArrayList<>();
diff --git a/flink-table-store-e2e-tests/pom.xml 
b/flink-table-store-e2e-tests/pom.xml
index 1580f006..112b4fda 100644
--- a/flink-table-store-e2e-tests/pom.xml
+++ b/flink-table-store-e2e-tests/pom.xml
@@ -62,7 +62,7 @@ under the License.
 
         <dependency>
             <groupId>org.apache.flink</groupId>
-            <artifactId>flink-table-store-spark</artifactId>
+            <artifactId>flink-table-store-spark-3.2</artifactId>
             <version>${project.version}</version>
             <scope>runtime</scope>
         </dependency>
@@ -132,7 +132,7 @@ under the License.
                         </artifactItem>
                         <artifactItem>
                             <groupId>org.apache.flink</groupId>
-                            <artifactId>flink-table-store-spark</artifactId>
+                            
<artifactId>flink-table-store-spark-3.2</artifactId>
                             <version>${project.version}</version>
                             
<destFileName>flink-table-store-spark.jar</destFileName>
                             <type>jar</type>
diff --git 
a/flink-table-store-e2e-tests/src/test/java/org/apache/flink/table/store/tests/SparkE2eTest.java
 
b/flink-table-store-e2e-tests/src/test/java/org/apache/flink/table/store/tests/SparkE2eTest.java
index f372d25b..3761b837 100644
--- 
a/flink-table-store-e2e-tests/src/test/java/org/apache/flink/table/store/tests/SparkE2eTest.java
+++ 
b/flink-table-store-e2e-tests/src/test/java/org/apache/flink/table/store/tests/SparkE2eTest.java
@@ -20,6 +20,8 @@ package org.apache.flink.table.store.tests;
 
 import org.junit.jupiter.api.Test;
 import org.junit.jupiter.api.condition.DisabledIfSystemProperty;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
 import org.testcontainers.containers.Container;
 import org.testcontainers.containers.ContainerState;
 
@@ -31,6 +33,8 @@ import java.util.stream.Collectors;
 @DisabledIfSystemProperty(named = "test.flink.version", matches = "1.14.*")
 public class SparkE2eTest extends E2eReaderTestBase {
 
+    private static final Logger LOG = 
LoggerFactory.getLogger(SparkE2eTest.class);
+
     public SparkE2eTest() {
         super(false, false, true);
     }
@@ -63,6 +67,8 @@ public class SparkE2eTest extends E2eReaderTestBase {
                                             "-f",
                                             TEST_DATA_DIR + "/" + sql);
                     if (execResult.getExitCode() != 0) {
+                        LOG.info(execResult.getStdout());
+                        LOG.info(execResult.getStderr());
                         throw new AssertionError("Failed when running spark 
sql.");
                     }
                     return Arrays.stream(execResult.getStdout().split("\n"))
diff --git a/flink-table-store-spark2/pom.xml 
b/flink-table-store-spark/flink-table-store-spark-2/pom.xml
similarity index 88%
copy from flink-table-store-spark2/pom.xml
copy to flink-table-store-spark/flink-table-store-spark-2/pom.xml
index e9d129d4..7207d74f 100644
--- a/flink-table-store-spark2/pom.xml
+++ b/flink-table-store-spark/flink-table-store-spark-2/pom.xml
@@ -23,15 +23,13 @@ under the License.
     <modelVersion>4.0.0</modelVersion>
 
     <parent>
-        <artifactId>flink-table-store-parent</artifactId>
         <groupId>org.apache.flink</groupId>
+        <artifactId>flink-table-store-spark</artifactId>
         <version>0.4-SNAPSHOT</version>
     </parent>
 
-    <artifactId>flink-table-store-spark2</artifactId>
-    <name>Flink Table Store : Spark2</name>
-
-    <packaging>jar</packaging>
+    <artifactId>flink-table-store-spark-2</artifactId>
+    <name>Flink Table Store : Spark : 2</name>
 
     <properties>
         <spark2.version>2.4.8</spark2.version>
@@ -39,11 +37,16 @@ under the License.
     </properties>
 
     <dependencies>
-        <!-- Flink All dependencies -->
         <dependency>
             <groupId>org.apache.flink</groupId>
-            <artifactId>flink-table-store-shade</artifactId>
+            <artifactId>flink-table-store-spark-common</artifactId>
             <version>${project.version}</version>
+            <exclusions>
+                <exclusion>
+                    <groupId>*</groupId>
+                    <artifactId>*</artifactId>
+                </exclusion>
+            </exclusions>
         </dependency>
 
         <dependency>
@@ -98,7 +101,7 @@ under the License.
                         <configuration>
                             <artifactSet>
                                 <includes combine.children="append">
-                                    
<include>org.apache.flink:flink-table-store-shade</include>
+                                    
<include>org.apache.flink:flink-table-store-spark-common</include>
                                 </includes>
                             </artifactSet>
                         </configuration>
@@ -107,4 +110,4 @@ under the License.
             </plugin>
         </plugins>
     </build>
-</project>
+</project>
\ No newline at end of file
diff --git 
a/flink-table-store-spark2/src/main/java/org/apache/flink/table/store/spark/SparkDataSourceReader.java
 
b/flink-table-store-spark/flink-table-store-spark-2/src/main/java/org/apache/flink/table/store/spark/SparkDataSourceReader.java
similarity index 100%
rename from 
flink-table-store-spark2/src/main/java/org/apache/flink/table/store/spark/SparkDataSourceReader.java
rename to 
flink-table-store-spark/flink-table-store-spark-2/src/main/java/org/apache/flink/table/store/spark/SparkDataSourceReader.java
diff --git 
a/flink-table-store-spark2/src/main/java/org/apache/flink/table/store/spark/SparkInputPartition.java
 
b/flink-table-store-spark/flink-table-store-spark-2/src/main/java/org/apache/flink/table/store/spark/SparkInputPartition.java
similarity index 100%
rename from 
flink-table-store-spark2/src/main/java/org/apache/flink/table/store/spark/SparkInputPartition.java
rename to 
flink-table-store-spark/flink-table-store-spark-2/src/main/java/org/apache/flink/table/store/spark/SparkInputPartition.java
diff --git 
a/flink-table-store-spark2/src/main/java/org/apache/flink/table/store/spark/SparkSource.java
 
b/flink-table-store-spark/flink-table-store-spark-2/src/main/java/org/apache/flink/table/store/spark/SparkSource.java
similarity index 100%
rename from 
flink-table-store-spark2/src/main/java/org/apache/flink/table/store/spark/SparkSource.java
rename to 
flink-table-store-spark/flink-table-store-spark-2/src/main/java/org/apache/flink/table/store/spark/SparkSource.java
diff --git 
a/flink-table-store-spark/src/main/resources/META-INF/services/org.apache.spark.sql.sources.DataSourceRegister
 
b/flink-table-store-spark/flink-table-store-spark-2/src/main/resources/META-INF.services/org.apache.spark.sql.sources.DataSourceRegister
similarity index 100%
rename from 
flink-table-store-spark/src/main/resources/META-INF/services/org.apache.spark.sql.sources.DataSourceRegister
rename to 
flink-table-store-spark/flink-table-store-spark-2/src/main/resources/META-INF.services/org.apache.spark.sql.sources.DataSourceRegister
diff --git 
a/flink-table-store-spark2/src/test/java/org/apache/flink/table/store/spark/SimpleTableTestHelper.java
 
b/flink-table-store-spark/flink-table-store-spark-2/src/test/java/org/apache/flink/table/store/spark/SimpleTableTestHelper.java
similarity index 100%
rename from 
flink-table-store-spark2/src/test/java/org/apache/flink/table/store/spark/SimpleTableTestHelper.java
rename to 
flink-table-store-spark/flink-table-store-spark-2/src/test/java/org/apache/flink/table/store/spark/SimpleTableTestHelper.java
diff --git 
a/flink-table-store-spark/src/test/java/org/apache/flink/table/store/spark/SparkFilterConverterTest.java
 
b/flink-table-store-spark/flink-table-store-spark-2/src/test/java/org/apache/flink/table/store/spark/SparkFilterConverterTest.java
similarity index 100%
rename from 
flink-table-store-spark/src/test/java/org/apache/flink/table/store/spark/SparkFilterConverterTest.java
rename to 
flink-table-store-spark/flink-table-store-spark-2/src/test/java/org/apache/flink/table/store/spark/SparkFilterConverterTest.java
diff --git 
a/flink-table-store-spark2/src/test/java/org/apache/flink/table/store/spark/SparkInternalRowTest.java
 
b/flink-table-store-spark/flink-table-store-spark-2/src/test/java/org/apache/flink/table/store/spark/SparkInternalRowTest.java
similarity index 100%
rename from 
flink-table-store-spark2/src/test/java/org/apache/flink/table/store/spark/SparkInternalRowTest.java
rename to 
flink-table-store-spark/flink-table-store-spark-2/src/test/java/org/apache/flink/table/store/spark/SparkInternalRowTest.java
diff --git 
a/flink-table-store-spark2/src/test/java/org/apache/flink/table/store/spark/SparkReadITCase.java
 
b/flink-table-store-spark/flink-table-store-spark-2/src/test/java/org/apache/flink/table/store/spark/SparkReadITCase.java
similarity index 100%
rename from 
flink-table-store-spark2/src/test/java/org/apache/flink/table/store/spark/SparkReadITCase.java
rename to 
flink-table-store-spark/flink-table-store-spark-2/src/test/java/org/apache/flink/table/store/spark/SparkReadITCase.java
diff --git 
a/flink-table-store-spark2/src/test/java/org/apache/flink/table/store/spark/SparkTypeTest.java
 
b/flink-table-store-spark/flink-table-store-spark-2/src/test/java/org/apache/flink/table/store/spark/SparkTypeTest.java
similarity index 100%
rename from 
flink-table-store-spark2/src/test/java/org/apache/flink/table/store/spark/SparkTypeTest.java
rename to 
flink-table-store-spark/flink-table-store-spark-2/src/test/java/org/apache/flink/table/store/spark/SparkTypeTest.java
diff --git a/flink-table-store-spark/src/test/resources/log4j2-test.properties 
b/flink-table-store-spark/flink-table-store-spark-2/src/test/resources/log4j2-test.properties
similarity index 100%
rename from flink-table-store-spark/src/test/resources/log4j2-test.properties
rename to 
flink-table-store-spark/flink-table-store-spark-2/src/test/resources/log4j2-test.properties
diff --git a/flink-table-store-spark2/pom.xml 
b/flink-table-store-spark/flink-table-store-spark-3.1/pom.xml
similarity index 66%
copy from flink-table-store-spark2/pom.xml
copy to flink-table-store-spark/flink-table-store-spark-3.1/pom.xml
index e9d129d4..45fe23bf 100644
--- a/flink-table-store-spark2/pom.xml
+++ b/flink-table-store-spark/flink-table-store-spark-3.1/pom.xml
@@ -23,47 +23,30 @@ under the License.
     <modelVersion>4.0.0</modelVersion>
 
     <parent>
-        <artifactId>flink-table-store-parent</artifactId>
         <groupId>org.apache.flink</groupId>
+        <artifactId>flink-table-store-spark</artifactId>
         <version>0.4-SNAPSHOT</version>
     </parent>
 
-    <artifactId>flink-table-store-spark2</artifactId>
-    <name>Flink Table Store : Spark2</name>
-
-    <packaging>jar</packaging>
+    <artifactId>flink-table-store-spark-3.1</artifactId>
+    <name>Flink Table Store : Spark : 3.1</name>
 
     <properties>
-        <spark2.version>2.4.8</spark2.version>
-        <jackson.version>2.13.3</jackson.version>
+        <spark.version>3.1.3</spark.version>
     </properties>
 
     <dependencies>
-        <!-- Flink All dependencies -->
         <dependency>
             <groupId>org.apache.flink</groupId>
-            <artifactId>flink-table-store-shade</artifactId>
+            <artifactId>flink-table-store-spark-common</artifactId>
             <version>${project.version}</version>
         </dependency>
 
         <dependency>
             <groupId>org.apache.spark</groupId>
-            <artifactId>spark-sql_2.11</artifactId>
-            <version>${spark2.version}</version>
+            <artifactId>spark-sql_2.12</artifactId>
+            <version>${spark.version}</version>
             <exclusions>
-                <exclusion>
-                    <groupId>org.apache.avro</groupId>
-                    <artifactId>avro</artifactId>
-                </exclusion>
-                <!-- The Jackson dependencies of Spark 2.4.8 have 
vulnerabilities. -->
-                <exclusion>
-                    <groupId>com.fasterxml.jackson.core</groupId>
-                    <artifactId>*</artifactId>
-                </exclusion>
-                <exclusion>
-                    <groupId>com.fasterxml.jackson.module</groupId>
-                    <artifactId>*</artifactId>
-                </exclusion>
                 <exclusion>
                     <groupId>log4j</groupId>
                     <artifactId>log4j</artifactId>
@@ -74,13 +57,6 @@ under the License.
                 </exclusion>
             </exclusions>
         </dependency>
-
-        <dependency>
-            <groupId>com.fasterxml.jackson.module</groupId>
-            <artifactId>jackson-module-scala_2.11</artifactId>
-            <version>${jackson.version}</version>
-            <scope>test</scope>
-        </dependency>
     </dependencies>
 
     <build>
@@ -98,7 +74,7 @@ under the License.
                         <configuration>
                             <artifactSet>
                                 <includes combine.children="append">
-                                    
<include>org.apache.flink:flink-table-store-shade</include>
+                                    
<include>org.apache.flink:flink-table-store-spark-common</include>
                                 </includes>
                             </artifactSet>
                         </configuration>
@@ -107,4 +83,4 @@ under the License.
             </plugin>
         </plugins>
     </build>
-</project>
+</project>
\ No newline at end of file
diff --git a/flink-table-store-spark2/pom.xml 
b/flink-table-store-spark/flink-table-store-spark-3.2/pom.xml
similarity index 66%
copy from flink-table-store-spark2/pom.xml
copy to flink-table-store-spark/flink-table-store-spark-3.2/pom.xml
index e9d129d4..4f2610c1 100644
--- a/flink-table-store-spark2/pom.xml
+++ b/flink-table-store-spark/flink-table-store-spark-3.2/pom.xml
@@ -23,47 +23,30 @@ under the License.
     <modelVersion>4.0.0</modelVersion>
 
     <parent>
-        <artifactId>flink-table-store-parent</artifactId>
         <groupId>org.apache.flink</groupId>
+        <artifactId>flink-table-store-spark</artifactId>
         <version>0.4-SNAPSHOT</version>
     </parent>
 
-    <artifactId>flink-table-store-spark2</artifactId>
-    <name>Flink Table Store : Spark2</name>
-
-    <packaging>jar</packaging>
+    <artifactId>flink-table-store-spark-3.2</artifactId>
+    <name>Flink Table Store : Spark : 3.2</name>
 
     <properties>
-        <spark2.version>2.4.8</spark2.version>
-        <jackson.version>2.13.3</jackson.version>
+        <spark.version>3.2.2</spark.version>
     </properties>
 
     <dependencies>
-        <!-- Flink All dependencies -->
         <dependency>
             <groupId>org.apache.flink</groupId>
-            <artifactId>flink-table-store-shade</artifactId>
+            <artifactId>flink-table-store-spark-common</artifactId>
             <version>${project.version}</version>
         </dependency>
 
         <dependency>
             <groupId>org.apache.spark</groupId>
-            <artifactId>spark-sql_2.11</artifactId>
-            <version>${spark2.version}</version>
+            <artifactId>spark-sql_2.12</artifactId>
+            <version>${spark.version}</version>
             <exclusions>
-                <exclusion>
-                    <groupId>org.apache.avro</groupId>
-                    <artifactId>avro</artifactId>
-                </exclusion>
-                <!-- The Jackson dependencies of Spark 2.4.8 have 
vulnerabilities. -->
-                <exclusion>
-                    <groupId>com.fasterxml.jackson.core</groupId>
-                    <artifactId>*</artifactId>
-                </exclusion>
-                <exclusion>
-                    <groupId>com.fasterxml.jackson.module</groupId>
-                    <artifactId>*</artifactId>
-                </exclusion>
                 <exclusion>
                     <groupId>log4j</groupId>
                     <artifactId>log4j</artifactId>
@@ -74,13 +57,6 @@ under the License.
                 </exclusion>
             </exclusions>
         </dependency>
-
-        <dependency>
-            <groupId>com.fasterxml.jackson.module</groupId>
-            <artifactId>jackson-module-scala_2.11</artifactId>
-            <version>${jackson.version}</version>
-            <scope>test</scope>
-        </dependency>
     </dependencies>
 
     <build>
@@ -98,7 +74,7 @@ under the License.
                         <configuration>
                             <artifactSet>
                                 <includes combine.children="append">
-                                    
<include>org.apache.flink:flink-table-store-shade</include>
+                                    
<include>org.apache.flink:flink-table-store-spark-common</include>
                                 </includes>
                             </artifactSet>
                         </configuration>
@@ -107,4 +83,4 @@ under the License.
             </plugin>
         </plugins>
     </build>
-</project>
+</project>
\ No newline at end of file
diff --git a/flink-table-store-spark2/pom.xml 
b/flink-table-store-spark/flink-table-store-spark-3.3/pom.xml
similarity index 66%
copy from flink-table-store-spark2/pom.xml
copy to flink-table-store-spark/flink-table-store-spark-3.3/pom.xml
index e9d129d4..a03ebd1d 100644
--- a/flink-table-store-spark2/pom.xml
+++ b/flink-table-store-spark/flink-table-store-spark-3.3/pom.xml
@@ -23,47 +23,30 @@ under the License.
     <modelVersion>4.0.0</modelVersion>
 
     <parent>
-        <artifactId>flink-table-store-parent</artifactId>
         <groupId>org.apache.flink</groupId>
+        <artifactId>flink-table-store-spark</artifactId>
         <version>0.4-SNAPSHOT</version>
     </parent>
 
-    <artifactId>flink-table-store-spark2</artifactId>
-    <name>Flink Table Store : Spark2</name>
-
-    <packaging>jar</packaging>
+    <artifactId>flink-table-store-spark-3.3</artifactId>
+    <name>Flink Table Store : Spark : 3.3</name>
 
     <properties>
-        <spark2.version>2.4.8</spark2.version>
-        <jackson.version>2.13.3</jackson.version>
+        <spark.version>3.3.0</spark.version>
     </properties>
 
     <dependencies>
-        <!-- Flink All dependencies -->
         <dependency>
             <groupId>org.apache.flink</groupId>
-            <artifactId>flink-table-store-shade</artifactId>
+            <artifactId>flink-table-store-spark-common</artifactId>
             <version>${project.version}</version>
         </dependency>
 
         <dependency>
             <groupId>org.apache.spark</groupId>
-            <artifactId>spark-sql_2.11</artifactId>
-            <version>${spark2.version}</version>
+            <artifactId>spark-sql_2.12</artifactId>
+            <version>${spark.version}</version>
             <exclusions>
-                <exclusion>
-                    <groupId>org.apache.avro</groupId>
-                    <artifactId>avro</artifactId>
-                </exclusion>
-                <!-- The Jackson dependencies of Spark 2.4.8 have 
vulnerabilities. -->
-                <exclusion>
-                    <groupId>com.fasterxml.jackson.core</groupId>
-                    <artifactId>*</artifactId>
-                </exclusion>
-                <exclusion>
-                    <groupId>com.fasterxml.jackson.module</groupId>
-                    <artifactId>*</artifactId>
-                </exclusion>
                 <exclusion>
                     <groupId>log4j</groupId>
                     <artifactId>log4j</artifactId>
@@ -74,13 +57,6 @@ under the License.
                 </exclusion>
             </exclusions>
         </dependency>
-
-        <dependency>
-            <groupId>com.fasterxml.jackson.module</groupId>
-            <artifactId>jackson-module-scala_2.11</artifactId>
-            <version>${jackson.version}</version>
-            <scope>test</scope>
-        </dependency>
     </dependencies>
 
     <build>
@@ -98,7 +74,7 @@ under the License.
                         <configuration>
                             <artifactSet>
                                 <includes combine.children="append">
-                                    
<include>org.apache.flink:flink-table-store-shade</include>
+                                    
<include>org.apache.flink:flink-table-store-spark-common</include>
                                 </includes>
                             </artifactSet>
                         </configuration>
@@ -107,4 +83,4 @@ under the License.
             </plugin>
         </plugins>
     </build>
-</project>
+</project>
\ No newline at end of file
diff --git a/flink-table-store-spark2/pom.xml 
b/flink-table-store-spark/flink-table-store-spark-common/pom.xml
similarity index 66%
rename from flink-table-store-spark2/pom.xml
rename to flink-table-store-spark/flink-table-store-spark-common/pom.xml
index e9d129d4..1303a81c 100644
--- a/flink-table-store-spark2/pom.xml
+++ b/flink-table-store-spark/flink-table-store-spark-common/pom.xml
@@ -23,47 +23,26 @@ under the License.
     <modelVersion>4.0.0</modelVersion>
 
     <parent>
-        <artifactId>flink-table-store-parent</artifactId>
+        <artifactId>flink-table-store-spark</artifactId>
         <groupId>org.apache.flink</groupId>
         <version>0.4-SNAPSHOT</version>
     </parent>
 
-    <artifactId>flink-table-store-spark2</artifactId>
-    <name>Flink Table Store : Spark2</name>
-
     <packaging>jar</packaging>
 
+    <artifactId>flink-table-store-spark-common</artifactId>
+    <name>Flink Table Store : Spark : Common</name>
+
     <properties>
-        <spark2.version>2.4.8</spark2.version>
-        <jackson.version>2.13.3</jackson.version>
+        <spark.version>3.2.2</spark.version>
     </properties>
 
     <dependencies>
-        <!-- Flink All dependencies -->
-        <dependency>
-            <groupId>org.apache.flink</groupId>
-            <artifactId>flink-table-store-shade</artifactId>
-            <version>${project.version}</version>
-        </dependency>
-
         <dependency>
             <groupId>org.apache.spark</groupId>
-            <artifactId>spark-sql_2.11</artifactId>
-            <version>${spark2.version}</version>
+            <artifactId>spark-sql_2.12</artifactId>
+            <version>${spark.version}</version>
             <exclusions>
-                <exclusion>
-                    <groupId>org.apache.avro</groupId>
-                    <artifactId>avro</artifactId>
-                </exclusion>
-                <!-- The Jackson dependencies of Spark 2.4.8 have 
vulnerabilities. -->
-                <exclusion>
-                    <groupId>com.fasterxml.jackson.core</groupId>
-                    <artifactId>*</artifactId>
-                </exclusion>
-                <exclusion>
-                    <groupId>com.fasterxml.jackson.module</groupId>
-                    <artifactId>*</artifactId>
-                </exclusion>
                 <exclusion>
                     <groupId>log4j</groupId>
                     <artifactId>log4j</artifactId>
@@ -72,15 +51,12 @@ under the License.
                     <groupId>org.slf4j</groupId>
                     <artifactId>slf4j-log4j12</artifactId>
                 </exclusion>
+                <exclusion>
+                    <groupId>org.apache.orc</groupId>
+                    <artifactId>orc-core</artifactId>
+                </exclusion>
             </exclusions>
         </dependency>
-
-        <dependency>
-            <groupId>com.fasterxml.jackson.module</groupId>
-            <artifactId>jackson-module-scala_2.11</artifactId>
-            <version>${jackson.version}</version>
-            <scope>test</scope>
-        </dependency>
     </dependencies>
 
     <build>
diff --git 
a/flink-table-store-spark/src/main/java/org/apache/flink/table/store/spark/SparkArrayData.java
 
b/flink-table-store-spark/flink-table-store-spark-common/src/main/java/org/apache/flink/table/store/spark/SparkArrayData.java
similarity index 100%
rename from 
flink-table-store-spark/src/main/java/org/apache/flink/table/store/spark/SparkArrayData.java
rename to 
flink-table-store-spark/flink-table-store-spark-common/src/main/java/org/apache/flink/table/store/spark/SparkArrayData.java
diff --git 
a/flink-table-store-spark/src/main/java/org/apache/flink/table/store/spark/SparkCatalog.java
 
b/flink-table-store-spark/flink-table-store-spark-common/src/main/java/org/apache/flink/table/store/spark/SparkCatalog.java
similarity index 99%
rename from 
flink-table-store-spark/src/main/java/org/apache/flink/table/store/spark/SparkCatalog.java
rename to 
flink-table-store-spark/flink-table-store-spark-common/src/main/java/org/apache/flink/table/store/spark/SparkCatalog.java
index 42986585..86a3b343 100644
--- 
a/flink-table-store-spark/src/main/java/org/apache/flink/table/store/spark/SparkCatalog.java
+++ 
b/flink-table-store-spark/flink-table-store-spark-common/src/main/java/org/apache/flink/table/store/spark/SparkCatalog.java
@@ -224,7 +224,7 @@ public class SparkCatalog implements TableCatalog, 
SupportsNamespaces {
     }
 
     @Override
-    public Table createTable(
+    public SparkTable createTable(
             Identifier ident,
             StructType schema,
             Transform[] partitions,
diff --git 
a/flink-table-store-spark/src/main/java/org/apache/flink/table/store/spark/SparkFilterConverter.java
 
b/flink-table-store-spark/flink-table-store-spark-common/src/main/java/org/apache/flink/table/store/spark/SparkFilterConverter.java
similarity index 100%
rename from 
flink-table-store-spark/src/main/java/org/apache/flink/table/store/spark/SparkFilterConverter.java
rename to 
flink-table-store-spark/flink-table-store-spark-common/src/main/java/org/apache/flink/table/store/spark/SparkFilterConverter.java
diff --git 
a/flink-table-store-spark/src/main/java/org/apache/flink/table/store/spark/SparkInputPartition.java
 
b/flink-table-store-spark/flink-table-store-spark-common/src/main/java/org/apache/flink/table/store/spark/SparkInputPartition.java
similarity index 100%
rename from 
flink-table-store-spark/src/main/java/org/apache/flink/table/store/spark/SparkInputPartition.java
rename to 
flink-table-store-spark/flink-table-store-spark-common/src/main/java/org/apache/flink/table/store/spark/SparkInputPartition.java
diff --git 
a/flink-table-store-spark/src/main/java/org/apache/flink/table/store/spark/SparkInternalRow.java
 
b/flink-table-store-spark/flink-table-store-spark-common/src/main/java/org/apache/flink/table/store/spark/SparkInternalRow.java
similarity index 98%
rename from 
flink-table-store-spark/src/main/java/org/apache/flink/table/store/spark/SparkInternalRow.java
rename to 
flink-table-store-spark/flink-table-store-spark-common/src/main/java/org/apache/flink/table/store/spark/SparkInternalRow.java
index c6480e1d..d3da03b2 100644
--- 
a/flink-table-store-spark/src/main/java/org/apache/flink/table/store/spark/SparkInternalRow.java
+++ 
b/flink-table-store-spark/flink-table-store-spark-common/src/main/java/org/apache/flink/table/store/spark/SparkInternalRow.java
@@ -31,7 +31,6 @@ import org.apache.flink.table.store.types.MapType;
 import org.apache.flink.table.store.types.MultisetType;
 import org.apache.flink.table.store.types.RowType;
 
-import org.apache.spark.sql.catalyst.expressions.SpecializedGettersReader;
 import org.apache.spark.sql.catalyst.util.ArrayBasedMapData;
 import org.apache.spark.sql.catalyst.util.ArrayData;
 import org.apache.spark.sql.catalyst.util.DateTimeUtils;
@@ -167,7 +166,7 @@ public class SparkInternalRow extends 
org.apache.spark.sql.catalyst.InternalRow
 
     @Override
     public Object get(int ordinal, org.apache.spark.sql.types.DataType 
dataType) {
-        return SpecializedGettersReader.read(this, ordinal, dataType, true, 
true);
+        return SpecializedGettersReader.read(this, ordinal, dataType);
     }
 
     public static Object fromFlink(Object o, DataType type) {
diff --git 
a/flink-table-store-spark/src/main/java/org/apache/flink/table/store/spark/SparkReaderFactory.java
 
b/flink-table-store-spark/flink-table-store-spark-common/src/main/java/org/apache/flink/table/store/spark/SparkReaderFactory.java
similarity index 100%
rename from 
flink-table-store-spark/src/main/java/org/apache/flink/table/store/spark/SparkReaderFactory.java
rename to 
flink-table-store-spark/flink-table-store-spark-common/src/main/java/org/apache/flink/table/store/spark/SparkReaderFactory.java
diff --git 
a/flink-table-store-spark/src/main/java/org/apache/flink/table/store/spark/SparkRow.java
 
b/flink-table-store-spark/flink-table-store-spark-common/src/main/java/org/apache/flink/table/store/spark/SparkRow.java
similarity index 100%
rename from 
flink-table-store-spark/src/main/java/org/apache/flink/table/store/spark/SparkRow.java
rename to 
flink-table-store-spark/flink-table-store-spark-common/src/main/java/org/apache/flink/table/store/spark/SparkRow.java
diff --git 
a/flink-table-store-spark/src/main/java/org/apache/flink/table/store/spark/SparkScan.java
 
b/flink-table-store-spark/flink-table-store-spark-common/src/main/java/org/apache/flink/table/store/spark/SparkScan.java
similarity index 100%
rename from 
flink-table-store-spark/src/main/java/org/apache/flink/table/store/spark/SparkScan.java
rename to 
flink-table-store-spark/flink-table-store-spark-common/src/main/java/org/apache/flink/table/store/spark/SparkScan.java
diff --git 
a/flink-table-store-spark/src/main/java/org/apache/flink/table/store/spark/SparkScanBuilder.java
 
b/flink-table-store-spark/flink-table-store-spark-common/src/main/java/org/apache/flink/table/store/spark/SparkScanBuilder.java
similarity index 100%
rename from 
flink-table-store-spark/src/main/java/org/apache/flink/table/store/spark/SparkScanBuilder.java
rename to 
flink-table-store-spark/flink-table-store-spark-common/src/main/java/org/apache/flink/table/store/spark/SparkScanBuilder.java
diff --git 
a/flink-table-store-spark/src/main/java/org/apache/flink/table/store/spark/SparkSource.java
 
b/flink-table-store-spark/flink-table-store-spark-common/src/main/java/org/apache/flink/table/store/spark/SparkSource.java
similarity index 100%
rename from 
flink-table-store-spark/src/main/java/org/apache/flink/table/store/spark/SparkSource.java
rename to 
flink-table-store-spark/flink-table-store-spark-common/src/main/java/org/apache/flink/table/store/spark/SparkSource.java
diff --git 
a/flink-table-store-spark/src/main/java/org/apache/flink/table/store/spark/SparkTable.java
 
b/flink-table-store-spark/flink-table-store-spark-common/src/main/java/org/apache/flink/table/store/spark/SparkTable.java
similarity index 100%
rename from 
flink-table-store-spark/src/main/java/org/apache/flink/table/store/spark/SparkTable.java
rename to 
flink-table-store-spark/flink-table-store-spark-common/src/main/java/org/apache/flink/table/store/spark/SparkTable.java
diff --git 
a/flink-table-store-spark/src/main/java/org/apache/flink/table/store/spark/SparkTypeUtils.java
 
b/flink-table-store-spark/flink-table-store-spark-common/src/main/java/org/apache/flink/table/store/spark/SparkTypeUtils.java
similarity index 100%
rename from 
flink-table-store-spark/src/main/java/org/apache/flink/table/store/spark/SparkTypeUtils.java
rename to 
flink-table-store-spark/flink-table-store-spark-common/src/main/java/org/apache/flink/table/store/spark/SparkTypeUtils.java
diff --git 
a/flink-table-store-spark/src/main/java/org/apache/flink/table/store/spark/SparkWrite.java
 
b/flink-table-store-spark/flink-table-store-spark-common/src/main/java/org/apache/flink/table/store/spark/SparkWrite.java
similarity index 100%
rename from 
flink-table-store-spark/src/main/java/org/apache/flink/table/store/spark/SparkWrite.java
rename to 
flink-table-store-spark/flink-table-store-spark-common/src/main/java/org/apache/flink/table/store/spark/SparkWrite.java
diff --git 
a/flink-table-store-spark/src/main/java/org/apache/flink/table/store/spark/SparkWriteBuilder.java
 
b/flink-table-store-spark/flink-table-store-spark-common/src/main/java/org/apache/flink/table/store/spark/SparkWriteBuilder.java
similarity index 100%
rename from 
flink-table-store-spark/src/main/java/org/apache/flink/table/store/spark/SparkWriteBuilder.java
rename to 
flink-table-store-spark/flink-table-store-spark-common/src/main/java/org/apache/flink/table/store/spark/SparkWriteBuilder.java
diff --git 
a/flink-table-store-spark2/src/main/java/org/apache/flink/table/store/spark/SpecializedGettersReader.java
 
b/flink-table-store-spark/flink-table-store-spark-common/src/main/java/org/apache/flink/table/store/spark/SpecializedGettersReader.java
similarity index 95%
rename from 
flink-table-store-spark2/src/main/java/org/apache/flink/table/store/spark/SpecializedGettersReader.java
rename to 
flink-table-store-spark/flink-table-store-spark-common/src/main/java/org/apache/flink/table/store/spark/SpecializedGettersReader.java
index 5eaf1738..7ed3633a 100644
--- 
a/flink-table-store-spark2/src/main/java/org/apache/flink/table/store/spark/SpecializedGettersReader.java
+++ 
b/flink-table-store-spark/flink-table-store-spark-common/src/main/java/org/apache/flink/table/store/spark/SpecializedGettersReader.java
@@ -7,7 +7,7 @@
  * "License"); you may not use this file except in compliance
  * with the License.  You may obtain a copy of the License at
  *
- * http://www.apache.org/licenses/LICENSE-2.0
+ *     http://www.apache.org/licenses/LICENSE-2.0
  *
  * Unless required by applicable law or agreed to in writing, software
  * distributed under the License is distributed on an "AS IS" BASIS,
@@ -39,7 +39,10 @@ import org.apache.spark.sql.types.StructType;
 import org.apache.spark.sql.types.TimestampType;
 import org.apache.spark.sql.types.UserDefinedType;
 
-/** Reader of Spark {@link SpecializedGetters}. */
+/**
+ * Reader of Spark {@link SpecializedGetters}. Copied from Spark to avoid 
conflict between Spark2
+ * and Spark3.
+ */
 public final class SpecializedGettersReader {
 
     private SpecializedGettersReader() {}
diff --git 
a/flink-table-store-spark2/src/main/resources/META-INF/services/org.apache.spark.sql.sources.DataSourceRegister
 
b/flink-table-store-spark/flink-table-store-spark-common/src/main/resources/META-INF/services/org.apache.spark.sql.sources.DataSourceRegister
similarity index 100%
rename from 
flink-table-store-spark2/src/main/resources/META-INF/services/org.apache.spark.sql.sources.DataSourceRegister
rename to 
flink-table-store-spark/flink-table-store-spark-common/src/main/resources/META-INF/services/org.apache.spark.sql.sources.DataSourceRegister
diff --git 
a/flink-table-store-spark/src/test/java/org/apache/flink/table/store/spark/MinioTestContainer.java
 
b/flink-table-store-spark/flink-table-store-spark-common/src/test/java/org/apache/flink/table/store/spark/MinioTestContainer.java
similarity index 100%
rename from 
flink-table-store-spark/src/test/java/org/apache/flink/table/store/spark/MinioTestContainer.java
rename to 
flink-table-store-spark/flink-table-store-spark-common/src/test/java/org/apache/flink/table/store/spark/MinioTestContainer.java
diff --git 
a/flink-table-store-spark2/src/test/java/org/apache/flink/table/store/spark/SparkFilterConverterTest.java
 
b/flink-table-store-spark/flink-table-store-spark-common/src/test/java/org/apache/flink/table/store/spark/SparkFilterConverterTest.java
similarity index 100%
rename from 
flink-table-store-spark2/src/test/java/org/apache/flink/table/store/spark/SparkFilterConverterTest.java
rename to 
flink-table-store-spark/flink-table-store-spark-common/src/test/java/org/apache/flink/table/store/spark/SparkFilterConverterTest.java
diff --git 
a/flink-table-store-spark/src/test/java/org/apache/flink/table/store/spark/SparkInternalRowTest.java
 
b/flink-table-store-spark/flink-table-store-spark-common/src/test/java/org/apache/flink/table/store/spark/SparkInternalRowTest.java
similarity index 100%
rename from 
flink-table-store-spark/src/test/java/org/apache/flink/table/store/spark/SparkInternalRowTest.java
rename to 
flink-table-store-spark/flink-table-store-spark-common/src/test/java/org/apache/flink/table/store/spark/SparkInternalRowTest.java
diff --git 
a/flink-table-store-spark/src/test/java/org/apache/flink/table/store/spark/SparkReadITCase.java
 
b/flink-table-store-spark/flink-table-store-spark-common/src/test/java/org/apache/flink/table/store/spark/SparkReadITCase.java
similarity index 100%
rename from 
flink-table-store-spark/src/test/java/org/apache/flink/table/store/spark/SparkReadITCase.java
rename to 
flink-table-store-spark/flink-table-store-spark-common/src/test/java/org/apache/flink/table/store/spark/SparkReadITCase.java
diff --git 
a/flink-table-store-spark/src/test/java/org/apache/flink/table/store/spark/SparkReadTestBase.java
 
b/flink-table-store-spark/flink-table-store-spark-common/src/test/java/org/apache/flink/table/store/spark/SparkReadTestBase.java
similarity index 91%
rename from 
flink-table-store-spark/src/test/java/org/apache/flink/table/store/spark/SparkReadTestBase.java
rename to 
flink-table-store-spark/flink-table-store-spark-common/src/test/java/org/apache/flink/table/store/spark/SparkReadTestBase.java
index de365c31..fb6dc4f5 100644
--- 
a/flink-table-store-spark/src/test/java/org/apache/flink/table/store/spark/SparkReadTestBase.java
+++ 
b/flink-table-store-spark/flink-table-store-spark-common/src/test/java/org/apache/flink/table/store/spark/SparkReadTestBase.java
@@ -30,7 +30,6 @@ import 
org.apache.flink.table.store.table.sink.StreamTableWrite;
 import org.apache.flink.table.store.types.DataField;
 import org.apache.flink.table.store.types.RowKind;
 
-import org.apache.commons.io.FileUtils;
 import org.apache.commons.lang3.StringUtils;
 import org.apache.spark.sql.Dataset;
 import org.apache.spark.sql.Row;
@@ -39,10 +38,8 @@ import org.junit.jupiter.api.AfterAll;
 import org.junit.jupiter.api.AfterEach;
 import org.junit.jupiter.api.BeforeAll;
 import org.junit.jupiter.api.BeforeEach;
+import org.junit.jupiter.api.io.TempDir;
 
-import java.io.File;
-import java.io.IOException;
-import java.nio.file.Files;
 import java.util.List;
 import java.util.concurrent.atomic.AtomicLong;
 
@@ -53,8 +50,6 @@ public abstract class SparkReadTestBase {
     private static final String COMMIT_USER = "user";
     private static final AtomicLong COMMIT_IDENTIFIER = new AtomicLong(0);
 
-    private static File warehouse = null;
-
     protected static SparkSession spark = null;
 
     protected static Path warehousePath = null;
@@ -64,10 +59,8 @@ public abstract class SparkReadTestBase {
     protected static Path tablePath2;
 
     @BeforeAll
-    public static void startMetastoreAndSpark() throws Exception {
-        warehouse = Files.createTempFile("warehouse", null).toFile();
-        assertThat(warehouse.delete()).isTrue();
-        warehousePath = new Path("file:" + warehouse);
+    public static void startMetastoreAndSpark(@TempDir java.nio.file.Path 
tempDir) {
+        warehousePath = new Path("file:" + tempDir.toString());
         spark = SparkSession.builder().master("local[2]").getOrCreate();
         spark.conf().set("spark.sql.catalog.tablestore", 
SparkCatalog.class.getName());
         spark.conf().set("spark.sql.catalog.tablestore.warehouse", 
warehousePath.toString());
@@ -76,10 +69,7 @@ public abstract class SparkReadTestBase {
     }
 
     @AfterAll
-    public static void stopMetastoreAndSpark() throws IOException {
-        if (warehouse != null && warehouse.exists()) {
-            FileUtils.deleteDirectory(warehouse);
-        }
+    public static void stopMetastoreAndSpark() {
         if (spark != null) {
             spark.stop();
             spark = null;
@@ -124,9 +114,7 @@ public abstract class SparkReadTestBase {
     public void afterEach() {
         List<Row> tables = spark.sql("show tables").collectAsList();
         tables.forEach(
-                table -> {
-                    spark.sql("DROP TABLE " + table.getString(0) + "." + 
table.getString(1));
-                });
+                table -> spark.sql("DROP TABLE " + table.getString(0) + "." + 
table.getString(1)));
     }
 
     protected void innerTestSimpleType(Dataset<Row> dataset) {
diff --git 
a/flink-table-store-spark/src/test/java/org/apache/flink/table/store/spark/SparkS3ITCase.java
 
b/flink-table-store-spark/flink-table-store-spark-common/src/test/java/org/apache/flink/table/store/spark/SparkS3ITCase.java
similarity index 94%
rename from 
flink-table-store-spark/src/test/java/org/apache/flink/table/store/spark/SparkS3ITCase.java
rename to 
flink-table-store-spark/flink-table-store-spark-common/src/test/java/org/apache/flink/table/store/spark/SparkS3ITCase.java
index a8f06b80..313cba74 100644
--- 
a/flink-table-store-spark/src/test/java/org/apache/flink/table/store/spark/SparkS3ITCase.java
+++ 
b/flink-table-store-spark/flink-table-store-spark-common/src/test/java/org/apache/flink/table/store/spark/SparkS3ITCase.java
@@ -24,6 +24,7 @@ import 
org.apache.flink.table.store.testutils.junit.parameterized.Parameters;
 
 import org.apache.spark.sql.Row;
 import org.apache.spark.sql.SparkSession;
+import org.junit.jupiter.api.AfterAll;
 import org.junit.jupiter.api.AfterEach;
 import org.junit.jupiter.api.BeforeAll;
 import org.junit.jupiter.api.TestTemplate;
@@ -60,6 +61,14 @@ public class SparkS3ITCase {
         spark.sql("USE tablestore.db");
     }
 
+    @AfterAll
+    public static void stopMetastoreAndSpark() {
+        if (spark != null) {
+            spark.stop();
+            spark = null;
+        }
+    }
+
     @Parameters(name = "{0}")
     public static Collection<String> parameters() {
         return Arrays.asList("avro", "orc", "parquet");
diff --git 
a/flink-table-store-spark/src/test/java/org/apache/flink/table/store/spark/SparkSchemaEvolutionITCase.java
 
b/flink-table-store-spark/flink-table-store-spark-common/src/test/java/org/apache/flink/table/store/spark/SparkSchemaEvolutionITCase.java
similarity index 100%
rename from 
flink-table-store-spark/src/test/java/org/apache/flink/table/store/spark/SparkSchemaEvolutionITCase.java
rename to 
flink-table-store-spark/flink-table-store-spark-common/src/test/java/org/apache/flink/table/store/spark/SparkSchemaEvolutionITCase.java
diff --git 
a/flink-table-store-spark/src/test/java/org/apache/flink/table/store/spark/SparkTypeTest.java
 
b/flink-table-store-spark/flink-table-store-spark-common/src/test/java/org/apache/flink/table/store/spark/SparkTypeTest.java
similarity index 100%
rename from 
flink-table-store-spark/src/test/java/org/apache/flink/table/store/spark/SparkTypeTest.java
rename to 
flink-table-store-spark/flink-table-store-spark-common/src/test/java/org/apache/flink/table/store/spark/SparkTypeTest.java
diff --git 
a/flink-table-store-spark/src/test/java/org/apache/flink/table/store/spark/SparkWriteITCase.java
 
b/flink-table-store-spark/flink-table-store-spark-common/src/test/java/org/apache/flink/table/store/spark/SparkWriteITCase.java
similarity index 90%
rename from 
flink-table-store-spark/src/test/java/org/apache/flink/table/store/spark/SparkWriteITCase.java
rename to 
flink-table-store-spark/flink-table-store-spark-common/src/test/java/org/apache/flink/table/store/spark/SparkWriteITCase.java
index 045e0306..0307841a 100644
--- 
a/flink-table-store-spark/src/test/java/org/apache/flink/table/store/spark/SparkWriteITCase.java
+++ 
b/flink-table-store-spark/flink-table-store-spark-common/src/test/java/org/apache/flink/table/store/spark/SparkWriteITCase.java
@@ -22,11 +22,12 @@ import org.apache.flink.table.store.fs.Path;
 
 import org.apache.spark.sql.Row;
 import org.apache.spark.sql.SparkSession;
+import org.junit.jupiter.api.AfterAll;
 import org.junit.jupiter.api.AfterEach;
 import org.junit.jupiter.api.BeforeAll;
 import org.junit.jupiter.api.Test;
+import org.junit.jupiter.api.io.TempDir;
 
-import java.io.File;
 import java.util.Comparator;
 import java.util.List;
 
@@ -38,10 +39,8 @@ public class SparkWriteITCase {
     private static SparkSession spark = null;
 
     @BeforeAll
-    public static void startMetastoreAndSpark() throws Exception {
-        File warehouse = File.createTempFile("warehouse", null);
-        assertThat(warehouse.delete()).isTrue();
-        Path warehousePath = new Path("file:" + warehouse);
+    public static void startMetastoreAndSpark(@TempDir java.nio.file.Path 
tempDir) {
+        Path warehousePath = new Path("file:" + tempDir.toString());
         spark = SparkSession.builder().master("local[2]").getOrCreate();
         spark.conf().set("spark.sql.catalog.tablestore", 
SparkCatalog.class.getName());
         spark.conf().set("spark.sql.catalog.tablestore.warehouse", 
warehousePath.toString());
@@ -49,6 +48,14 @@ public class SparkWriteITCase {
         spark.sql("USE tablestore.db");
     }
 
+    @AfterAll
+    public static void stopMetastoreAndSpark() {
+        if (spark != null) {
+            spark.stop();
+            spark = null;
+        }
+    }
+
     @AfterEach
     public void afterEach() {
         spark.sql("DROP TABLE T");
diff --git a/flink-table-store-spark2/src/test/resources/log4j2-test.properties 
b/flink-table-store-spark/flink-table-store-spark-common/src/test/resources/log4j2-test.properties
similarity index 100%
rename from flink-table-store-spark2/src/test/resources/log4j2-test.properties
rename to 
flink-table-store-spark/flink-table-store-spark-common/src/test/resources/log4j2-test.properties
diff --git a/flink-table-store-spark/pom.xml b/flink-table-store-spark/pom.xml
index 53e13d27..bd00d1b9 100644
--- a/flink-table-store-spark/pom.xml
+++ b/flink-table-store-spark/pom.xml
@@ -31,37 +31,29 @@ under the License.
     <artifactId>flink-table-store-spark</artifactId>
     <name>Flink Table Store : Spark</name>
 
-    <packaging>jar</packaging>
+    <packaging>pom</packaging>
 
     <properties>
-        <spark.version>3.2.2</spark.version>
         <aws.version>1.12.319</aws.version>
     </properties>
 
+    <modules>
+        <module>flink-table-store-spark-common</module>
+        <module>flink-table-store-spark-2</module>
+        <module>flink-table-store-spark-3.3</module>
+        <module>flink-table-store-spark-3.1</module>
+        <module>flink-table-store-spark-3.2</module>
+    </modules>
+
     <dependencies>
         <!-- Flink All dependencies -->
+
         <dependency>
             <groupId>org.apache.flink</groupId>
             <artifactId>flink-table-store-shade</artifactId>
             <version>${project.version}</version>
         </dependency>
 
-        <dependency>
-            <groupId>org.apache.spark</groupId>
-            <artifactId>spark-sql_2.12</artifactId>
-            <version>${spark.version}</version>
-            <exclusions>
-                <exclusion>
-                    <groupId>log4j</groupId>
-                    <artifactId>log4j</artifactId>
-                </exclusion>
-                <exclusion>
-                    <groupId>org.slf4j</groupId>
-                    <artifactId>slf4j-log4j12</artifactId>
-                </exclusion>
-            </exclusions>
-        </dependency>
-
         <!-- Test -->
 
         <dependency>
@@ -92,70 +84,4 @@ under the License.
             <scope>test</scope>
         </dependency>
     </dependencies>
-
-    <!-- Activate these profiles with -Pspark-x.x to build and test against 
different Spark versions -->
-    <profiles>
-        <profile>
-            <id>spark-3.3</id>
-            <properties>
-                <spark.version>3.3.0</spark.version>
-            </properties>
-        </profile>
-        <profile>
-            <id>spark-3.2</id>
-            <properties>
-                <spark.version>3.2.2</spark.version>
-            </properties>
-        </profile>
-        <profile>
-            <id>spark-3.1</id>
-            <properties>
-                <spark.version>3.1.3</spark.version>
-            </properties>
-        </profile>
-    </profiles>
-
-    <build>
-        <plugins>
-            <plugin>
-                <groupId>org.codehaus.mojo</groupId>
-                <artifactId>build-helper-maven-plugin</artifactId>
-                <executions>
-                    <execution>
-                        <id>add-sources</id>
-                        <phase>generate-sources</phase>
-                        <goals>
-                            <goal>add-source</goal>
-                        </goals>
-                        <configuration>
-                            <sources>
-                                <source>src/main/${spark.version}</source>
-                            </sources>
-                        </configuration>
-                    </execution>
-                </executions>
-            </plugin>
-
-            <plugin>
-                <groupId>org.apache.maven.plugins</groupId>
-                <artifactId>maven-shade-plugin</artifactId>
-                <executions>
-                    <execution>
-                        <id>shade-flink</id>
-                        <phase>package</phase>
-                        <goals>
-                            <goal>shade</goal>
-                        </goals>
-                        <configuration>
-                            <artifactSet>
-                                <includes combine.children="append">
-                                    
<include>org.apache.flink:flink-table-store-shade</include>
-                                </includes>
-                            </artifactSet>
-                        </configuration>
-                    </execution>
-                </executions>
-            </plugin>
-        </plugins>
-    </build>
 </project>
diff --git 
a/flink-table-store-spark2/src/main/java/org/apache/flink/table/store/spark/SparkArrayData.java
 
b/flink-table-store-spark2/src/main/java/org/apache/flink/table/store/spark/SparkArrayData.java
deleted file mode 100644
index eab56117..00000000
--- 
a/flink-table-store-spark2/src/main/java/org/apache/flink/table/store/spark/SparkArrayData.java
+++ /dev/null
@@ -1,171 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.flink.table.store.spark;
-
-import org.apache.flink.table.store.data.InternalArray;
-import org.apache.flink.table.store.types.ArrayType;
-import org.apache.flink.table.store.types.BigIntType;
-import org.apache.flink.table.store.types.RowType;
-import org.apache.flink.table.store.utils.RowDataUtils;
-
-import org.apache.spark.sql.catalyst.InternalRow;
-import org.apache.spark.sql.catalyst.util.ArrayData;
-import org.apache.spark.sql.catalyst.util.MapData;
-import org.apache.spark.sql.types.DataType;
-import org.apache.spark.sql.types.Decimal;
-import org.apache.spark.unsafe.types.CalendarInterval;
-import org.apache.spark.unsafe.types.UTF8String;
-
-import static org.apache.flink.table.store.spark.SparkInternalRow.fromFlink;
-import static org.apache.flink.table.store.utils.RowDataUtils.copyArray;
-import static org.apache.flink.table.store.utils.TypeUtils.timestampPrecision;
-
-/** Spark {@link ArrayData} to wrap Flink {@code ArrayData}. */
-public class SparkArrayData extends ArrayData {
-
-    private final org.apache.flink.table.store.types.DataType elementType;
-
-    private InternalArray array;
-
-    public SparkArrayData(org.apache.flink.table.store.types.DataType 
elementType) {
-        this.elementType = elementType;
-    }
-
-    public SparkArrayData replace(InternalArray array) {
-        this.array = array;
-        return this;
-    }
-
-    @Override
-    public int numElements() {
-        return array.size();
-    }
-
-    @Override
-    public ArrayData copy() {
-        return new SparkArrayData(elementType).replace(copyArray(array, 
elementType));
-    }
-
-    @Override
-    public Object[] array() {
-        Object[] objects = new Object[numElements()];
-        for (int i = 0; i < objects.length; i++) {
-            objects[i] = fromFlink(RowDataUtils.get(array, i, elementType), 
elementType);
-        }
-        return objects;
-    }
-
-    @Override
-    public void setNullAt(int i) {
-        throw new UnsupportedOperationException();
-    }
-
-    @Override
-    public void update(int i, Object value) {
-        throw new UnsupportedOperationException();
-    }
-
-    @Override
-    public boolean isNullAt(int ordinal) {
-        return array.isNullAt(ordinal);
-    }
-
-    @Override
-    public boolean getBoolean(int ordinal) {
-        return array.getBoolean(ordinal);
-    }
-
-    @Override
-    public byte getByte(int ordinal) {
-        return array.getByte(ordinal);
-    }
-
-    @Override
-    public short getShort(int ordinal) {
-        return array.getShort(ordinal);
-    }
-
-    @Override
-    public int getInt(int ordinal) {
-        return array.getInt(ordinal);
-    }
-
-    @Override
-    public long getLong(int ordinal) {
-        if (elementType instanceof BigIntType) {
-            return array.getLong(ordinal);
-        }
-
-        return getTimestampMicros(ordinal);
-    }
-
-    private long getTimestampMicros(int ordinal) {
-        return fromFlink(array.getTimestamp(ordinal, 
timestampPrecision(elementType)));
-    }
-
-    @Override
-    public float getFloat(int ordinal) {
-        return array.getFloat(ordinal);
-    }
-
-    @Override
-    public double getDouble(int ordinal) {
-        return array.getDouble(ordinal);
-    }
-
-    @Override
-    public Decimal getDecimal(int ordinal, int precision, int scale) {
-        return fromFlink(array.getDecimal(ordinal, precision, scale));
-    }
-
-    @Override
-    public UTF8String getUTF8String(int ordinal) {
-        return fromFlink(array.getString(ordinal));
-    }
-
-    @Override
-    public byte[] getBinary(int ordinal) {
-        return array.getBinary(ordinal);
-    }
-
-    @Override
-    public CalendarInterval getInterval(int ordinal) {
-        throw new UnsupportedOperationException();
-    }
-
-    @Override
-    public InternalRow getStruct(int ordinal, int numFields) {
-        return fromFlink(array.getRow(ordinal, numFields), (RowType) 
elementType);
-    }
-
-    @Override
-    public ArrayData getArray(int ordinal) {
-        return fromFlink(array.getArray(ordinal), (ArrayType) elementType);
-    }
-
-    @Override
-    public MapData getMap(int ordinal) {
-        return fromFlink(array.getMap(ordinal), elementType);
-    }
-
-    @Override
-    public Object get(int ordinal, DataType dataType) {
-        return SpecializedGettersReader.read(this, ordinal, dataType);
-    }
-}
diff --git 
a/flink-table-store-spark2/src/main/java/org/apache/flink/table/store/spark/SparkFilterConverter.java
 
b/flink-table-store-spark2/src/main/java/org/apache/flink/table/store/spark/SparkFilterConverter.java
deleted file mode 100644
index cb12bcd4..00000000
--- 
a/flink-table-store-spark2/src/main/java/org/apache/flink/table/store/spark/SparkFilterConverter.java
+++ /dev/null
@@ -1,128 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.flink.table.store.spark;
-
-import org.apache.flink.table.store.file.predicate.Predicate;
-import org.apache.flink.table.store.file.predicate.PredicateBuilder;
-import org.apache.flink.table.store.types.DataType;
-import org.apache.flink.table.store.types.RowType;
-
-import org.apache.spark.sql.sources.And;
-import org.apache.spark.sql.sources.EqualTo;
-import org.apache.spark.sql.sources.Filter;
-import org.apache.spark.sql.sources.GreaterThan;
-import org.apache.spark.sql.sources.GreaterThanOrEqual;
-import org.apache.spark.sql.sources.In;
-import org.apache.spark.sql.sources.IsNotNull;
-import org.apache.spark.sql.sources.IsNull;
-import org.apache.spark.sql.sources.LessThan;
-import org.apache.spark.sql.sources.LessThanOrEqual;
-import org.apache.spark.sql.sources.Not;
-import org.apache.spark.sql.sources.Or;
-import org.apache.spark.sql.sources.StringStartsWith;
-
-import java.util.Arrays;
-import java.util.stream.Collectors;
-
-import static org.apache.flink.table.store.file.predicate.PredicateBuilder.convertJavaObject;
-
-/** Conversion from {@link Filter} to {@link Predicate}. */
-public class SparkFilterConverter {
-
-    private final RowType rowType;
-    private final PredicateBuilder builder;
-
-    public SparkFilterConverter(RowType rowType) {
-        this.rowType = rowType;
-        this.builder = new PredicateBuilder(rowType);
-    }
-
-    public Predicate convert(Filter filter) {
-        if (filter instanceof EqualTo) {
-            EqualTo eq = (EqualTo) filter;
-            // TODO deal with isNaN
-            int index = fieldIndex(eq.attribute());
-            Object literal = convertLiteral(index, eq.value());
-            return builder.equal(index, literal);
-        } else if (filter instanceof GreaterThan) {
-            GreaterThan gt = (GreaterThan) filter;
-            int index = fieldIndex(gt.attribute());
-            Object literal = convertLiteral(index, gt.value());
-            return builder.greaterThan(index, literal);
-        } else if (filter instanceof GreaterThanOrEqual) {
-            GreaterThanOrEqual gt = (GreaterThanOrEqual) filter;
-            int index = fieldIndex(gt.attribute());
-            Object literal = convertLiteral(index, gt.value());
-            return builder.greaterOrEqual(index, literal);
-        } else if (filter instanceof LessThan) {
-            LessThan lt = (LessThan) filter;
-            int index = fieldIndex(lt.attribute());
-            Object literal = convertLiteral(index, lt.value());
-            return builder.lessThan(index, literal);
-        } else if (filter instanceof LessThanOrEqual) {
-            LessThanOrEqual lt = (LessThanOrEqual) filter;
-            int index = fieldIndex(lt.attribute());
-            Object literal = convertLiteral(index, lt.value());
-            return builder.lessOrEqual(index, literal);
-        } else if (filter instanceof In) {
-            In in = (In) filter;
-            int index = fieldIndex(in.attribute());
-            return builder.in(
-                    index,
-                    Arrays.stream(in.values())
-                            .map(v -> convertLiteral(index, v))
-                            .collect(Collectors.toList()));
-        } else if (filter instanceof IsNull) {
-            return builder.isNull(fieldIndex(((IsNull) filter).attribute()));
-        } else if (filter instanceof IsNotNull) {
-            return builder.isNotNull(fieldIndex(((IsNotNull) filter).attribute()));
-        } else if (filter instanceof And) {
-            And and = (And) filter;
-            return PredicateBuilder.and(convert(and.left()), convert(and.right()));
-        } else if (filter instanceof Or) {
-            Or or = (Or) filter;
-            return PredicateBuilder.or(convert(or.left()), convert(or.right()));
-        } else if (filter instanceof Not) {
-            Not not = (Not) filter;
-            return convert(not.child()).negate().orElseThrow(UnsupportedOperationException::new);
-        } else if (filter instanceof StringStartsWith) {
-            StringStartsWith startsWith = (StringStartsWith) filter;
-            int index = fieldIndex(startsWith.attribute());
-            Object literal = convertLiteral(index, startsWith.value());
-            return builder.startsWith(index, literal);
-        }
-
-        // TODO: In, NotIn, AlwaysTrue, AlwaysFalse, EqualNullSafe
-        throw new UnsupportedOperationException();
-    }
-
-    private int fieldIndex(String field) {
-        int index = rowType.getFieldIndex(field);
-        // TODO: support nested field
-        if (index == -1) {
-            throw new UnsupportedOperationException();
-        }
-        return index;
-    }
-
-    private Object convertLiteral(int index, Object value) {
-        DataType type = rowType.getTypeAt(index);
-        return convertJavaObject(type, value);
-    }
-}
diff --git a/flink-table-store-spark2/src/main/java/org/apache/flink/table/store/spark/SparkInternalRow.java b/flink-table-store-spark2/src/main/java/org/apache/flink/table/store/spark/SparkInternalRow.java
deleted file mode 100644
index cac523c1..00000000
--- a/flink-table-store-spark2/src/main/java/org/apache/flink/table/store/spark/SparkInternalRow.java
+++ /dev/null
@@ -1,241 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.flink.table.store.spark;
-
-import org.apache.flink.table.store.data.BinaryString;
-import org.apache.flink.table.store.data.InternalArray;
-import org.apache.flink.table.store.data.InternalMap;
-import org.apache.flink.table.store.data.InternalRow;
-import org.apache.flink.table.store.data.Timestamp;
-import org.apache.flink.table.store.types.ArrayType;
-import org.apache.flink.table.store.types.BigIntType;
-import org.apache.flink.table.store.types.IntType;
-import org.apache.flink.table.store.types.MapType;
-import org.apache.flink.table.store.types.MultisetType;
-import org.apache.flink.table.store.types.RowType;
-
-import org.apache.spark.sql.catalyst.util.ArrayBasedMapData;
-import org.apache.spark.sql.catalyst.util.ArrayData;
-import org.apache.spark.sql.catalyst.util.DateTimeUtils;
-import org.apache.spark.sql.catalyst.util.MapData;
-import org.apache.spark.sql.types.DataType;
-import org.apache.spark.sql.types.Decimal;
-import org.apache.spark.unsafe.types.CalendarInterval;
-import org.apache.spark.unsafe.types.UTF8String;
-
-import static org.apache.flink.table.store.utils.RowDataUtils.copyRowData;
-import static org.apache.flink.table.store.utils.TypeUtils.timestampPrecision;
-
-/** Spark {@link org.apache.spark.sql.catalyst.InternalRow} to wrap {@link InternalRow}. */
-public class SparkInternalRow extends org.apache.spark.sql.catalyst.InternalRow {
-
-    private final RowType rowType;
-
-    private InternalRow row;
-
-    public SparkInternalRow(RowType rowType) {
-        this.rowType = rowType;
-    }
-
-    public SparkInternalRow replace(InternalRow row) {
-        this.row = row;
-        return this;
-    }
-
-    @Override
-    public int numFields() {
-        return row.getFieldCount();
-    }
-
-    @Override
-    public void setNullAt(int i) {
-        throw new UnsupportedOperationException();
-    }
-
-    @Override
-    public void update(int i, Object value) {
-        throw new UnsupportedOperationException();
-    }
-
-    @Override
-    public org.apache.spark.sql.catalyst.InternalRow copy() {
-        return new SparkInternalRow(rowType).replace(copyRowData(row, rowType));
-    }
-
-    @Override
-    public boolean isNullAt(int ordinal) {
-        return row.isNullAt(ordinal);
-    }
-
-    @Override
-    public boolean getBoolean(int ordinal) {
-        return row.getBoolean(ordinal);
-    }
-
-    @Override
-    public byte getByte(int ordinal) {
-        return row.getByte(ordinal);
-    }
-
-    @Override
-    public short getShort(int ordinal) {
-        return row.getShort(ordinal);
-    }
-
-    @Override
-    public int getInt(int ordinal) {
-        return row.getInt(ordinal);
-    }
-
-    @Override
-    public long getLong(int ordinal) {
-        if (rowType.getTypeAt(ordinal) instanceof BigIntType) {
-            return row.getLong(ordinal);
-        }
-
-        return getTimestampMicros(ordinal);
-    }
-
-    private long getTimestampMicros(int ordinal) {
-        org.apache.flink.table.store.types.DataType type = rowType.getTypeAt(ordinal);
-        return fromFlink(row.getTimestamp(ordinal, timestampPrecision(type)));
-    }
-
-    @Override
-    public float getFloat(int ordinal) {
-        return row.getFloat(ordinal);
-    }
-
-    @Override
-    public double getDouble(int ordinal) {
-        return row.getDouble(ordinal);
-    }
-
-    @Override
-    public Decimal getDecimal(int ordinal, int precision, int scale) {
-        org.apache.flink.table.store.data.Decimal decimal =
-                row.getDecimal(ordinal, precision, scale);
-        return fromFlink(decimal);
-    }
-
-    @Override
-    public UTF8String getUTF8String(int ordinal) {
-        return fromFlink(row.getString(ordinal));
-    }
-
-    @Override
-    public byte[] getBinary(int ordinal) {
-        return row.getBinary(ordinal);
-    }
-
-    @Override
-    public CalendarInterval getInterval(int ordinal) {
-        throw new UnsupportedOperationException();
-    }
-
-    @Override
-    public org.apache.spark.sql.catalyst.InternalRow getStruct(int ordinal, int numFields) {
-        return fromFlink(row.getRow(ordinal, numFields), (RowType) rowType.getTypeAt(ordinal));
-    }
-
-    @Override
-    public ArrayData getArray(int ordinal) {
-        return fromFlink(row.getArray(ordinal), (ArrayType) rowType.getTypeAt(ordinal));
-    }
-
-    @Override
-    public MapData getMap(int ordinal) {
-        return fromFlink(row.getMap(ordinal), rowType.getTypeAt(ordinal));
-    }
-
-    @Override
-    public Object get(int ordinal, DataType dataType) {
-        return SpecializedGettersReader.read(this, ordinal, dataType);
-    }
-
-    public static Object fromFlink(Object o, org.apache.flink.table.store.types.DataType type) {
-        if (o == null) {
-            return null;
-        }
-        switch (type.getTypeRoot()) {
-            case TIMESTAMP_WITHOUT_TIME_ZONE:
-            case TIMESTAMP_WITH_LOCAL_TIME_ZONE:
-                return fromFlink((Timestamp) o);
-            case CHAR:
-            case VARCHAR:
-                return fromFlink((BinaryString) o);
-            case DECIMAL:
-                return fromFlink((org.apache.flink.table.store.data.Decimal) o);
-            case ARRAY:
-                return fromFlink((InternalArray) o, (ArrayType) type);
-            case MAP:
-            case MULTISET:
-                return fromFlink((InternalMap) o, type);
-            case ROW:
-                return fromFlink((InternalRow) o, (RowType) type);
-            default:
-                return o;
-        }
-    }
-
-    public static UTF8String fromFlink(BinaryString string) {
-        return UTF8String.fromBytes(string.toBytes());
-    }
-
-    public static Decimal fromFlink(org.apache.flink.table.store.data.Decimal decimal) {
-        return Decimal.apply(decimal.toBigDecimal());
-    }
-
-    public static org.apache.spark.sql.catalyst.InternalRow fromFlink(
-            InternalRow row, RowType rowType) {
-        return new SparkInternalRow(rowType).replace(row);
-    }
-
-    public static long fromFlink(Timestamp timestamp) {
-        return DateTimeUtils.fromJavaTimestamp(timestamp.toSQLTimestamp());
-    }
-
-    public static ArrayData fromFlink(InternalArray array, ArrayType arrayType) {
-        return fromFlinkArrayElementType(array, arrayType.getElementType());
-    }
-
-    private static ArrayData fromFlinkArrayElementType(
-            InternalArray array, org.apache.flink.table.store.types.DataType elementType) {
-        return new SparkArrayData(elementType).replace(array);
-    }
-
-    public static MapData fromFlink(
-            InternalMap map, org.apache.flink.table.store.types.DataType mapType) {
-        org.apache.flink.table.store.types.DataType keyType;
-        org.apache.flink.table.store.types.DataType valueType;
-        if (mapType instanceof MapType) {
-            keyType = ((MapType) mapType).getKeyType();
-            valueType = ((MapType) mapType).getValueType();
-        } else if (mapType instanceof MultisetType) {
-            keyType = ((MultisetType) mapType).getElementType();
-            valueType = new IntType();
-        } else {
-            throw new UnsupportedOperationException("Unsupported type: " + mapType);
-        }
-
-        return new ArrayBasedMapData(
-                fromFlinkArrayElementType(map.keyArray(), keyType),
-                fromFlinkArrayElementType(map.valueArray(), valueType));
-    }
-}
diff --git a/flink-table-store-spark2/src/main/java/org/apache/flink/table/store/spark/SparkTypeUtils.java b/flink-table-store-spark2/src/main/java/org/apache/flink/table/store/spark/SparkTypeUtils.java
deleted file mode 100644
index eaffc44d..00000000
--- a/flink-table-store-spark2/src/main/java/org/apache/flink/table/store/spark/SparkTypeUtils.java
+++ /dev/null
@@ -1,185 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.flink.table.store.spark;
-
-import org.apache.flink.table.store.types.ArrayType;
-import org.apache.flink.table.store.types.BigIntType;
-import org.apache.flink.table.store.types.BinaryType;
-import org.apache.flink.table.store.types.BooleanType;
-import org.apache.flink.table.store.types.CharType;
-import org.apache.flink.table.store.types.DataField;
-import org.apache.flink.table.store.types.DataTypeDefaultVisitor;
-import org.apache.flink.table.store.types.DateType;
-import org.apache.flink.table.store.types.DecimalType;
-import org.apache.flink.table.store.types.DoubleType;
-import org.apache.flink.table.store.types.FloatType;
-import org.apache.flink.table.store.types.IntType;
-import org.apache.flink.table.store.types.LocalZonedTimestampType;
-import org.apache.flink.table.store.types.MapType;
-import org.apache.flink.table.store.types.MultisetType;
-import org.apache.flink.table.store.types.RowType;
-import org.apache.flink.table.store.types.SmallIntType;
-import org.apache.flink.table.store.types.TimestampType;
-import org.apache.flink.table.store.types.TinyIntType;
-import org.apache.flink.table.store.types.VarBinaryType;
-import org.apache.flink.table.store.types.VarCharType;
-
-import org.apache.spark.sql.types.DataType;
-import org.apache.spark.sql.types.DataTypes;
-import org.apache.spark.sql.types.StructField;
-import org.apache.spark.sql.types.StructType;
-
-import java.util.ArrayList;
-import java.util.List;
-import java.util.Optional;
-
-/** Utils for Spark {@link DataType}. */
-public class SparkTypeUtils {
-
-    private SparkTypeUtils() {}
-
-    public static StructType fromFlinkRowType(RowType type) {
-        return (StructType) fromFlinkType(type);
-    }
-
-    public static DataType fromFlinkType(org.apache.flink.table.store.types.DataType type) {
-        return type.accept(FlinkToSparkTypeVisitor.INSTANCE);
-    }
-
-    private static class FlinkToSparkTypeVisitor extends DataTypeDefaultVisitor<DataType> {
-
-        private static final FlinkToSparkTypeVisitor INSTANCE = new FlinkToSparkTypeVisitor();
-
-        @Override
-        public DataType visit(CharType charType) {
-            return DataTypes.StringType;
-        }
-
-        @Override
-        public DataType visit(VarCharType varCharType) {
-            return DataTypes.StringType;
-        }
-
-        @Override
-        public DataType visit(BooleanType booleanType) {
-            return DataTypes.BooleanType;
-        }
-
-        @Override
-        public DataType visit(BinaryType binaryType) {
-            return DataTypes.BinaryType;
-        }
-
-        @Override
-        public DataType visit(VarBinaryType varBinaryType) {
-            return DataTypes.BinaryType;
-        }
-
-        @Override
-        public DataType visit(DecimalType decimalType) {
-            return DataTypes.createDecimalType(decimalType.getPrecision(), decimalType.getScale());
-        }
-
-        @Override
-        public DataType visit(TinyIntType tinyIntType) {
-            return DataTypes.ByteType;
-        }
-
-        @Override
-        public DataType visit(SmallIntType smallIntType) {
-            return DataTypes.ShortType;
-        }
-
-        @Override
-        public DataType visit(IntType intType) {
-            return DataTypes.IntegerType;
-        }
-
-        @Override
-        public DataType visit(BigIntType bigIntType) {
-            return DataTypes.LongType;
-        }
-
-        @Override
-        public DataType visit(FloatType floatType) {
-            return DataTypes.FloatType;
-        }
-
-        @Override
-        public DataType visit(DoubleType doubleType) {
-            return DataTypes.DoubleType;
-        }
-
-        @Override
-        public DataType visit(DateType dateType) {
-            return DataTypes.DateType;
-        }
-
-        @Override
-        public DataType visit(TimestampType timestampType) {
-            return DataTypes.TimestampType;
-        }
-
-        @Override
-        public DataType visit(LocalZonedTimestampType localZonedTimestampType) {
-            return DataTypes.TimestampType;
-        }
-
-        @Override
-        public DataType visit(ArrayType arrayType) {
-            org.apache.flink.table.store.types.DataType elementType = arrayType.getElementType();
-            return DataTypes.createArrayType(elementType.accept(this), elementType.isNullable());
-        }
-
-        @Override
-        public DataType visit(MultisetType multisetType) {
-            return DataTypes.createMapType(
-                    multisetType.getElementType().accept(this), DataTypes.IntegerType, false);
-        }
-
-        @Override
-        public DataType visit(MapType mapType) {
-            return DataTypes.createMapType(
-                    mapType.getKeyType().accept(this),
-                    mapType.getValueType().accept(this),
-                    mapType.getValueType().isNullable());
-        }
-
-        @Override
-        public DataType visit(RowType rowType) {
-            List<StructField> fields = new ArrayList<>(rowType.getFieldCount());
-            for (DataField field : rowType.getFields()) {
-                StructField structField =
-                        DataTypes.createStructField(
-                                field.name(), field.type().accept(this), field.type().isNullable());
-                structField =
-                        Optional.ofNullable(field.description())
-                                .map(structField::withComment)
-                                .orElse(structField);
-                fields.add(structField);
-            }
-            return DataTypes.createStructType(fields);
-        }
-
-        @Override
-        protected DataType defaultMethod(org.apache.flink.table.store.types.DataType logicalType) {
-            throw new UnsupportedOperationException("Unsupported type: " + logicalType);
-        }
-    }
-}
diff --git a/pom.xml b/pom.xml
index 94ac4b2c..e9dd516e 100644
--- a/pom.xml
+++ b/pom.xml
@@ -65,7 +65,6 @@ under the License.
         <module>flink-table-store-shade</module>
         <module>flink-table-store-hive</module>
         <module>flink-table-store-spark</module>
-        <module>flink-table-store-spark2</module>
         <module>flink-table-store-test-utils</module>
     </modules>
 

Reply via email to