This is an automated email from the ASF dual-hosted git repository.

lzljs3620320 pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/flink-table-store.git


The following commit(s) were added to refs/heads/master by this push:
     new c58576eb [FLINK-27947] Introduce Spark Reader for table store
c58576eb is described below

commit c58576eb3bd3d860c5ba5a940b4d0e0b3cb5f55a
Author: Jingsong Lee <[email protected]>
AuthorDate: Fri Jun 17 16:31:48 2022 +0800

    [FLINK-27947] Introduce Spark Reader for table store
    
    This closes #147
---
 flink-table-store-spark/pom.xml                    | 214 ++++++++++++++++++
 .../flink/table/store/spark/SparkArrayData.java    | 172 +++++++++++++++
 .../table/store/spark/SparkFilterConverter.java    | 113 ++++++++++
 .../table/store/spark/SparkInputPartition.java     |  55 +++++
 .../flink/table/store/spark/SparkInternalRow.java  | 241 +++++++++++++++++++++
 .../table/store/spark/SparkReaderFactory.java      |  93 ++++++++
 .../apache/flink/table/store/spark/SparkScan.java  | 117 ++++++++++
 .../flink/table/store/spark/SparkScanBuilder.java  |  85 ++++++++
 .../flink/table/store/spark/SparkSource.java       |  68 ++++++
 .../apache/flink/table/store/spark/SparkTable.java |  64 ++++++
 .../flink/table/store/spark/SparkTypeUtils.java    | 182 ++++++++++++++++
 ...org.apache.spark.sql.sources.DataSourceRegister |  16 ++
 .../table/store/spark/SimpleTableTestHelper.java   |  70 ++++++
 .../store/spark/SparkFilterConverterTest.java      | 141 ++++++++++++
 .../table/store/spark/SparkInternalRowTest.java    | 116 ++++++++++
 .../flink/table/store/spark/SparkReadITCase.java   | 115 ++++++++++
 .../flink/table/store/spark/SparkTypeTest.java     | 105 +++++++++
 .../src/test/resources/log4j2-test.properties      |  38 ++++
 pom.xml                                            |   3 +-
 19 files changed, 2007 insertions(+), 1 deletion(-)

diff --git a/flink-table-store-spark/pom.xml b/flink-table-store-spark/pom.xml
new file mode 100644
index 00000000..b2987c23
--- /dev/null
+++ b/flink-table-store-spark/pom.xml
@@ -0,0 +1,214 @@
+<?xml version="1.0" encoding="UTF-8"?>
+<!--
+Licensed to the Apache Software Foundation (ASF) under one
+or more contributor license agreements.  See the NOTICE file
+distributed with this work for additional information
+regarding copyright ownership.  The ASF licenses this file
+to you under the Apache License, Version 2.0 (the
+"License"); you may not use this file except in compliance
+with the License.  You may obtain a copy of the License at
+
+  http://www.apache.org/licenses/LICENSE-2.0
+
+Unless required by applicable law or agreed to in writing,
+software distributed under the License is distributed on an
+"AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+KIND, either express or implied.  See the License for the
+specific language governing permissions and limitations
+under the License.
+-->
+<project xmlns="http://maven.apache.org/POM/4.0.0"
+         xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
+         xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
+    <modelVersion>4.0.0</modelVersion>
+
+    <parent>
+        <artifactId>flink-table-store-parent</artifactId>
+        <groupId>org.apache.flink</groupId>
+        <version>0.2-SNAPSHOT</version>
+    </parent>
+
+    <artifactId>flink-table-store-spark</artifactId>
+    <name>Flink Table Store : Spark</name>
+
+    <packaging>jar</packaging>
+
+    <properties>
+        <spark.version>3.2.1</spark.version>
+    </properties>
+
+    <dependencies>
+
+        <dependency>
+            <groupId>org.apache.spark</groupId>
+            <artifactId>spark-sql_2.12</artifactId>
+            <version>${spark.version}</version>
+            <exclusions>
+                <exclusion>
+                    <groupId>log4j</groupId>
+                    <artifactId>log4j</artifactId>
+                </exclusion>
+                <exclusion>
+                    <groupId>org.slf4j</groupId>
+                    <artifactId>slf4j-log4j12</artifactId>
+                </exclusion>
+            </exclusions>
+        </dependency>
+
+        <dependency>
+            <groupId>org.apache.flink</groupId>
+            <artifactId>flink-table-common</artifactId>
+            <version>${flink.version}</version>
+            <scope>provided</scope>
+        </dependency>
+
+        <dependency>
+            <groupId>org.apache.flink</groupId>
+            <artifactId>flink-table-runtime</artifactId>
+            <version>${flink.version}</version>
+            <scope>provided</scope>
+        </dependency>
+
+        <dependency>
+            <groupId>org.apache.flink</groupId>
+            <artifactId>flink-table-store-common</artifactId>
+            <version>${project.version}</version>
+            <scope>compile</scope>
+        </dependency>
+
+        <dependency>
+            <groupId>org.apache.flink</groupId>
+            <artifactId>flink-table-store-core</artifactId>
+            <version>${project.version}</version>
+            <scope>compile</scope>
+        </dependency>
+
+
+        <dependency>
+            <groupId>org.apache.flink</groupId>
+            <artifactId>flink-core</artifactId>
+            <version>${flink.version}</version>
+            <exclusions>
+                <exclusion>
+                    <groupId>org.apache.flink</groupId>
+                    <artifactId>flink-metrics-core</artifactId>
+                </exclusion>
+                <exclusion>
+                    <groupId>org.apache.commons</groupId>
+                    <artifactId>commons-lang3</artifactId>
+                </exclusion>
+                <exclusion>
+                    <groupId>com.esotericsoftware.kryo</groupId>
+                    <artifactId>kryo</artifactId>
+                </exclusion>
+                <exclusion>
+                    <groupId>commons-collections</groupId>
+                    <artifactId>commons-collections</artifactId>
+                </exclusion>
+                <exclusion>
+                    <groupId>org.apache.commons</groupId>
+                    <artifactId>commons-compress</artifactId>
+                </exclusion>
+            </exclusions>
+        </dependency>
+
+        <dependency>
+            <groupId>org.apache.flink</groupId>
+            <artifactId>flink-table-common</artifactId>
+            <version>${flink.version}</version>
+            <exclusions>
+                <exclusion>
+                    <groupId>*</groupId>
+                    <artifactId>*</artifactId>
+                </exclusion>
+            </exclusions>
+        </dependency>
+
+        <dependency>
+            <groupId>org.apache.flink</groupId>
+            <artifactId>flink-table-api-java</artifactId>
+            <version>${flink.version}</version>
+            <exclusions>
+                <exclusion>
+                    <groupId>*</groupId>
+                    <artifactId>*</artifactId>
+                </exclusion>
+            </exclusions>
+        </dependency>
+
+        <dependency>
+            <groupId>org.apache.flink</groupId>
+            <artifactId>flink-connector-base</artifactId>
+            <version>${flink.version}</version>
+        </dependency>
+
+        <dependency>
+            <groupId>org.apache.flink</groupId>
+            <artifactId>flink-connector-files</artifactId>
+            <version>${flink.version}</version>
+        </dependency>
+
+        <dependency>
+            <groupId>org.apache.flink</groupId>
+            <artifactId>flink-table-store-codegen-loader</artifactId>
+            <version>${project.version}</version>
+        </dependency>
+
+        <dependency>
+            <groupId>org.apache.flink</groupId>
+            <artifactId>flink-table-store-format</artifactId>
+            <version>${project.version}</version>
+        </dependency>
+
+        <dependency>
+            <groupId>org.apache.flink</groupId>
+            <artifactId>flink-avro</artifactId>
+            <version>${flink.version}</version>
+        </dependency>
+    </dependencies>
+
+    <!-- Activate these profiles with -Pspark-x.x to build and test against different Spark versions -->
+    <profiles>
+        <profile>
+            <id>spark-3.2</id>
+            <properties>
+                <spark.version>3.2.1</spark.version>
+            </properties>
+        </profile>
+        <profile>
+            <id>spark-3.1</id>
+            <properties>
+                <spark.version>3.1.3</spark.version>
+            </properties>
+        </profile>
+        <profile>
+            <id>spark-3.0</id>
+            <properties>
+                <spark.version>3.0.3</spark.version>
+            </properties>
+        </profile>
+    </profiles>
+
+    <build>
+        <plugins>
+            <plugin>
+                <groupId>org.codehaus.mojo</groupId>
+                <artifactId>build-helper-maven-plugin</artifactId>
+                <executions>
+                    <execution>
+                        <id>add-sources</id>
+                        <phase>generate-sources</phase>
+                        <goals>
+                            <goal>add-source</goal>
+                        </goals>
+                        <configuration>
+                            <sources>
+                                <source>src/main/${spark.version}</source>
+                            </sources>
+                        </configuration>
+                    </execution>
+                </executions>
+            </plugin>
+        </plugins>
+    </build>
+</project>
diff --git 
a/flink-table-store-spark/src/main/java/org/apache/flink/table/store/spark/SparkArrayData.java
 
b/flink-table-store-spark/src/main/java/org/apache/flink/table/store/spark/SparkArrayData.java
new file mode 100644
index 00000000..1e18fe30
--- /dev/null
+++ 
b/flink-table-store-spark/src/main/java/org/apache/flink/table/store/spark/SparkArrayData.java
@@ -0,0 +1,172 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.table.store.spark;
+
+import org.apache.flink.table.store.utils.RowDataUtils;
+import org.apache.flink.table.types.logical.ArrayType;
+import org.apache.flink.table.types.logical.BigIntType;
+import org.apache.flink.table.types.logical.LogicalType;
+import org.apache.flink.table.types.logical.RowType;
+
+import org.apache.spark.sql.catalyst.InternalRow;
+import org.apache.spark.sql.catalyst.expressions.SpecializedGettersReader;
+import org.apache.spark.sql.catalyst.util.ArrayData;
+import org.apache.spark.sql.catalyst.util.MapData;
+import org.apache.spark.sql.types.DataType;
+import org.apache.spark.sql.types.Decimal;
+import org.apache.spark.unsafe.types.CalendarInterval;
+import org.apache.spark.unsafe.types.UTF8String;
+
+import static org.apache.flink.table.store.spark.SparkInternalRow.fromFlink;
+import static org.apache.flink.table.store.utils.RowDataUtils.copyArray;
+import static org.apache.flink.table.store.utils.TypeUtils.timestampPrecision;
+
+/** Spark {@link ArrayData} to wrap flink {@code ArrayData}. */
+public class SparkArrayData extends ArrayData {
+
+    private final LogicalType elementType;
+
+    private org.apache.flink.table.data.ArrayData array;
+
+    public SparkArrayData(LogicalType elementType) {
+        this.elementType = elementType;
+    }
+
+    public SparkArrayData replace(org.apache.flink.table.data.ArrayData array) 
{
+        this.array = array;
+        return this;
+    }
+
+    @Override
+    public int numElements() {
+        return array.size();
+    }
+
+    @Override
+    public ArrayData copy() {
+        return new SparkArrayData(elementType).replace(copyArray(array, 
elementType));
+    }
+
+    @Override
+    public Object[] array() {
+        Object[] objects = new Object[numElements()];
+        for (int i = 0; i < objects.length; i++) {
+            objects[i] = fromFlink(RowDataUtils.get(array, i, elementType), 
elementType);
+        }
+        return objects;
+    }
+
+    @Override
+    public void setNullAt(int i) {
+        throw new UnsupportedOperationException();
+    }
+
+    @Override
+    public void update(int i, Object value) {
+        throw new UnsupportedOperationException();
+    }
+
+    @Override
+    public boolean isNullAt(int ordinal) {
+        return array.isNullAt(ordinal);
+    }
+
+    @Override
+    public boolean getBoolean(int ordinal) {
+        return array.getBoolean(ordinal);
+    }
+
+    @Override
+    public byte getByte(int ordinal) {
+        return array.getByte(ordinal);
+    }
+
+    @Override
+    public short getShort(int ordinal) {
+        return array.getShort(ordinal);
+    }
+
+    @Override
+    public int getInt(int ordinal) {
+        return array.getInt(ordinal);
+    }
+
+    @Override
+    public long getLong(int ordinal) {
+        if (elementType instanceof BigIntType) {
+            return array.getLong(ordinal);
+        }
+
+        return getTimestampMicros(ordinal);
+    }
+
+    private long getTimestampMicros(int ordinal) {
+        return fromFlink(array.getTimestamp(ordinal, 
timestampPrecision(elementType)));
+    }
+
+    @Override
+    public float getFloat(int ordinal) {
+        return array.getFloat(ordinal);
+    }
+
+    @Override
+    public double getDouble(int ordinal) {
+        return array.getDouble(ordinal);
+    }
+
+    @Override
+    public Decimal getDecimal(int ordinal, int precision, int scale) {
+        return fromFlink(array.getDecimal(ordinal, precision, scale));
+    }
+
+    @Override
+    public UTF8String getUTF8String(int ordinal) {
+        return fromFlink(array.getString(ordinal));
+    }
+
+    @Override
+    public byte[] getBinary(int ordinal) {
+        return array.getBinary(ordinal);
+    }
+
+    @Override
+    public CalendarInterval getInterval(int ordinal) {
+        throw new UnsupportedOperationException();
+    }
+
+    @Override
+    public InternalRow getStruct(int ordinal, int numFields) {
+        return fromFlink(array.getRow(ordinal, numFields), (RowType) 
elementType);
+    }
+
+    @Override
+    public ArrayData getArray(int ordinal) {
+        return fromFlink(array.getArray(ordinal), (ArrayType) elementType);
+    }
+
+    @Override
+    public MapData getMap(int ordinal) {
+        return fromFlink(array.getMap(ordinal), elementType);
+    }
+
+    @Override
+    public Object get(int ordinal, DataType dataType) {
+        return SpecializedGettersReader.read(this, ordinal, dataType, true, 
true);
+    }
+}
diff --git 
a/flink-table-store-spark/src/main/java/org/apache/flink/table/store/spark/SparkFilterConverter.java
 
b/flink-table-store-spark/src/main/java/org/apache/flink/table/store/spark/SparkFilterConverter.java
new file mode 100644
index 00000000..ec077c2b
--- /dev/null
+++ 
b/flink-table-store-spark/src/main/java/org/apache/flink/table/store/spark/SparkFilterConverter.java
@@ -0,0 +1,113 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.table.store.spark;
+
+import org.apache.flink.table.store.file.predicate.Literal;
+import org.apache.flink.table.store.file.predicate.Predicate;
+import org.apache.flink.table.store.file.predicate.PredicateBuilder;
+import org.apache.flink.table.types.logical.LogicalType;
+import org.apache.flink.table.types.logical.RowType;
+
+import org.apache.spark.sql.sources.And;
+import org.apache.spark.sql.sources.EqualTo;
+import org.apache.spark.sql.sources.Filter;
+import org.apache.spark.sql.sources.GreaterThan;
+import org.apache.spark.sql.sources.GreaterThanOrEqual;
+import org.apache.spark.sql.sources.IsNotNull;
+import org.apache.spark.sql.sources.IsNull;
+import org.apache.spark.sql.sources.LessThan;
+import org.apache.spark.sql.sources.LessThanOrEqual;
+import org.apache.spark.sql.sources.Not;
+import org.apache.spark.sql.sources.Or;
+import org.apache.spark.sql.sources.StringStartsWith;
+
+/** Conversion from {@link Filter} to {@link Predicate}. */
+public class SparkFilterConverter {
+
+    private final RowType rowType;
+
+    public SparkFilterConverter(RowType rowType) {
+        this.rowType = rowType;
+    }
+
+    public Predicate convert(Filter filter) {
+        if (filter instanceof EqualTo) {
+            EqualTo eq = (EqualTo) filter;
+            // TODO deal with isNaN
+            int index = fieldIndex(eq.attribute());
+            Literal literal = convertLiteral(index, eq.value());
+            return PredicateBuilder.equal(index, literal);
+        } else if (filter instanceof GreaterThan) {
+            GreaterThan gt = (GreaterThan) filter;
+            int index = fieldIndex(gt.attribute());
+            Literal literal = convertLiteral(index, gt.value());
+            return PredicateBuilder.greaterThan(index, literal);
+        } else if (filter instanceof GreaterThanOrEqual) {
+            GreaterThanOrEqual gt = (GreaterThanOrEqual) filter;
+            int index = fieldIndex(gt.attribute());
+            Literal literal = convertLiteral(index, gt.value());
+            return PredicateBuilder.greaterOrEqual(index, literal);
+        } else if (filter instanceof LessThan) {
+            LessThan lt = (LessThan) filter;
+            int index = fieldIndex(lt.attribute());
+            Literal literal = convertLiteral(index, lt.value());
+            return PredicateBuilder.lessThan(index, literal);
+        } else if (filter instanceof LessThanOrEqual) {
+            LessThanOrEqual lt = (LessThanOrEqual) filter;
+            int index = fieldIndex(lt.attribute());
+            Literal literal = convertLiteral(index, lt.value());
+            return PredicateBuilder.lessOrEqual(index, literal);
+        } else if (filter instanceof IsNull) {
+            return PredicateBuilder.isNull(fieldIndex(((IsNull) 
filter).attribute()));
+        } else if (filter instanceof IsNotNull) {
+            return PredicateBuilder.isNotNull(fieldIndex(((IsNotNull) 
filter).attribute()));
+        } else if (filter instanceof And) {
+            And and = (And) filter;
+            return PredicateBuilder.and(convert(and.left()), 
convert(and.right()));
+        } else if (filter instanceof Or) {
+            Or or = (Or) filter;
+            return PredicateBuilder.or(convert(or.left()), 
convert(or.right()));
+        } else if (filter instanceof Not) {
+            Not not = (Not) filter;
+            return 
convert(not.child()).negate().orElseThrow(UnsupportedOperationException::new);
+        } else if (filter instanceof StringStartsWith) {
+            StringStartsWith startsWith = (StringStartsWith) filter;
+            int index = fieldIndex(startsWith.attribute());
+            Literal literal = convertLiteral(index, startsWith.value());
+            return PredicateBuilder.startsWith(index, literal);
+        }
+
+        // TODO: In, NotIn, AlwaysTrue, AlwaysFalse, EqualNullSafe
+        throw new UnsupportedOperationException();
+    }
+
+    private int fieldIndex(String field) {
+        int index = rowType.getFieldIndex(field);
+        // TODO: support nested field
+        if (index == -1) {
+            throw new UnsupportedOperationException();
+        }
+        return index;
+    }
+
+    private Literal convertLiteral(int index, Object value) {
+        LogicalType type = rowType.getTypeAt(index);
+        return Literal.fromJavaObject(type, value);
+    }
+}
diff --git 
a/flink-table-store-spark/src/main/java/org/apache/flink/table/store/spark/SparkInputPartition.java
 
b/flink-table-store-spark/src/main/java/org/apache/flink/table/store/spark/SparkInputPartition.java
new file mode 100644
index 00000000..33e7fa51
--- /dev/null
+++ 
b/flink-table-store-spark/src/main/java/org/apache/flink/table/store/spark/SparkInputPartition.java
@@ -0,0 +1,55 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.table.store.spark;
+
+import org.apache.flink.core.memory.DataInputViewStreamWrapper;
+import org.apache.flink.core.memory.DataOutputViewStreamWrapper;
+import org.apache.flink.table.store.table.source.Split;
+
+import org.apache.spark.sql.connector.read.InputPartition;
+
+import java.io.IOException;
+import java.io.ObjectInputStream;
+import java.io.ObjectOutputStream;
+
+/** A Spark {@link InputPartition} for table store. */
+public class SparkInputPartition implements InputPartition {
+
+    private static final long serialVersionUID = 1L;
+
+    private transient Split split;
+
+    public SparkInputPartition(Split split) {
+        this.split = split;
+    }
+
+    public Split split() {
+        return split;
+    }
+
+    private void writeObject(ObjectOutputStream out) throws IOException {
+        out.defaultWriteObject();
+        split.serialize(new DataOutputViewStreamWrapper(out));
+    }
+
+    private void readObject(ObjectInputStream in) throws IOException, 
ClassNotFoundException {
+        in.defaultReadObject();
+        split = Split.deserialize(new DataInputViewStreamWrapper(in));
+    }
+}
diff --git 
a/flink-table-store-spark/src/main/java/org/apache/flink/table/store/spark/SparkInternalRow.java
 
b/flink-table-store-spark/src/main/java/org/apache/flink/table/store/spark/SparkInternalRow.java
new file mode 100644
index 00000000..2c23e992
--- /dev/null
+++ 
b/flink-table-store-spark/src/main/java/org/apache/flink/table/store/spark/SparkInternalRow.java
@@ -0,0 +1,241 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.table.store.spark;
+
+import org.apache.flink.table.data.DecimalData;
+import org.apache.flink.table.data.RowData;
+import org.apache.flink.table.data.StringData;
+import org.apache.flink.table.data.TimestampData;
+import org.apache.flink.table.types.logical.ArrayType;
+import org.apache.flink.table.types.logical.BigIntType;
+import org.apache.flink.table.types.logical.IntType;
+import org.apache.flink.table.types.logical.LogicalType;
+import org.apache.flink.table.types.logical.MapType;
+import org.apache.flink.table.types.logical.MultisetType;
+import org.apache.flink.table.types.logical.RowType;
+
+import org.apache.spark.sql.catalyst.InternalRow;
+import org.apache.spark.sql.catalyst.expressions.SpecializedGettersReader;
+import org.apache.spark.sql.catalyst.util.ArrayBasedMapData;
+import org.apache.spark.sql.catalyst.util.ArrayData;
+import org.apache.spark.sql.catalyst.util.DateTimeUtils;
+import org.apache.spark.sql.catalyst.util.MapData;
+import org.apache.spark.sql.types.DataType;
+import org.apache.spark.sql.types.Decimal;
+import org.apache.spark.unsafe.types.CalendarInterval;
+import org.apache.spark.unsafe.types.UTF8String;
+
+import static org.apache.flink.table.store.utils.RowDataUtils.copyRowData;
+import static org.apache.flink.table.store.utils.TypeUtils.timestampPrecision;
+
+/** Spark {@link InternalRow} to wrap {@link RowData}. */
+public class SparkInternalRow extends InternalRow {
+
+    private final RowType rowType;
+
+    private RowData row;
+
+    public SparkInternalRow(RowType rowType) {
+        this.rowType = rowType;
+    }
+
+    public SparkInternalRow replace(RowData row) {
+        this.row = row;
+        return this;
+    }
+
+    @Override
+    public int numFields() {
+        return row.getArity();
+    }
+
+    @Override
+    public void setNullAt(int i) {
+        throw new UnsupportedOperationException();
+    }
+
+    @Override
+    public void update(int i, Object value) {
+        throw new UnsupportedOperationException();
+    }
+
+    @Override
+    public InternalRow copy() {
+        return new SparkInternalRow(rowType).replace(copyRowData(row, 
rowType));
+    }
+
+    @Override
+    public boolean isNullAt(int ordinal) {
+        return row.isNullAt(ordinal);
+    }
+
+    @Override
+    public boolean getBoolean(int ordinal) {
+        return row.getBoolean(ordinal);
+    }
+
+    @Override
+    public byte getByte(int ordinal) {
+        return row.getByte(ordinal);
+    }
+
+    @Override
+    public short getShort(int ordinal) {
+        return row.getShort(ordinal);
+    }
+
+    @Override
+    public int getInt(int ordinal) {
+        return row.getInt(ordinal);
+    }
+
+    @Override
+    public long getLong(int ordinal) {
+        if (rowType.getTypeAt(ordinal) instanceof BigIntType) {
+            return row.getLong(ordinal);
+        }
+
+        return getTimestampMicros(ordinal);
+    }
+
+    private long getTimestampMicros(int ordinal) {
+        LogicalType type = rowType.getTypeAt(ordinal);
+        return fromFlink(row.getTimestamp(ordinal, timestampPrecision(type)));
+    }
+
+    @Override
+    public float getFloat(int ordinal) {
+        return row.getFloat(ordinal);
+    }
+
+    @Override
+    public double getDouble(int ordinal) {
+        return row.getDouble(ordinal);
+    }
+
+    @Override
+    public Decimal getDecimal(int ordinal, int precision, int scale) {
+        DecimalData decimal = row.getDecimal(ordinal, precision, scale);
+        return fromFlink(decimal);
+    }
+
+    @Override
+    public UTF8String getUTF8String(int ordinal) {
+        return fromFlink(row.getString(ordinal));
+    }
+
+    @Override
+    public byte[] getBinary(int ordinal) {
+        return row.getBinary(ordinal);
+    }
+
+    @Override
+    public CalendarInterval getInterval(int ordinal) {
+        throw new UnsupportedOperationException();
+    }
+
+    @Override
+    public InternalRow getStruct(int ordinal, int numFields) {
+        return fromFlink(row.getRow(ordinal, numFields), (RowType) 
rowType.getTypeAt(ordinal));
+    }
+
+    @Override
+    public ArrayData getArray(int ordinal) {
+        return fromFlink(row.getArray(ordinal), (ArrayType) 
rowType.getTypeAt(ordinal));
+    }
+
+    @Override
+    public MapData getMap(int ordinal) {
+        return fromFlink(row.getMap(ordinal), rowType.getTypeAt(ordinal));
+    }
+
+    @Override
+    public Object get(int ordinal, DataType dataType) {
+        return SpecializedGettersReader.read(this, ordinal, dataType, true, 
true);
+    }
+
+    public static Object fromFlink(Object o, LogicalType type) {
+        if (o == null) {
+            return null;
+        }
+        switch (type.getTypeRoot()) {
+            case TIMESTAMP_WITHOUT_TIME_ZONE:
+            case TIMESTAMP_WITH_LOCAL_TIME_ZONE:
+                return fromFlink((TimestampData) o);
+            case CHAR:
+            case VARCHAR:
+                return fromFlink((StringData) o);
+            case DECIMAL:
+                return fromFlink((DecimalData) o);
+            case ARRAY:
+                return fromFlink((org.apache.flink.table.data.ArrayData) o, 
(ArrayType) type);
+            case MAP:
+            case MULTISET:
+                return fromFlink((org.apache.flink.table.data.MapData) o, 
type);
+            case ROW:
+                return fromFlink((RowData) o, (RowType) type);
+            default:
+                return o;
+        }
+    }
+
+    public static UTF8String fromFlink(StringData string) {
+        return UTF8String.fromBytes(string.toBytes());
+    }
+
+    public static Decimal fromFlink(DecimalData decimal) {
+        return Decimal.apply(decimal.toBigDecimal());
+    }
+
+    public static InternalRow fromFlink(RowData row, RowType rowType) {
+        return new SparkInternalRow(rowType).replace(row);
+    }
+
+    public static long fromFlink(TimestampData timestamp) {
+        return DateTimeUtils.fromJavaTimestamp(timestamp.toTimestamp());
+    }
+
+    public static ArrayData fromFlink(
+            org.apache.flink.table.data.ArrayData array, ArrayType arrayType) {
+        return fromFlinkArrayElementType(array, arrayType.getElementType());
+    }
+
+    private static ArrayData fromFlinkArrayElementType(
+            org.apache.flink.table.data.ArrayData array, LogicalType 
elementType) {
+        return new SparkArrayData(elementType).replace(array);
+    }
+
+    public static MapData fromFlink(org.apache.flink.table.data.MapData map, 
LogicalType mapType) {
+        LogicalType keyType;
+        LogicalType valueType;
+        if (mapType instanceof MapType) {
+            keyType = ((MapType) mapType).getKeyType();
+            valueType = ((MapType) mapType).getValueType();
+        } else if (mapType instanceof MultisetType) {
+            keyType = ((MultisetType) mapType).getElementType();
+            valueType = new IntType();
+        } else {
+            throw new UnsupportedOperationException("Unsupported type: " + 
mapType);
+        }
+
+        return new ArrayBasedMapData(
+                fromFlinkArrayElementType(map.keyArray(), keyType),
+                fromFlinkArrayElementType(map.valueArray(), valueType));
+    }
+}
diff --git 
a/flink-table-store-spark/src/main/java/org/apache/flink/table/store/spark/SparkReaderFactory.java
 
b/flink-table-store-spark/src/main/java/org/apache/flink/table/store/spark/SparkReaderFactory.java
new file mode 100644
index 00000000..17ae24cb
--- /dev/null
+++ 
b/flink-table-store-spark/src/main/java/org/apache/flink/table/store/spark/SparkReaderFactory.java
@@ -0,0 +1,93 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.table.store.spark;
+
+import org.apache.flink.table.data.RowData;
+import org.apache.flink.table.store.file.utils.RecordReader;
+import org.apache.flink.table.store.file.utils.RecordReaderIterator;
+import org.apache.flink.table.store.table.FileStoreTable;
+import org.apache.flink.table.store.table.source.Split;
+import org.apache.flink.table.store.utils.TypeUtils;
+import org.apache.flink.table.types.logical.RowType;
+
+import org.apache.spark.sql.catalyst.InternalRow;
+import org.apache.spark.sql.connector.read.InputPartition;
+import org.apache.spark.sql.connector.read.PartitionReader;
+import org.apache.spark.sql.connector.read.PartitionReaderFactory;
+
+import java.io.IOException;
+import java.io.UncheckedIOException;
+
+/** A Spark {@link PartitionReaderFactory} for table store. */
+public class SparkReaderFactory implements PartitionReaderFactory {
+
+    private static final long serialVersionUID = 1L;
+
+    private final FileStoreTable table;
+    private final int[] projectedFields;
+
+    public SparkReaderFactory(FileStoreTable table, int[] projectedFields) {
+        this.table = table;
+        this.projectedFields = projectedFields;
+    }
+
+    private RowType readRowType() {
+        return TypeUtils.project(table.rowType(), projectedFields);
+    }
+
+    @Override
+    public PartitionReader<InternalRow> createReader(InputPartition partition) 
{
+        Split split = ((SparkInputPartition) partition).split();
+        RecordReader<RowData> reader;
+        try {
+            reader =
+                    table.newRead()
+                            .withProjection(projectedFields)
+                            .createReader(split.partition(), split.bucket(), 
split.files());
+        } catch (IOException e) {
+            throw new UncheckedIOException(e);
+        }
+        RecordReaderIterator<RowData> iterator = new 
RecordReaderIterator<>(reader);
+        SparkInternalRow row = new SparkInternalRow(readRowType());
+        return new PartitionReader<InternalRow>() {
+
+            @Override
+            public boolean next() {
+                if (iterator.hasNext()) {
+                    row.replace(iterator.next());
+                    return true;
+                }
+                return false;
+            }
+
+            @Override
+            public InternalRow get() {
+                return row;
+            }
+
+            @Override
+            public void close() throws IOException {
+                try {
+                    iterator.close();
+                } catch (Exception e) {
+                    throw new IOException(e);
+                }
+            }
+        };
+    }
+}
diff --git 
a/flink-table-store-spark/src/main/java/org/apache/flink/table/store/spark/SparkScan.java
 
b/flink-table-store-spark/src/main/java/org/apache/flink/table/store/spark/SparkScan.java
new file mode 100644
index 00000000..2167f970
--- /dev/null
+++ 
b/flink-table-store-spark/src/main/java/org/apache/flink/table/store/spark/SparkScan.java
@@ -0,0 +1,117 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.table.store.spark;
+
+import org.apache.flink.table.store.file.data.DataFileMeta;
+import org.apache.flink.table.store.file.predicate.Predicate;
+import org.apache.flink.table.store.table.FileStoreTable;
+import org.apache.flink.table.store.table.source.Split;
+import org.apache.flink.table.store.utils.TypeUtils;
+
+import org.apache.spark.sql.connector.read.Batch;
+import org.apache.spark.sql.connector.read.InputPartition;
+import org.apache.spark.sql.connector.read.PartitionReaderFactory;
+import org.apache.spark.sql.connector.read.Scan;
+import org.apache.spark.sql.connector.read.Statistics;
+import org.apache.spark.sql.connector.read.SupportsReportStatistics;
+import org.apache.spark.sql.types.StructType;
+
+import java.util.List;
+import java.util.OptionalLong;
+
+/**
+ * A Spark {@link Scan} for table store.
+ *
+ * <p>TODO Introduce a SparkRFScan to implement SupportsRuntimeFiltering.
+ */
+public class SparkScan implements Scan, SupportsReportStatistics {
+
+    protected final FileStoreTable table;
+    private final List<Predicate> predicates;
+    private final int[] projectedFields;
+
+    private List<Split> splits;
+
+    public SparkScan(FileStoreTable table, List<Predicate> predicates, int[] 
projectedFields) {
+        this.table = table;
+        this.predicates = predicates;
+        this.projectedFields = projectedFields;
+    }
+
+    @Override
+    public String description() {
+        // TODO add filters
+        return String.format("tablestore(%s)", table.name());
+    }
+
+    @Override
+    public StructType readSchema() {
+        return 
SparkTypeUtils.fromFlinkRowType(TypeUtils.project(table.rowType(), 
projectedFields));
+    }
+
+    @Override
+    public Batch toBatch() {
+        return new Batch() {
+            @Override
+            public InputPartition[] planInputPartitions() {
+                return splits().stream()
+                        .map(SparkInputPartition::new)
+                        .toArray(InputPartition[]::new);
+            }
+
+            @Override
+            public PartitionReaderFactory createReaderFactory() {
+                return new SparkReaderFactory(table, projectedFields);
+            }
+        };
+    }
+
+    protected List<Split> splits() {
+        if (splits == null) {
+            this.splits = table.newScan().withFilter(predicates).plan().splits;
+        }
+        return splits;
+    }
+
+    @Override
+    public Statistics estimateStatistics() {
+        long rowCount = 0L;
+
+        for (Split split : splits()) {
+            for (DataFileMeta file : split.files()) {
+                rowCount += file.rowCount();
+            }
+        }
+
+        final long numRows = rowCount;
+        final long sizeInBytes = readSchema().defaultSize() * numRows;
+
+        return new Statistics() {
+            @Override
+            public OptionalLong sizeInBytes() {
+                return OptionalLong.of(sizeInBytes);
+            }
+
+            @Override
+            public OptionalLong numRows() {
+                return OptionalLong.of(numRows);
+            }
+        };
+    }
+}
diff --git 
a/flink-table-store-spark/src/main/java/org/apache/flink/table/store/spark/SparkScanBuilder.java
 
b/flink-table-store-spark/src/main/java/org/apache/flink/table/store/spark/SparkScanBuilder.java
new file mode 100644
index 00000000..f26566cc
--- /dev/null
+++ 
b/flink-table-store-spark/src/main/java/org/apache/flink/table/store/spark/SparkScanBuilder.java
@@ -0,0 +1,85 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.table.store.spark;
+
+import org.apache.flink.table.store.file.predicate.Predicate;
+import org.apache.flink.table.store.table.FileStoreTable;
+
+import org.apache.spark.sql.connector.read.Scan;
+import org.apache.spark.sql.connector.read.ScanBuilder;
+import org.apache.spark.sql.connector.read.SupportsPushDownFilters;
+import org.apache.spark.sql.connector.read.SupportsPushDownRequiredColumns;
+import org.apache.spark.sql.sources.Filter;
+import org.apache.spark.sql.types.StructType;
+
+import java.util.ArrayList;
+import java.util.List;
+
+/** A Spark {@link ScanBuilder} for table store. */
+public class SparkScanBuilder
+        implements ScanBuilder, SupportsPushDownFilters, 
SupportsPushDownRequiredColumns {
+
+    private final FileStoreTable table;
+
+    private List<Predicate> predicates = new ArrayList<>();
+    private Filter[] pushedFilters;
+    private int[] projectedFields;
+
+    public SparkScanBuilder(FileStoreTable table) {
+        this.table = table;
+    }
+
+    @Override
+    public Filter[] pushFilters(Filter[] filters) {
+        SparkFilterConverter converter = new 
SparkFilterConverter(table.rowType());
+        List<Predicate> predicates = new ArrayList<>();
+        List<Filter> pushed = new ArrayList<>();
+        for (Filter filter : filters) {
+            try {
+                predicates.add(converter.convert(filter));
+                pushed.add(filter);
+            } catch (UnsupportedOperationException ignore) {
+            }
+        }
+        this.predicates = predicates;
+        this.pushedFilters = pushed.toArray(new Filter[0]);
+        return filters;
+    }
+
+    @Override
+    public Filter[] pushedFilters() {
+        return pushedFilters;
+    }
+
+    @Override
+    public void pruneColumns(StructType requiredSchema) {
+        String[] pruneFields = requiredSchema.fieldNames();
+        List<String> fieldNames = table.rowType().getFieldNames();
+        int[] projected = new int[pruneFields.length];
+        for (int i = 0; i < projected.length; i++) {
+            projected[i] = fieldNames.indexOf(pruneFields[i]);
+        }
+        this.projectedFields = projected;
+    }
+
+    @Override
+    public Scan build() {
+        return new SparkScan(table, predicates, projectedFields);
+    }
+}
diff --git 
a/flink-table-store-spark/src/main/java/org/apache/flink/table/store/spark/SparkSource.java
 
b/flink-table-store-spark/src/main/java/org/apache/flink/table/store/spark/SparkSource.java
new file mode 100644
index 00000000..90468627
--- /dev/null
+++ 
b/flink-table-store-spark/src/main/java/org/apache/flink/table/store/spark/SparkSource.java
@@ -0,0 +1,68 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.table.store.spark;
+
+import org.apache.flink.configuration.Configuration;
+import org.apache.flink.table.store.table.FileStoreTable;
+import org.apache.flink.table.store.table.FileStoreTableFactory;
+
+import org.apache.spark.sql.connector.catalog.Table;
+import org.apache.spark.sql.connector.catalog.TableProvider;
+import org.apache.spark.sql.connector.expressions.Transform;
+import org.apache.spark.sql.sources.DataSourceRegister;
+import org.apache.spark.sql.types.StructType;
+import org.apache.spark.sql.util.CaseInsensitiveStringMap;
+
+import java.util.Map;
+
/** The spark source for table store. */
public class SparkSource implements DataSourceRegister, TableProvider {

    @Override
    public String shortName() {
        // Not use 'table-store' here, the '-' is not allowed in SQL
        return "tablestore";
    }

    @Override
    public StructType inferSchema(CaseInsensitiveStringMap options) {
        // ignore schema.
        // getTable will get schema by itself.
        return null;
    }

    @Override
    public Transform[] inferPartitioning(CaseInsensitiveStringMap options) {
        // ignore partition.
        // getTable will get partition by itself.
        return null;
    }

    @Override
    public boolean supportsExternalMetadata() {
        return true;
    }

    /**
     * Creates the table from {@code options} only (the path etc.); the {@code schema} and
     * {@code partitioning} arguments are ignored because the file store carries its own schema.
     */
    @Override
    public Table getTable(
            StructType schema, Transform[] partitioning, Map<String, String> options) {
        FileStoreTable table = FileStoreTableFactory.create(Configuration.fromMap(options));
        return new SparkTable(table);
    }
}
diff --git 
a/flink-table-store-spark/src/main/java/org/apache/flink/table/store/spark/SparkTable.java
 
b/flink-table-store-spark/src/main/java/org/apache/flink/table/store/spark/SparkTable.java
new file mode 100644
index 00000000..ed4a1997
--- /dev/null
+++ 
b/flink-table-store-spark/src/main/java/org/apache/flink/table/store/spark/SparkTable.java
@@ -0,0 +1,64 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.table.store.spark;
+
+import org.apache.flink.table.store.table.FileStoreTable;
+
+import org.apache.spark.sql.catalog.Table;
+import org.apache.spark.sql.connector.catalog.SupportsRead;
+import org.apache.spark.sql.connector.catalog.TableCapability;
+import org.apache.spark.sql.connector.read.ScanBuilder;
+import org.apache.spark.sql.types.StructType;
+import org.apache.spark.sql.util.CaseInsensitiveStringMap;
+
+import java.util.HashSet;
+import java.util.Set;
+
/** A spark {@link Table} for table store. */
public class SparkTable implements org.apache.spark.sql.connector.catalog.Table, SupportsRead {

    private final FileStoreTable table;

    public SparkTable(FileStoreTable table) {
        this.table = table;
    }

    @Override
    public ScanBuilder newScanBuilder(CaseInsensitiveStringMap options) {
        // options is already merged into table
        return new SparkScanBuilder(table);
    }

    @Override
    public String name() {
        return table.name();
    }

    @Override
    public StructType schema() {
        return SparkTypeUtils.fromFlinkRowType(table.rowType());
    }

    /** Only batch reads are advertised; no write or streaming capability yet. */
    @Override
    public Set<TableCapability> capabilities() {
        Set<TableCapability> capabilities = new HashSet<>();
        capabilities.add(TableCapability.BATCH_READ);
        return capabilities;
    }
}
diff --git 
a/flink-table-store-spark/src/main/java/org/apache/flink/table/store/spark/SparkTypeUtils.java
 
b/flink-table-store-spark/src/main/java/org/apache/flink/table/store/spark/SparkTypeUtils.java
new file mode 100644
index 00000000..65481c21
--- /dev/null
+++ 
b/flink-table-store-spark/src/main/java/org/apache/flink/table/store/spark/SparkTypeUtils.java
@@ -0,0 +1,182 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.table.store.spark;
+
+import org.apache.flink.table.types.logical.ArrayType;
+import org.apache.flink.table.types.logical.BigIntType;
+import org.apache.flink.table.types.logical.BinaryType;
+import org.apache.flink.table.types.logical.BooleanType;
+import org.apache.flink.table.types.logical.CharType;
+import org.apache.flink.table.types.logical.DateType;
+import org.apache.flink.table.types.logical.DecimalType;
+import org.apache.flink.table.types.logical.DoubleType;
+import org.apache.flink.table.types.logical.FloatType;
+import org.apache.flink.table.types.logical.IntType;
+import org.apache.flink.table.types.logical.LocalZonedTimestampType;
+import org.apache.flink.table.types.logical.LogicalType;
+import org.apache.flink.table.types.logical.MapType;
+import org.apache.flink.table.types.logical.MultisetType;
+import org.apache.flink.table.types.logical.RowType;
+import org.apache.flink.table.types.logical.SmallIntType;
+import org.apache.flink.table.types.logical.TimestampType;
+import org.apache.flink.table.types.logical.TinyIntType;
+import org.apache.flink.table.types.logical.VarBinaryType;
+import org.apache.flink.table.types.logical.VarCharType;
+import org.apache.flink.table.types.logical.utils.LogicalTypeDefaultVisitor;
+
+import org.apache.spark.sql.types.DataType;
+import org.apache.spark.sql.types.DataTypes;
+import org.apache.spark.sql.types.StructField;
+import org.apache.spark.sql.types.StructType;
+
+import java.util.List;
+import java.util.stream.Collectors;
+
+/** Utils for spark {@link DataType}. */
+public class SparkTypeUtils {
+
+    private SparkTypeUtils() {}
+
+    public static StructType fromFlinkRowType(RowType type) {
+        return (StructType) fromFlinkType(type);
+    }
+
+    public static DataType fromFlinkType(LogicalType type) {
+        return type.accept(FlinkToSparkTypeVistor.INSTANCE);
+    }
+
+    private static class FlinkToSparkTypeVistor extends 
LogicalTypeDefaultVisitor<DataType> {
+
+        private static final FlinkToSparkTypeVistor INSTANCE = new 
FlinkToSparkTypeVistor();
+
+        @Override
+        public DataType visit(CharType charType) {
+            return DataTypes.StringType;
+        }
+
+        @Override
+        public DataType visit(VarCharType varCharType) {
+            return DataTypes.StringType;
+        }
+
+        @Override
+        public DataType visit(BooleanType booleanType) {
+            return DataTypes.BooleanType;
+        }
+
+        @Override
+        public DataType visit(BinaryType binaryType) {
+            return DataTypes.BinaryType;
+        }
+
+        @Override
+        public DataType visit(VarBinaryType varBinaryType) {
+            return DataTypes.BinaryType;
+        }
+
+        @Override
+        public DataType visit(DecimalType decimalType) {
+            return DataTypes.createDecimalType(decimalType.getPrecision(), 
decimalType.getScale());
+        }
+
+        @Override
+        public DataType visit(TinyIntType tinyIntType) {
+            return DataTypes.ByteType;
+        }
+
+        @Override
+        public DataType visit(SmallIntType smallIntType) {
+            return DataTypes.ShortType;
+        }
+
+        @Override
+        public DataType visit(IntType intType) {
+            return DataTypes.IntegerType;
+        }
+
+        @Override
+        public DataType visit(BigIntType bigIntType) {
+            return DataTypes.LongType;
+        }
+
+        @Override
+        public DataType visit(FloatType floatType) {
+            return DataTypes.FloatType;
+        }
+
+        @Override
+        public DataType visit(DoubleType doubleType) {
+            return DataTypes.DoubleType;
+        }
+
+        @Override
+        public DataType visit(DateType dateType) {
+            return DataTypes.DateType;
+        }
+
+        @Override
+        public DataType visit(TimestampType timestampType) {
+            return DataTypes.TimestampType;
+        }
+
+        @Override
+        public DataType visit(LocalZonedTimestampType localZonedTimestampType) 
{
+            return DataTypes.TimestampType;
+        }
+
+        @Override
+        public DataType visit(ArrayType arrayType) {
+            LogicalType elementType = arrayType.getElementType();
+            return DataTypes.createArrayType(elementType.accept(this), 
elementType.isNullable());
+        }
+
+        @Override
+        public DataType visit(MultisetType multisetType) {
+            return DataTypes.createMapType(
+                    multisetType.getElementType().accept(this), 
DataTypes.IntegerType, false);
+        }
+
+        @Override
+        public DataType visit(MapType mapType) {
+            return DataTypes.createMapType(
+                    mapType.getKeyType().accept(this),
+                    mapType.getValueType().accept(this),
+                    mapType.getValueType().isNullable());
+        }
+
+        @Override
+        public DataType visit(RowType rowType) {
+            List<StructField> fields =
+                    rowType.getFields().stream()
+                            .map(
+                                    field ->
+                                            DataTypes.createStructField(
+                                                    field.getName(),
+                                                    
field.getType().accept(this),
+                                                    
field.getType().isNullable()))
+                            .collect(Collectors.toList());
+            return DataTypes.createStructType(fields);
+        }
+
+        @Override
+        protected DataType defaultMethod(LogicalType logicalType) {
+            throw new UnsupportedOperationException("Unsupported type: " + 
logicalType);
+        }
+    }
+}
diff --git 
a/flink-table-store-spark/src/main/resources/META-INF/services/org.apache.spark.sql.sources.DataSourceRegister
 
b/flink-table-store-spark/src/main/resources/META-INF/services/org.apache.spark.sql.sources.DataSourceRegister
new file mode 100644
index 00000000..e0626103
--- /dev/null
+++ 
b/flink-table-store-spark/src/main/resources/META-INF/services/org.apache.spark.sql.sources.DataSourceRegister
@@ -0,0 +1,16 @@
+# Licensed to the Apache Software Foundation (ASF) under one or more
+# contributor license agreements.  See the NOTICE file distributed with
+# this work for additional information regarding copyright ownership.
+# The ASF licenses this file to You under the Apache License, Version 2.0
+# (the "License"); you may not use this file except in compliance with
+# the License.  You may obtain a copy of the License at
+#
+#     http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+
+org.apache.flink.table.store.spark.SparkSource
diff --git 
a/flink-table-store-spark/src/test/java/org/apache/flink/table/store/spark/SimpleTableTestHelper.java
 
b/flink-table-store-spark/src/test/java/org/apache/flink/table/store/spark/SimpleTableTestHelper.java
new file mode 100644
index 00000000..7231752f
--- /dev/null
+++ 
b/flink-table-store-spark/src/test/java/org/apache/flink/table/store/spark/SimpleTableTestHelper.java
@@ -0,0 +1,70 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.table.store.spark;
+
+import org.apache.flink.configuration.Configuration;
+import org.apache.flink.core.fs.Path;
+import org.apache.flink.table.data.RowData;
+import org.apache.flink.table.store.file.FileStoreOptions;
+import org.apache.flink.table.store.file.schema.SchemaManager;
+import org.apache.flink.table.store.file.schema.UpdateSchema;
+import org.apache.flink.table.store.table.FileStoreTable;
+import org.apache.flink.table.store.table.FileStoreTableFactory;
+import org.apache.flink.table.store.table.sink.TableCommit;
+import org.apache.flink.table.store.table.sink.TableWrite;
+import org.apache.flink.table.types.logical.RowType;
+
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.Map;
+import java.util.UUID;
+
/** A simple table test helper to write and commit. */
public class SimpleTableTestHelper {

    private final TableWrite writer;
    private final TableCommit commit;

    /**
     * Creates a fresh table at {@code path} with the given row type (no partition keys, no
     * primary keys) and opens a writer and committer on it.
     */
    public SimpleTableTestHelper(Path path, RowType rowType) throws Exception {
        Map<String, String> options = new HashMap<>();
        // orc is shaded, can not find shaded classes in ide
        options.put(FileStoreOptions.FILE_FORMAT.key(), "avro");
        // Register the schema before creating the table instance.
        new SchemaManager(path)
                .commitNewVersion(
                        new UpdateSchema(
                                rowType,
                                Collections.emptyList(),
                                Collections.emptyList(),
                                options,
                                ""));
        Configuration conf = Configuration.fromMap(options);
        conf.setString("path", path.toString());
        FileStoreTable table = FileStoreTableFactory.create(conf, "user");
        this.writer = table.newWrite();
        this.commit = table.newCommit();
    }

    /** Buffers one row in the writer; call {@link #commit()} to make it visible to readers. */
    public void write(RowData row) throws Exception {
        writer.write(row);
    }

    /** Commits everything written so far under a fresh random commit identifier. */
    public void commit() throws Exception {
        commit.commit(UUID.randomUUID().toString(), writer.prepareCommit());
    }
}
diff --git 
a/flink-table-store-spark/src/test/java/org/apache/flink/table/store/spark/SparkFilterConverterTest.java
 
b/flink-table-store-spark/src/test/java/org/apache/flink/table/store/spark/SparkFilterConverterTest.java
new file mode 100644
index 00000000..dead1524
--- /dev/null
+++ 
b/flink-table-store-spark/src/test/java/org/apache/flink/table/store/spark/SparkFilterConverterTest.java
@@ -0,0 +1,141 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.table.store.spark;
+
+import org.apache.flink.table.data.TimestampData;
+import org.apache.flink.table.store.file.predicate.Literal;
+import org.apache.flink.table.store.file.predicate.Predicate;
+import org.apache.flink.table.store.file.predicate.PredicateBuilder;
+import org.apache.flink.table.types.logical.DateType;
+import org.apache.flink.table.types.logical.IntType;
+import org.apache.flink.table.types.logical.RowType;
+import org.apache.flink.table.types.logical.TimestampType;
+
+import org.apache.spark.sql.sources.EqualTo;
+import org.apache.spark.sql.sources.GreaterThan;
+import org.apache.spark.sql.sources.GreaterThanOrEqual;
+import org.apache.spark.sql.sources.IsNotNull;
+import org.apache.spark.sql.sources.IsNull;
+import org.apache.spark.sql.sources.LessThan;
+import org.apache.spark.sql.sources.LessThanOrEqual;
+import org.junit.jupiter.api.Test;
+
+import java.sql.Date;
+import java.sql.Timestamp;
+import java.time.Instant;
+import java.time.LocalDate;
+import java.time.LocalDateTime;
+import java.time.ZoneOffset;
+import java.util.Collections;
+
+import static org.assertj.core.api.Assertions.assertThat;
+
+/** Test for {@link SparkFilterConverter}. */
+public class SparkFilterConverterTest {
+
+    @Test
+    public void testAll() {
+        SparkFilterConverter converter =
+                new SparkFilterConverter(
+                        new RowType(
+                                Collections.singletonList(
+                                        new RowType.RowField("id", new 
IntType()))));
+        String field = "id";
+        IsNull isNull = IsNull.apply(field);
+        Predicate expectedIsNull = PredicateBuilder.isNull(0);
+        Predicate actualIsNull = converter.convert(isNull);
+        assertThat(actualIsNull).isEqualTo(expectedIsNull);
+
+        IsNotNull isNotNull = IsNotNull.apply(field);
+        Predicate expectedIsNotNull = PredicateBuilder.isNotNull(0);
+        Predicate actualIsNotNull = converter.convert(isNotNull);
+        assertThat(actualIsNotNull).isEqualTo(expectedIsNotNull);
+
+        LessThan lt = LessThan.apply(field, 1);
+        Predicate expectedLt = PredicateBuilder.lessThan(0, new Literal(new 
IntType(), 1));
+        Predicate actualLt = converter.convert(lt);
+        assertThat(actualLt).isEqualTo(expectedLt);
+
+        LessThanOrEqual ltEq = LessThanOrEqual.apply(field, 1);
+        Predicate expectedLtEq = PredicateBuilder.lessOrEqual(0, new 
Literal(new IntType(), 1));
+        Predicate actualLtEq = converter.convert(ltEq);
+        assertThat(actualLtEq).isEqualTo(expectedLtEq);
+
+        GreaterThan gt = GreaterThan.apply(field, 1);
+        Predicate expectedGt = PredicateBuilder.greaterThan(0, new Literal(new 
IntType(), 1));
+        Predicate actualGt = converter.convert(gt);
+        assertThat(actualGt).isEqualTo(expectedGt);
+
+        GreaterThanOrEqual gtEq = GreaterThanOrEqual.apply(field, 1);
+        Predicate expectedGtEq = PredicateBuilder.greaterOrEqual(0, new 
Literal(new IntType(), 1));
+        Predicate actualGtEq = converter.convert(gtEq);
+        assertThat(actualGtEq).isEqualTo(expectedGtEq);
+
+        EqualTo eq = EqualTo.apply(field, 1);
+        Predicate expectedEq = PredicateBuilder.equal(0, new Literal(new 
IntType(), 1));
+        Predicate actualEq = converter.convert(eq);
+        assertThat(actualEq).isEqualTo(expectedEq);
+    }
+
+    @Test
+    public void testTimestamp() {
+        SparkFilterConverter converter =
+                new SparkFilterConverter(
+                        new RowType(
+                                Collections.singletonList(
+                                        new RowType.RowField("x", new 
TimestampType()))));
+
+        Timestamp timestamp = Timestamp.valueOf("2018-10-18 00:00:57.907");
+        LocalDateTime localDateTime = 
LocalDateTime.parse("2018-10-18T00:00:57.907");
+        Instant instant = localDateTime.toInstant(ZoneOffset.UTC);
+
+        Predicate instantExpression = converter.convert(GreaterThan.apply("x", 
instant));
+        Predicate timestampExpression = 
converter.convert(GreaterThan.apply("x", timestamp));
+        Predicate rawExpression =
+                PredicateBuilder.greaterThan(
+                        0,
+                        new Literal(
+                                new TimestampType(),
+                                
TimestampData.fromLocalDateTime(localDateTime)));
+
+        assertThat(timestampExpression).isEqualTo(rawExpression);
+        assertThat(instantExpression).isEqualTo(rawExpression);
+    }
+
+    @Test
+    public void testDate() {
+        SparkFilterConverter converter =
+                new SparkFilterConverter(
+                        new RowType(
+                                Collections.singletonList(
+                                        new RowType.RowField("x", new 
DateType()))));
+
+        LocalDate localDate = LocalDate.parse("2018-10-18");
+        Date date = Date.valueOf(localDate);
+        int epochDay = (int) localDate.toEpochDay();
+
+        Predicate localDateExpression = 
converter.convert(GreaterThan.apply("x", localDate));
+        Predicate dateExpression = converter.convert(GreaterThan.apply("x", 
date));
+        Predicate rawExpression =
+                PredicateBuilder.greaterThan(0, new Literal(new DateType(), 
epochDay));
+
+        assertThat(dateExpression).isEqualTo(rawExpression);
+        assertThat(localDateExpression).isEqualTo(rawExpression);
+    }
+}
diff --git 
a/flink-table-store-spark/src/test/java/org/apache/flink/table/store/spark/SparkInternalRowTest.java
 
b/flink-table-store-spark/src/test/java/org/apache/flink/table/store/spark/SparkInternalRowTest.java
new file mode 100644
index 00000000..f1311dd0
--- /dev/null
+++ 
b/flink-table-store-spark/src/test/java/org/apache/flink/table/store/spark/SparkInternalRowTest.java
@@ -0,0 +1,116 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.table.store.spark;
+
+import org.apache.flink.table.data.RowData;
+import org.apache.flink.table.data.conversion.RowRowConverter;
+import org.apache.flink.table.types.utils.TypeConversions;
+import org.apache.flink.types.Row;
+
+import org.apache.spark.sql.catalyst.CatalystTypeConverters;
+import org.junit.jupiter.api.Test;
+
+import java.math.BigDecimal;
+import java.nio.charset.StandardCharsets;
+import java.time.Instant;
+import java.time.LocalDate;
+import java.time.LocalDateTime;
+import java.util.AbstractMap;
+import java.util.Map;
+import java.util.stream.Collectors;
+import java.util.stream.Stream;
+
+import scala.Function1;
+
+import static org.apache.flink.table.store.spark.SparkTypeTest.ALL_TYPES;
+import static org.assertj.core.api.Assertions.assertThat;
+
/** Test for {@link SparkInternalRow}. */
public class SparkInternalRowTest {

    @Test
    public void test() {
        // One external Row covering every field of ALL_TYPES (see SparkTypeTest), in
        // declaration order: id, name, salary, locations, strArray, intArray, char,
        // varchar, boolean, tinyint, smallint, bigint, varbinary, binary,
        // timestampWithoutZone, timestampWithZone, date, decimal, decimal2, decimal3,
        // multiset.
        Row row =
                Row.of(
                        1,
                        "jingsong",
                        22.2,
                        Stream.of(
                                        new AbstractMap.SimpleEntry<>("key1", Row.of(1.2, 2.3)),
                                        new AbstractMap.SimpleEntry<>("key2", Row.of(2.4, 3.5)))
                                .collect(Collectors.toMap(Map.Entry::getKey, Map.Entry::getValue)),
                        new String[] {"v1", "v5"},
                        new Integer[] {10, 30},
                        "char_v",
                        "varchar_v",
                        true,
                        (byte) 22,
                        (short) 356,
                        23567222L,
                        "binary_v".getBytes(StandardCharsets.UTF_8),
                        "varbinary_v".getBytes(StandardCharsets.UTF_8),
                        LocalDateTime.parse("2007-12-03T10:15:30"),
                        Instant.parse("2007-12-03T10:15:30.00Z"),
                        LocalDate.parse("2022-05-02"),
                        BigDecimal.valueOf(0.21),
                        BigDecimal.valueOf(65782123123.01),
                        BigDecimal.valueOf(62123123.5),
                        Stream.of(
                                        new AbstractMap.SimpleEntry<>("key1", 5),
                                        new AbstractMap.SimpleEntry<>("key2", 2))
                                .collect(Collectors.toMap(Map.Entry::getKey, Map.Entry::getValue)));

        // External Row -> Flink internal RowData.
        RowRowConverter flinkConverter =
                RowRowConverter.create(TypeConversions.fromLogicalToDataType(ALL_TYPES));
        flinkConverter.open(Thread.currentThread().getContextClassLoader());
        RowData rowData = flinkConverter.toInternal(row);

        // Wrap the RowData in SparkInternalRow and convert it back out through
        // Spark's catalyst converter to an external Spark Row.
        Function1<Object, Object> sparkConverter =
                CatalystTypeConverters.createToScalaConverter(
                        SparkTypeUtils.fromFlinkType(ALL_TYPES));
        org.apache.spark.sql.Row sparkRow =
                (org.apache.spark.sql.Row)
                        sparkConverter.apply(new SparkInternalRow(ALL_TYPES).replace(rowData));

        // Compare via Spark's JSON rendering; binary fields appear base64-encoded
        // ("YmluYXJ5X3Y=" is "binary_v", "dmFyYmluYXJ5X3Y=" is "varbinary_v").
        String expected =
                "{"
                        + "\"id\":1,"
                        + "\"name\":\"jingsong\","
                        + "\"salary\":22.2,"
                        + "\"locations\":{\"key1\":{\"posX\":1.2,\"posY\":2.3},\"key2\":{\"posX\":2.4,\"posY\":3.5}},"
                        + "\"strArray\":[\"v1\",\"v5\"],"
                        + "\"intArray\":[10,30],"
                        + "\"char\":\"char_v\","
                        + "\"varchar\":\"varchar_v\","
                        + "\"boolean\":true,"
                        + "\"tinyint\":22,"
                        + "\"smallint\":356,"
                        + "\"bigint\":23567222,"
                        + "\"varbinary\":\"YmluYXJ5X3Y=\","
                        + "\"binary\":\"dmFyYmluYXJ5X3Y=\","
                        + "\"timestampWithoutZone\":\"2007-12-03 10:15:30\","
                        + "\"timestampWithZone\":\"2007-12-03 10:15:30\","
                        + "\"date\":\"2022-05-02\","
                        + "\"decimal\":0.21,"
                        + "\"decimal2\":65782123123.01,"
                        + "\"decimal3\":62123123.5,"
                        + "\"multiset\":{\"key1\":5,\"key2\":2}}";
        assertThat(sparkRow.json()).isEqualTo(expected);
    }
}
diff --git 
a/flink-table-store-spark/src/test/java/org/apache/flink/table/store/spark/SparkReadITCase.java
 
b/flink-table-store-spark/src/test/java/org/apache/flink/table/store/spark/SparkReadITCase.java
new file mode 100644
index 00000000..08de5ce5
--- /dev/null
+++ 
b/flink-table-store-spark/src/test/java/org/apache/flink/table/store/spark/SparkReadITCase.java
@@ -0,0 +1,115 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.table.store.spark;
+
+import org.apache.flink.core.fs.Path;
+import org.apache.flink.table.data.GenericRowData;
+import org.apache.flink.table.data.StringData;
+import org.apache.flink.table.types.logical.BigIntType;
+import org.apache.flink.table.types.logical.IntType;
+import org.apache.flink.table.types.logical.RowType;
+import org.apache.flink.table.types.logical.VarCharType;
+import org.apache.flink.types.RowKind;
+
+import org.apache.spark.sql.Dataset;
+import org.apache.spark.sql.Row;
+import org.apache.spark.sql.SparkSession;
+import org.junit.jupiter.api.AfterAll;
+import org.junit.jupiter.api.BeforeAll;
+import org.junit.jupiter.api.BeforeEach;
+import org.junit.jupiter.api.Test;
+import org.junit.jupiter.api.io.TempDir;
+
+import java.util.Arrays;
+import java.util.List;
+
+import static org.assertj.core.api.Assertions.assertThat;
+
+/** ITCase for spark reader. */
+public class SparkReadITCase {
+
+    private static SparkSession spark = null;
+
+    @TempDir java.nio.file.Path tempDir;
+
+    private Path path;
+
+    private SimpleTableTestHelper testHelper;
+
+    @BeforeEach
+    public void beforeEach() throws Exception {
+        this.path = new Path(tempDir.toUri().toString(), "my_table");
+        RowType rowType =
+                new RowType(
+                        Arrays.asList(
+                                new RowType.RowField("a", new IntType()),
+                                new RowType.RowField("b", new BigIntType()),
+                                new RowType.RowField("c", new VarCharType())));
+        testHelper = new SimpleTableTestHelper(path, rowType);
+    }
+
+    @BeforeAll
+    public static void startMetastoreAndSpark() {
+        spark = SparkSession.builder().master("local[2]").getOrCreate();
+    }
+
+    @AfterAll
+    public static void stopMetastoreAndSpark() {
+        spark.stop();
+        spark = null;
+    }
+
+    @Test
+    public void testNormal() throws Exception {
+        testHelper.write(GenericRowData.of(1, 2L, StringData.fromString("1")));
+        testHelper.write(GenericRowData.of(3, 4L, StringData.fromString("2")));
+        testHelper.write(GenericRowData.of(5, 6L, StringData.fromString("3")));
+        testHelper.write(GenericRowData.ofKind(RowKind.DELETE, 3, 4L, 
StringData.fromString("2")));
+        testHelper.commit();
+
+        Dataset<Row> dataset =
+                spark.read().format("tablestore").option("path", 
path.toString()).load();
+
+        List<Row> results = dataset.collectAsList();
+        assertThat(results.toString()).isEqualTo("[[1,2,1], [5,6,3]]");
+
+        results = dataset.select("a", "c").collectAsList();
+        assertThat(results.toString()).isEqualTo("[[1,1], [5,3]]");
+
+        results = dataset.groupBy().sum("b").collectAsList();
+        assertThat(results.toString()).isEqualTo("[[8]]");
+    }
+
+    @Test
+    public void testFilterPushDown() throws Exception {
+        testHelper.write(GenericRowData.of(1, 2L, StringData.fromString("1")));
+        testHelper.write(GenericRowData.of(3, 4L, StringData.fromString("2")));
+        testHelper.commit();
+
+        testHelper.write(GenericRowData.of(5, 6L, StringData.fromString("3")));
+        testHelper.write(GenericRowData.of(7, 8L, StringData.fromString("4")));
+        testHelper.commit();
+
+        Dataset<Row> dataset =
+                spark.read().format("tablestore").option("path", 
path.toString()).load();
+
+        List<Row> results = dataset.filter("a < 4").select("a", 
"c").collectAsList();
+        assertThat(results.toString()).isEqualTo("[[1,1], [3,2]]");
+    }
+}
diff --git 
a/flink-table-store-spark/src/test/java/org/apache/flink/table/store/spark/SparkTypeTest.java
 
b/flink-table-store-spark/src/test/java/org/apache/flink/table/store/spark/SparkTypeTest.java
new file mode 100644
index 00000000..12c8694a
--- /dev/null
+++ 
b/flink-table-store-spark/src/test/java/org/apache/flink/table/store/spark/SparkTypeTest.java
@@ -0,0 +1,105 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.table.store.spark;
+
+import org.apache.flink.table.api.DataTypes;
+import org.apache.flink.table.api.TableSchema;
+import org.apache.flink.table.types.logical.RowType;
+
+import org.junit.jupiter.api.Test;
+
+import static org.assertj.core.api.Assertions.assertThat;
+
/** Test for {@link SparkTypeUtils}. */
public class SparkTypeTest {

    // Shared fixture: a Flink RowType exercising every supported logical type,
    // including nested row-in-map, arrays, binary, temporal, decimal and multiset.
    // Also used by SparkInternalRowTest.
    public static final RowType ALL_TYPES =
            (RowType)
                    TableSchema.builder()
                            .field("id", DataTypes.INT().notNull())
                            .field("name", DataTypes.STRING()) /* optional by default */
                            .field("salary", DataTypes.DOUBLE().notNull())
                            .field(
                                    "locations",
                                    DataTypes.MAP(
                                            DataTypes.STRING(),
                                            DataTypes.ROW(
                                                    DataTypes.FIELD(
                                                            "posX",
                                                            DataTypes.DOUBLE().notNull(),
                                                            "X field"),
                                                    DataTypes.FIELD(
                                                            "posY",
                                                            DataTypes.DOUBLE().notNull(),
                                                            "Y field"))))
                            .field("strArray", DataTypes.ARRAY(DataTypes.STRING()).nullable())
                            .field("intArray", DataTypes.ARRAY(DataTypes.INT()).nullable())
                            .field("char", DataTypes.CHAR(10).notNull())
                            .field("varchar", DataTypes.VARCHAR(10).notNull())
                            .field("boolean", DataTypes.BOOLEAN().nullable())
                            .field("tinyint", DataTypes.TINYINT())
                            .field("smallint", DataTypes.SMALLINT())
                            .field("bigint", DataTypes.BIGINT())
                            .field("varbinary", DataTypes.VARBINARY(10))
                            .field("binary", DataTypes.BINARY(10))
                            .field("timestampWithoutZone", DataTypes.TIMESTAMP())
                            .field("timestampWithZone", DataTypes.TIMESTAMP_WITH_LOCAL_TIME_ZONE())
                            .field("date", DataTypes.DATE())
                            .field("decimal", DataTypes.DECIMAL(2, 2))
                            .field("decimal2", DataTypes.DECIMAL(38, 2))
                            .field("decimal3", DataTypes.DECIMAL(10, 1))
                            .field("multiset", DataTypes.MULTISET(DataTypes.STRING().notNull()))
                            .build()
                            .toRowDataType()
                            .getLogicalType();

    @Test
    public void testAllTypes() {
        // The expected value is the toString of the converted Spark StructType;
        // note CHAR/VARCHAR collapse to StringType, BINARY/VARBINARY to BinaryType,
        // both timestamp variants to TimestampType, and MULTISET to a
        // MapType(element -> non-null Integer count).
        String nestedRowMapType =
                "StructField(locations,MapType("
                        + "StringType,"
                        + "StructType(StructField(posX,DoubleType,false), StructField(posY,DoubleType,false)),true),true)";
        String expected =
                "StructType("
                        + "StructField(id,IntegerType,false), "
                        + "StructField(name,StringType,true), "
                        + "StructField(salary,DoubleType,false), "
                        + nestedRowMapType
                        + ", "
                        + "StructField(strArray,ArrayType(StringType,true),true), "
                        + "StructField(intArray,ArrayType(IntegerType,true),true), "
                        + "StructField(char,StringType,false), "
                        + "StructField(varchar,StringType,false), "
                        + "StructField(boolean,BooleanType,true), "
                        + "StructField(tinyint,ByteType,true), "
                        + "StructField(smallint,ShortType,true), "
                        + "StructField(bigint,LongType,true), "
                        + "StructField(varbinary,BinaryType,true), "
                        + "StructField(binary,BinaryType,true), "
                        + "StructField(timestampWithoutZone,TimestampType,true), "
                        + "StructField(timestampWithZone,TimestampType,true), "
                        + "StructField(date,DateType,true), "
                        + "StructField(decimal,DecimalType(2,2),true), "
                        + "StructField(decimal2,DecimalType(38,2),true), "
                        + "StructField(decimal3,DecimalType(10,1),true), "
                        + "StructField(multiset,MapType(StringType,IntegerType,false),true))";

        assertThat(SparkTypeUtils.fromFlinkRowType(ALL_TYPES).toString()).isEqualTo(expected);
    }
}
diff --git a/flink-table-store-spark/src/test/resources/log4j2-test.properties 
b/flink-table-store-spark/src/test/resources/log4j2-test.properties
new file mode 100644
index 00000000..863665cf
--- /dev/null
+++ b/flink-table-store-spark/src/test/resources/log4j2-test.properties
@@ -0,0 +1,38 @@
+################################################################################
+#  Licensed to the Apache Software Foundation (ASF) under one
+#  or more contributor license agreements.  See the NOTICE file
+#  distributed with this work for additional information
+#  regarding copyright ownership.  The ASF licenses this file
+#  to you under the Apache License, Version 2.0 (the
+#  "License"); you may not use this file except in compliance
+#  with the License.  You may obtain a copy of the License at
+#
+#      http://www.apache.org/licenses/LICENSE-2.0
+#
+#  Unless required by applicable law or agreed to in writing, software
+#  distributed under the License is distributed on an "AS IS" BASIS,
+#  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+#  See the License for the specific language governing permissions and
+#  limitations under the License.
+################################################################################
+
+# Set root logger level to OFF to not flood build logs
+# set manually to INFO for debugging purposes
+rootLogger.level = OFF
+rootLogger.appenderRef.test.ref = TestLogger
+
+appender.testlogger.name = TestLogger
+appender.testlogger.type = CONSOLE
+appender.testlogger.target = SYSTEM_ERR
+appender.testlogger.layout.type = PatternLayout
+appender.testlogger.layout.pattern = %-4r [%t] %-5p %c %x - %m%n
+
+logger.kafka.name = kafka
+logger.kafka.level = OFF
+logger.kafka2.name = state.change
+logger.kafka2.level = OFF
+
+logger.zookeeper.name = org.apache.zookeeper
+logger.zookeeper.level = OFF
+logger.I0Itec.name = org.I0Itec
+logger.I0Itec.level = OFF
diff --git a/pom.xml b/pom.xml
index 0d0d5082..82eaf00c 100644
--- a/pom.xml
+++ b/pom.xml
@@ -62,6 +62,7 @@ under the License.
         <module>flink-table-store-format</module>
         <module>flink-table-store-hive</module>
         <module>flink-table-store-kafka</module>
+        <module>flink-table-store-spark</module>
     </modules>
 
     <properties>
@@ -463,7 +464,7 @@ under the License.
                             <rules>
                                 <bannedDependencies>
                                     <excludes>
-                                        
<exclude>com.fasterxml.jackson*:*:(,2.10.0]</exclude>
+                                        
<exclude>com.fasterxml.jackson*:*:(,2.9.0]</exclude>
                                     </excludes>
                                 </bannedDependencies>
                             </rules>

Reply via email to