This is an automated email from the ASF dual-hosted git repository.
lzljs3620320 pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/flink-table-store.git
The following commit(s) were added to refs/heads/master by this push:
new c58576eb [FLINK-27947] Introduce Spark Reader for table store
c58576eb is described below
commit c58576eb3bd3d860c5ba5a940b4d0e0b3cb5f55a
Author: Jingsong Lee <[email protected]>
AuthorDate: Fri Jun 17 16:31:48 2022 +0800
[FLINK-27947] Introduce Spark Reader for table store
This closes #147
---
flink-table-store-spark/pom.xml | 214 ++++++++++++++++++
.../flink/table/store/spark/SparkArrayData.java | 172 +++++++++++++++
.../table/store/spark/SparkFilterConverter.java | 113 ++++++++++
.../table/store/spark/SparkInputPartition.java | 55 +++++
.../flink/table/store/spark/SparkInternalRow.java | 241 +++++++++++++++++++++
.../table/store/spark/SparkReaderFactory.java | 93 ++++++++
.../apache/flink/table/store/spark/SparkScan.java | 117 ++++++++++
.../flink/table/store/spark/SparkScanBuilder.java | 85 ++++++++
.../flink/table/store/spark/SparkSource.java | 68 ++++++
.../apache/flink/table/store/spark/SparkTable.java | 64 ++++++
.../flink/table/store/spark/SparkTypeUtils.java | 182 ++++++++++++++++
...org.apache.spark.sql.sources.DataSourceRegister | 16 ++
.../table/store/spark/SimpleTableTestHelper.java | 70 ++++++
.../store/spark/SparkFilterConverterTest.java | 141 ++++++++++++
.../table/store/spark/SparkInternalRowTest.java | 116 ++++++++++
.../flink/table/store/spark/SparkReadITCase.java | 115 ++++++++++
.../flink/table/store/spark/SparkTypeTest.java | 105 +++++++++
.../src/test/resources/log4j2-test.properties | 38 ++++
pom.xml | 3 +-
19 files changed, 2007 insertions(+), 1 deletion(-)
diff --git a/flink-table-store-spark/pom.xml b/flink-table-store-spark/pom.xml
new file mode 100644
index 00000000..b2987c23
--- /dev/null
+++ b/flink-table-store-spark/pom.xml
@@ -0,0 +1,214 @@
+<?xml version="1.0" encoding="UTF-8"?>
+<!--
+Licensed to the Apache Software Foundation (ASF) under one
+or more contributor license agreements. See the NOTICE file
+distributed with this work for additional information
+regarding copyright ownership. The ASF licenses this file
+to you under the Apache License, Version 2.0 (the
+"License"); you may not use this file except in compliance
+with the License. You may obtain a copy of the License at
+
+ http://www.apache.org/licenses/LICENSE-2.0
+
+Unless required by applicable law or agreed to in writing,
+software distributed under the License is distributed on an
+"AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+KIND, either express or implied. See the License for the
+specific language governing permissions and limitations
+under the License.
+-->
+<project xmlns="http://maven.apache.org/POM/4.0.0"
+ xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
+ xsi:schemaLocation="http://maven.apache.org/POM/4.0.0
http://maven.apache.org/xsd/maven-4.0.0.xsd">
+ <modelVersion>4.0.0</modelVersion>
+
+ <parent>
+ <artifactId>flink-table-store-parent</artifactId>
+ <groupId>org.apache.flink</groupId>
+ <version>0.2-SNAPSHOT</version>
+ </parent>
+
+ <artifactId>flink-table-store-spark</artifactId>
+ <name>Flink Table Store : Spark</name>
+
+ <packaging>jar</packaging>
+
+ <properties>
+ <spark.version>3.2.1</spark.version>
+ </properties>
+
+ <dependencies>
+
+ <dependency>
+ <groupId>org.apache.spark</groupId>
+ <artifactId>spark-sql_2.12</artifactId>
+ <version>${spark.version}</version>
+ <exclusions>
+ <exclusion>
+ <groupId>log4j</groupId>
+ <artifactId>log4j</artifactId>
+ </exclusion>
+ <exclusion>
+ <groupId>org.slf4j</groupId>
+ <artifactId>slf4j-log4j12</artifactId>
+ </exclusion>
+ </exclusions>
+ </dependency>
+
+ <dependency>
+ <groupId>org.apache.flink</groupId>
+ <artifactId>flink-table-common</artifactId>
+ <version>${flink.version}</version>
+ <scope>provided</scope>
+ </dependency>
+
+ <dependency>
+ <groupId>org.apache.flink</groupId>
+ <artifactId>flink-table-runtime</artifactId>
+ <version>${flink.version}</version>
+ <scope>provided</scope>
+ </dependency>
+
+ <dependency>
+ <groupId>org.apache.flink</groupId>
+ <artifactId>flink-table-store-common</artifactId>
+ <version>${project.version}</version>
+ <scope>compile</scope>
+ </dependency>
+
+ <dependency>
+ <groupId>org.apache.flink</groupId>
+ <artifactId>flink-table-store-core</artifactId>
+ <version>${project.version}</version>
+ <scope>compile</scope>
+ </dependency>
+
+
+ <dependency>
+ <groupId>org.apache.flink</groupId>
+ <artifactId>flink-core</artifactId>
+ <version>${flink.version}</version>
+ <exclusions>
+ <exclusion>
+ <groupId>org.apache.flink</groupId>
+ <artifactId>flink-metrics-core</artifactId>
+ </exclusion>
+ <exclusion>
+ <groupId>org.apache.commons</groupId>
+ <artifactId>commons-lang3</artifactId>
+ </exclusion>
+ <exclusion>
+ <groupId>com.esotericsoftware.kryo</groupId>
+ <artifactId>kryo</artifactId>
+ </exclusion>
+ <exclusion>
+ <groupId>commons-collections</groupId>
+ <artifactId>commons-collections</artifactId>
+ </exclusion>
+ <exclusion>
+ <groupId>org.apache.commons</groupId>
+ <artifactId>commons-compress</artifactId>
+ </exclusion>
+ </exclusions>
+ </dependency>
+
+ <dependency>
+ <groupId>org.apache.flink</groupId>
+ <artifactId>flink-table-common</artifactId>
+ <version>${flink.version}</version>
+ <exclusions>
+ <exclusion>
+ <groupId>*</groupId>
+ <artifactId>*</artifactId>
+ </exclusion>
+ </exclusions>
+ </dependency>
+
+ <dependency>
+ <groupId>org.apache.flink</groupId>
+ <artifactId>flink-table-api-java</artifactId>
+ <version>${flink.version}</version>
+ <exclusions>
+ <exclusion>
+ <groupId>*</groupId>
+ <artifactId>*</artifactId>
+ </exclusion>
+ </exclusions>
+ </dependency>
+
+ <dependency>
+ <groupId>org.apache.flink</groupId>
+ <artifactId>flink-connector-base</artifactId>
+ <version>${flink.version}</version>
+ </dependency>
+
+ <dependency>
+ <groupId>org.apache.flink</groupId>
+ <artifactId>flink-connector-files</artifactId>
+ <version>${flink.version}</version>
+ </dependency>
+
+ <dependency>
+ <groupId>org.apache.flink</groupId>
+ <artifactId>flink-table-store-codegen-loader</artifactId>
+ <version>${project.version}</version>
+ </dependency>
+
+ <dependency>
+ <groupId>org.apache.flink</groupId>
+ <artifactId>flink-table-store-format</artifactId>
+ <version>${project.version}</version>
+ </dependency>
+
+ <dependency>
+ <groupId>org.apache.flink</groupId>
+ <artifactId>flink-avro</artifactId>
+ <version>${flink.version}</version>
+ </dependency>
+ </dependencies>
+
+ <!-- Activate these profiles with -Pspark-x.x to build and test against
different Spark versions -->
+ <profiles>
+ <profile>
+ <id>spark-3.2</id>
+ <properties>
+ <spark.version>3.2.1</spark.version>
+ </properties>
+ </profile>
+ <profile>
+ <id>spark-3.1</id>
+ <properties>
+ <spark.version>3.1.3</spark.version>
+ </properties>
+ </profile>
+ <profile>
+ <id>spark-3.0</id>
+ <properties>
+ <spark.version>3.0.3</spark.version>
+ </properties>
+ </profile>
+ </profiles>
+
+ <build>
+ <plugins>
+ <plugin>
+ <groupId>org.codehaus.mojo</groupId>
+ <artifactId>build-helper-maven-plugin</artifactId>
+ <executions>
+ <execution>
+ <id>add-sources</id>
+ <phase>generate-sources</phase>
+ <goals>
+ <goal>add-source</goal>
+ </goals>
+ <configuration>
+ <sources>
+ <source>src/main/${spark.version}</source>
+ </sources>
+ </configuration>
+ </execution>
+ </executions>
+ </plugin>
+ </plugins>
+ </build>
+</project>
diff --git
a/flink-table-store-spark/src/main/java/org/apache/flink/table/store/spark/SparkArrayData.java
b/flink-table-store-spark/src/main/java/org/apache/flink/table/store/spark/SparkArrayData.java
new file mode 100644
index 00000000..1e18fe30
--- /dev/null
+++
b/flink-table-store-spark/src/main/java/org/apache/flink/table/store/spark/SparkArrayData.java
@@ -0,0 +1,172 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.table.store.spark;
+
+import org.apache.flink.table.store.utils.RowDataUtils;
+import org.apache.flink.table.types.logical.ArrayType;
+import org.apache.flink.table.types.logical.BigIntType;
+import org.apache.flink.table.types.logical.LogicalType;
+import org.apache.flink.table.types.logical.RowType;
+
+import org.apache.spark.sql.catalyst.InternalRow;
+import org.apache.spark.sql.catalyst.expressions.SpecializedGettersReader;
+import org.apache.spark.sql.catalyst.util.ArrayData;
+import org.apache.spark.sql.catalyst.util.MapData;
+import org.apache.spark.sql.types.DataType;
+import org.apache.spark.sql.types.Decimal;
+import org.apache.spark.unsafe.types.CalendarInterval;
+import org.apache.spark.unsafe.types.UTF8String;
+
+import static org.apache.flink.table.store.spark.SparkInternalRow.fromFlink;
+import static org.apache.flink.table.store.utils.RowDataUtils.copyArray;
+import static org.apache.flink.table.store.utils.TypeUtils.timestampPrecision;
+
+/** Spark {@link ArrayData} to wrap flink {@code ArrayData}. */
+public class SparkArrayData extends ArrayData {
+
+ private final LogicalType elementType;
+
+ private org.apache.flink.table.data.ArrayData array;
+
+ public SparkArrayData(LogicalType elementType) {
+ this.elementType = elementType;
+ }
+
+ public SparkArrayData replace(org.apache.flink.table.data.ArrayData array)
{
+ this.array = array;
+ return this;
+ }
+
+ @Override
+ public int numElements() {
+ return array.size();
+ }
+
+ @Override
+ public ArrayData copy() {
+ return new SparkArrayData(elementType).replace(copyArray(array,
elementType));
+ }
+
+ @Override
+ public Object[] array() {
+ Object[] objects = new Object[numElements()];
+ for (int i = 0; i < objects.length; i++) {
+ objects[i] = fromFlink(RowDataUtils.get(array, i, elementType),
elementType);
+ }
+ return objects;
+ }
+
+ @Override
+ public void setNullAt(int i) {
+ throw new UnsupportedOperationException();
+ }
+
+ @Override
+ public void update(int i, Object value) {
+ throw new UnsupportedOperationException();
+ }
+
+ @Override
+ public boolean isNullAt(int ordinal) {
+ return array.isNullAt(ordinal);
+ }
+
+ @Override
+ public boolean getBoolean(int ordinal) {
+ return array.getBoolean(ordinal);
+ }
+
+ @Override
+ public byte getByte(int ordinal) {
+ return array.getByte(ordinal);
+ }
+
+ @Override
+ public short getShort(int ordinal) {
+ return array.getShort(ordinal);
+ }
+
+ @Override
+ public int getInt(int ordinal) {
+ return array.getInt(ordinal);
+ }
+
+ @Override
+ public long getLong(int ordinal) {
+ if (elementType instanceof BigIntType) {
+ return array.getLong(ordinal);
+ }
+
+ return getTimestampMicros(ordinal);
+ }
+
+ private long getTimestampMicros(int ordinal) {
+ return fromFlink(array.getTimestamp(ordinal,
timestampPrecision(elementType)));
+ }
+
+ @Override
+ public float getFloat(int ordinal) {
+ return array.getFloat(ordinal);
+ }
+
+ @Override
+ public double getDouble(int ordinal) {
+ return array.getDouble(ordinal);
+ }
+
+ @Override
+ public Decimal getDecimal(int ordinal, int precision, int scale) {
+ return fromFlink(array.getDecimal(ordinal, precision, scale));
+ }
+
+ @Override
+ public UTF8String getUTF8String(int ordinal) {
+ return fromFlink(array.getString(ordinal));
+ }
+
+ @Override
+ public byte[] getBinary(int ordinal) {
+ return array.getBinary(ordinal);
+ }
+
+ @Override
+ public CalendarInterval getInterval(int ordinal) {
+ throw new UnsupportedOperationException();
+ }
+
+ @Override
+ public InternalRow getStruct(int ordinal, int numFields) {
+ return fromFlink(array.getRow(ordinal, numFields), (RowType)
elementType);
+ }
+
+ @Override
+ public ArrayData getArray(int ordinal) {
+ return fromFlink(array.getArray(ordinal), (ArrayType) elementType);
+ }
+
+ @Override
+ public MapData getMap(int ordinal) {
+ return fromFlink(array.getMap(ordinal), elementType);
+ }
+
+ @Override
+ public Object get(int ordinal, DataType dataType) {
+ return SpecializedGettersReader.read(this, ordinal, dataType, true,
true);
+ }
+}
diff --git
a/flink-table-store-spark/src/main/java/org/apache/flink/table/store/spark/SparkFilterConverter.java
b/flink-table-store-spark/src/main/java/org/apache/flink/table/store/spark/SparkFilterConverter.java
new file mode 100644
index 00000000..ec077c2b
--- /dev/null
+++
b/flink-table-store-spark/src/main/java/org/apache/flink/table/store/spark/SparkFilterConverter.java
@@ -0,0 +1,113 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.table.store.spark;
+
+import org.apache.flink.table.store.file.predicate.Literal;
+import org.apache.flink.table.store.file.predicate.Predicate;
+import org.apache.flink.table.store.file.predicate.PredicateBuilder;
+import org.apache.flink.table.types.logical.LogicalType;
+import org.apache.flink.table.types.logical.RowType;
+
+import org.apache.spark.sql.sources.And;
+import org.apache.spark.sql.sources.EqualTo;
+import org.apache.spark.sql.sources.Filter;
+import org.apache.spark.sql.sources.GreaterThan;
+import org.apache.spark.sql.sources.GreaterThanOrEqual;
+import org.apache.spark.sql.sources.IsNotNull;
+import org.apache.spark.sql.sources.IsNull;
+import org.apache.spark.sql.sources.LessThan;
+import org.apache.spark.sql.sources.LessThanOrEqual;
+import org.apache.spark.sql.sources.Not;
+import org.apache.spark.sql.sources.Or;
+import org.apache.spark.sql.sources.StringStartsWith;
+
+/** Conversion from {@link Filter} to {@link Predicate}. */
+public class SparkFilterConverter {
+
+ private final RowType rowType;
+
+ public SparkFilterConverter(RowType rowType) {
+ this.rowType = rowType;
+ }
+
+ public Predicate convert(Filter filter) {
+ if (filter instanceof EqualTo) {
+ EqualTo eq = (EqualTo) filter;
+ // TODO deal with isNaN
+ int index = fieldIndex(eq.attribute());
+ Literal literal = convertLiteral(index, eq.value());
+ return PredicateBuilder.equal(index, literal);
+ } else if (filter instanceof GreaterThan) {
+ GreaterThan gt = (GreaterThan) filter;
+ int index = fieldIndex(gt.attribute());
+ Literal literal = convertLiteral(index, gt.value());
+ return PredicateBuilder.greaterThan(index, literal);
+ } else if (filter instanceof GreaterThanOrEqual) {
+ GreaterThanOrEqual gt = (GreaterThanOrEqual) filter;
+ int index = fieldIndex(gt.attribute());
+ Literal literal = convertLiteral(index, gt.value());
+ return PredicateBuilder.greaterOrEqual(index, literal);
+ } else if (filter instanceof LessThan) {
+ LessThan lt = (LessThan) filter;
+ int index = fieldIndex(lt.attribute());
+ Literal literal = convertLiteral(index, lt.value());
+ return PredicateBuilder.lessThan(index, literal);
+ } else if (filter instanceof LessThanOrEqual) {
+ LessThanOrEqual lt = (LessThanOrEqual) filter;
+ int index = fieldIndex(lt.attribute());
+ Literal literal = convertLiteral(index, lt.value());
+ return PredicateBuilder.lessOrEqual(index, literal);
+ } else if (filter instanceof IsNull) {
+ return PredicateBuilder.isNull(fieldIndex(((IsNull)
filter).attribute()));
+ } else if (filter instanceof IsNotNull) {
+ return PredicateBuilder.isNotNull(fieldIndex(((IsNotNull)
filter).attribute()));
+ } else if (filter instanceof And) {
+ And and = (And) filter;
+ return PredicateBuilder.and(convert(and.left()),
convert(and.right()));
+ } else if (filter instanceof Or) {
+ Or or = (Or) filter;
+ return PredicateBuilder.or(convert(or.left()),
convert(or.right()));
+ } else if (filter instanceof Not) {
+ Not not = (Not) filter;
+ return
convert(not.child()).negate().orElseThrow(UnsupportedOperationException::new);
+ } else if (filter instanceof StringStartsWith) {
+ StringStartsWith startsWith = (StringStartsWith) filter;
+ int index = fieldIndex(startsWith.attribute());
+ Literal literal = convertLiteral(index, startsWith.value());
+ return PredicateBuilder.startsWith(index, literal);
+ }
+
+ // TODO: In, NotIn, AlwaysTrue, AlwaysFalse, EqualNullSafe
+ throw new UnsupportedOperationException();
+ }
+
+ private int fieldIndex(String field) {
+ int index = rowType.getFieldIndex(field);
+ // TODO: support nested field
+ if (index == -1) {
+ throw new UnsupportedOperationException();
+ }
+ return index;
+ }
+
+ private Literal convertLiteral(int index, Object value) {
+ LogicalType type = rowType.getTypeAt(index);
+ return Literal.fromJavaObject(type, value);
+ }
+}
diff --git
a/flink-table-store-spark/src/main/java/org/apache/flink/table/store/spark/SparkInputPartition.java
b/flink-table-store-spark/src/main/java/org/apache/flink/table/store/spark/SparkInputPartition.java
new file mode 100644
index 00000000..33e7fa51
--- /dev/null
+++
b/flink-table-store-spark/src/main/java/org/apache/flink/table/store/spark/SparkInputPartition.java
@@ -0,0 +1,55 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.table.store.spark;
+
+import org.apache.flink.core.memory.DataInputViewStreamWrapper;
+import org.apache.flink.core.memory.DataOutputViewStreamWrapper;
+import org.apache.flink.table.store.table.source.Split;
+
+import org.apache.spark.sql.connector.read.InputPartition;
+
+import java.io.IOException;
+import java.io.ObjectInputStream;
+import java.io.ObjectOutputStream;
+
+/** A Spark {@link InputPartition} for table store. */
+public class SparkInputPartition implements InputPartition {
+
+ private static final long serialVersionUID = 1L;
+
+ private transient Split split;
+
+ public SparkInputPartition(Split split) {
+ this.split = split;
+ }
+
+ public Split split() {
+ return split;
+ }
+
+ private void writeObject(ObjectOutputStream out) throws IOException {
+ out.defaultWriteObject();
+ split.serialize(new DataOutputViewStreamWrapper(out));
+ }
+
+ private void readObject(ObjectInputStream in) throws IOException,
ClassNotFoundException {
+ in.defaultReadObject();
+ split = Split.deserialize(new DataInputViewStreamWrapper(in));
+ }
+}
diff --git
a/flink-table-store-spark/src/main/java/org/apache/flink/table/store/spark/SparkInternalRow.java
b/flink-table-store-spark/src/main/java/org/apache/flink/table/store/spark/SparkInternalRow.java
new file mode 100644
index 00000000..2c23e992
--- /dev/null
+++
b/flink-table-store-spark/src/main/java/org/apache/flink/table/store/spark/SparkInternalRow.java
@@ -0,0 +1,241 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.table.store.spark;
+
+import org.apache.flink.table.data.DecimalData;
+import org.apache.flink.table.data.RowData;
+import org.apache.flink.table.data.StringData;
+import org.apache.flink.table.data.TimestampData;
+import org.apache.flink.table.types.logical.ArrayType;
+import org.apache.flink.table.types.logical.BigIntType;
+import org.apache.flink.table.types.logical.IntType;
+import org.apache.flink.table.types.logical.LogicalType;
+import org.apache.flink.table.types.logical.MapType;
+import org.apache.flink.table.types.logical.MultisetType;
+import org.apache.flink.table.types.logical.RowType;
+
+import org.apache.spark.sql.catalyst.InternalRow;
+import org.apache.spark.sql.catalyst.expressions.SpecializedGettersReader;
+import org.apache.spark.sql.catalyst.util.ArrayBasedMapData;
+import org.apache.spark.sql.catalyst.util.ArrayData;
+import org.apache.spark.sql.catalyst.util.DateTimeUtils;
+import org.apache.spark.sql.catalyst.util.MapData;
+import org.apache.spark.sql.types.DataType;
+import org.apache.spark.sql.types.Decimal;
+import org.apache.spark.unsafe.types.CalendarInterval;
+import org.apache.spark.unsafe.types.UTF8String;
+
+import static org.apache.flink.table.store.utils.RowDataUtils.copyRowData;
+import static org.apache.flink.table.store.utils.TypeUtils.timestampPrecision;
+
+/** Spark {@link InternalRow} to wrap {@link RowData}. */
+public class SparkInternalRow extends InternalRow {
+
+ private final RowType rowType;
+
+ private RowData row;
+
+ public SparkInternalRow(RowType rowType) {
+ this.rowType = rowType;
+ }
+
+ public SparkInternalRow replace(RowData row) {
+ this.row = row;
+ return this;
+ }
+
+ @Override
+ public int numFields() {
+ return row.getArity();
+ }
+
+ @Override
+ public void setNullAt(int i) {
+ throw new UnsupportedOperationException();
+ }
+
+ @Override
+ public void update(int i, Object value) {
+ throw new UnsupportedOperationException();
+ }
+
+ @Override
+ public InternalRow copy() {
+ return new SparkInternalRow(rowType).replace(copyRowData(row,
rowType));
+ }
+
+ @Override
+ public boolean isNullAt(int ordinal) {
+ return row.isNullAt(ordinal);
+ }
+
+ @Override
+ public boolean getBoolean(int ordinal) {
+ return row.getBoolean(ordinal);
+ }
+
+ @Override
+ public byte getByte(int ordinal) {
+ return row.getByte(ordinal);
+ }
+
+ @Override
+ public short getShort(int ordinal) {
+ return row.getShort(ordinal);
+ }
+
+ @Override
+ public int getInt(int ordinal) {
+ return row.getInt(ordinal);
+ }
+
+ @Override
+ public long getLong(int ordinal) {
+ if (rowType.getTypeAt(ordinal) instanceof BigIntType) {
+ return row.getLong(ordinal);
+ }
+
+ return getTimestampMicros(ordinal);
+ }
+
+ private long getTimestampMicros(int ordinal) {
+ LogicalType type = rowType.getTypeAt(ordinal);
+ return fromFlink(row.getTimestamp(ordinal, timestampPrecision(type)));
+ }
+
+ @Override
+ public float getFloat(int ordinal) {
+ return row.getFloat(ordinal);
+ }
+
+ @Override
+ public double getDouble(int ordinal) {
+ return row.getDouble(ordinal);
+ }
+
+ @Override
+ public Decimal getDecimal(int ordinal, int precision, int scale) {
+ DecimalData decimal = row.getDecimal(ordinal, precision, scale);
+ return fromFlink(decimal);
+ }
+
+ @Override
+ public UTF8String getUTF8String(int ordinal) {
+ return fromFlink(row.getString(ordinal));
+ }
+
+ @Override
+ public byte[] getBinary(int ordinal) {
+ return row.getBinary(ordinal);
+ }
+
+ @Override
+ public CalendarInterval getInterval(int ordinal) {
+ throw new UnsupportedOperationException();
+ }
+
+ @Override
+ public InternalRow getStruct(int ordinal, int numFields) {
+ return fromFlink(row.getRow(ordinal, numFields), (RowType)
rowType.getTypeAt(ordinal));
+ }
+
+ @Override
+ public ArrayData getArray(int ordinal) {
+ return fromFlink(row.getArray(ordinal), (ArrayType)
rowType.getTypeAt(ordinal));
+ }
+
+ @Override
+ public MapData getMap(int ordinal) {
+ return fromFlink(row.getMap(ordinal), rowType.getTypeAt(ordinal));
+ }
+
+ @Override
+ public Object get(int ordinal, DataType dataType) {
+ return SpecializedGettersReader.read(this, ordinal, dataType, true,
true);
+ }
+
+ public static Object fromFlink(Object o, LogicalType type) {
+ if (o == null) {
+ return null;
+ }
+ switch (type.getTypeRoot()) {
+ case TIMESTAMP_WITHOUT_TIME_ZONE:
+ case TIMESTAMP_WITH_LOCAL_TIME_ZONE:
+ return fromFlink((TimestampData) o);
+ case CHAR:
+ case VARCHAR:
+ return fromFlink((StringData) o);
+ case DECIMAL:
+ return fromFlink((DecimalData) o);
+ case ARRAY:
+ return fromFlink((org.apache.flink.table.data.ArrayData) o,
(ArrayType) type);
+ case MAP:
+ case MULTISET:
+ return fromFlink((org.apache.flink.table.data.MapData) o,
type);
+ case ROW:
+ return fromFlink((RowData) o, (RowType) type);
+ default:
+ return o;
+ }
+ }
+
+ public static UTF8String fromFlink(StringData string) {
+ return UTF8String.fromBytes(string.toBytes());
+ }
+
+ public static Decimal fromFlink(DecimalData decimal) {
+ return Decimal.apply(decimal.toBigDecimal());
+ }
+
+ public static InternalRow fromFlink(RowData row, RowType rowType) {
+ return new SparkInternalRow(rowType).replace(row);
+ }
+
+ public static long fromFlink(TimestampData timestamp) {
+ return DateTimeUtils.fromJavaTimestamp(timestamp.toTimestamp());
+ }
+
+ public static ArrayData fromFlink(
+ org.apache.flink.table.data.ArrayData array, ArrayType arrayType) {
+ return fromFlinkArrayElementType(array, arrayType.getElementType());
+ }
+
+ private static ArrayData fromFlinkArrayElementType(
+ org.apache.flink.table.data.ArrayData array, LogicalType
elementType) {
+ return new SparkArrayData(elementType).replace(array);
+ }
+
+ public static MapData fromFlink(org.apache.flink.table.data.MapData map,
LogicalType mapType) {
+ LogicalType keyType;
+ LogicalType valueType;
+ if (mapType instanceof MapType) {
+ keyType = ((MapType) mapType).getKeyType();
+ valueType = ((MapType) mapType).getValueType();
+ } else if (mapType instanceof MultisetType) {
+ keyType = ((MultisetType) mapType).getElementType();
+ valueType = new IntType();
+ } else {
+ throw new UnsupportedOperationException("Unsupported type: " +
mapType);
+ }
+
+ return new ArrayBasedMapData(
+ fromFlinkArrayElementType(map.keyArray(), keyType),
+ fromFlinkArrayElementType(map.valueArray(), valueType));
+ }
+}
diff --git
a/flink-table-store-spark/src/main/java/org/apache/flink/table/store/spark/SparkReaderFactory.java
b/flink-table-store-spark/src/main/java/org/apache/flink/table/store/spark/SparkReaderFactory.java
new file mode 100644
index 00000000..17ae24cb
--- /dev/null
+++
b/flink-table-store-spark/src/main/java/org/apache/flink/table/store/spark/SparkReaderFactory.java
@@ -0,0 +1,93 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.table.store.spark;
+
+import org.apache.flink.table.data.RowData;
+import org.apache.flink.table.store.file.utils.RecordReader;
+import org.apache.flink.table.store.file.utils.RecordReaderIterator;
+import org.apache.flink.table.store.table.FileStoreTable;
+import org.apache.flink.table.store.table.source.Split;
+import org.apache.flink.table.store.utils.TypeUtils;
+import org.apache.flink.table.types.logical.RowType;
+
+import org.apache.spark.sql.catalyst.InternalRow;
+import org.apache.spark.sql.connector.read.InputPartition;
+import org.apache.spark.sql.connector.read.PartitionReader;
+import org.apache.spark.sql.connector.read.PartitionReaderFactory;
+
+import java.io.IOException;
+import java.io.UncheckedIOException;
+
+/** A Spark {@link PartitionReaderFactory} for table store. */
+public class SparkReaderFactory implements PartitionReaderFactory {
+
+ private static final long serialVersionUID = 1L;
+
+ private final FileStoreTable table;
+ private final int[] projectedFields;
+
+ public SparkReaderFactory(FileStoreTable table, int[] projectedFields) {
+ this.table = table;
+ this.projectedFields = projectedFields;
+ }
+
+ private RowType readRowType() {
+ return TypeUtils.project(table.rowType(), projectedFields);
+ }
+
+ @Override
+ public PartitionReader<InternalRow> createReader(InputPartition partition)
{
+ Split split = ((SparkInputPartition) partition).split();
+ RecordReader<RowData> reader;
+ try {
+ reader =
+ table.newRead()
+ .withProjection(projectedFields)
+ .createReader(split.partition(), split.bucket(),
split.files());
+ } catch (IOException e) {
+ throw new UncheckedIOException(e);
+ }
+ RecordReaderIterator<RowData> iterator = new
RecordReaderIterator<>(reader);
+ SparkInternalRow row = new SparkInternalRow(readRowType());
+ return new PartitionReader<InternalRow>() {
+
+ @Override
+ public boolean next() {
+ if (iterator.hasNext()) {
+ row.replace(iterator.next());
+ return true;
+ }
+ return false;
+ }
+
+ @Override
+ public InternalRow get() {
+ return row;
+ }
+
+ @Override
+ public void close() throws IOException {
+ try {
+ iterator.close();
+ } catch (Exception e) {
+ throw new IOException(e);
+ }
+ }
+ };
+ }
+}
diff --git
a/flink-table-store-spark/src/main/java/org/apache/flink/table/store/spark/SparkScan.java
b/flink-table-store-spark/src/main/java/org/apache/flink/table/store/spark/SparkScan.java
new file mode 100644
index 00000000..2167f970
--- /dev/null
+++
b/flink-table-store-spark/src/main/java/org/apache/flink/table/store/spark/SparkScan.java
@@ -0,0 +1,117 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.table.store.spark;
+
+import org.apache.flink.table.store.file.data.DataFileMeta;
+import org.apache.flink.table.store.file.predicate.Predicate;
+import org.apache.flink.table.store.table.FileStoreTable;
+import org.apache.flink.table.store.table.source.Split;
+import org.apache.flink.table.store.utils.TypeUtils;
+
+import org.apache.spark.sql.connector.read.Batch;
+import org.apache.spark.sql.connector.read.InputPartition;
+import org.apache.spark.sql.connector.read.PartitionReaderFactory;
+import org.apache.spark.sql.connector.read.Scan;
+import org.apache.spark.sql.connector.read.Statistics;
+import org.apache.spark.sql.connector.read.SupportsReportStatistics;
+import org.apache.spark.sql.types.StructType;
+
+import java.util.List;
+import java.util.OptionalLong;
+
+/**
+ * A Spark {@link Scan} for table store.
+ *
+ * <p>TODO Introduce a SparkRFScan to implement SupportsRuntimeFiltering.
+ */
+public class SparkScan implements Scan, SupportsReportStatistics {
+
+ protected final FileStoreTable table;
+ private final List<Predicate> predicates;
+ private final int[] projectedFields;
+
+ private List<Split> splits;
+
+ public SparkScan(FileStoreTable table, List<Predicate> predicates, int[]
projectedFields) {
+ this.table = table;
+ this.predicates = predicates;
+ this.projectedFields = projectedFields;
+ }
+
+ @Override
+ public String description() {
+ // TODO add filters
+ return String.format("tablestore(%s)", table.name());
+ }
+
+ @Override
+ public StructType readSchema() {
+ return
SparkTypeUtils.fromFlinkRowType(TypeUtils.project(table.rowType(),
projectedFields));
+ }
+
+ @Override
+ public Batch toBatch() {
+ return new Batch() {
+ @Override
+ public InputPartition[] planInputPartitions() {
+ return splits().stream()
+ .map(SparkInputPartition::new)
+ .toArray(InputPartition[]::new);
+ }
+
+ @Override
+ public PartitionReaderFactory createReaderFactory() {
+ return new SparkReaderFactory(table, projectedFields);
+ }
+ };
+ }
+
+ protected List<Split> splits() {
+ if (splits == null) {
+ this.splits = table.newScan().withFilter(predicates).plan().splits;
+ }
+ return splits;
+ }
+
+ @Override
+ public Statistics estimateStatistics() {
+ long rowCount = 0L;
+
+ for (Split split : splits()) {
+ for (DataFileMeta file : split.files()) {
+ rowCount += file.rowCount();
+ }
+ }
+
+ final long numRows = rowCount;
+ final long sizeInBytes = readSchema().defaultSize() * numRows;
+
+ return new Statistics() {
+ @Override
+ public OptionalLong sizeInBytes() {
+ return OptionalLong.of(sizeInBytes);
+ }
+
+ @Override
+ public OptionalLong numRows() {
+ return OptionalLong.of(numRows);
+ }
+ };
+ }
+}
diff --git
a/flink-table-store-spark/src/main/java/org/apache/flink/table/store/spark/SparkScanBuilder.java
b/flink-table-store-spark/src/main/java/org/apache/flink/table/store/spark/SparkScanBuilder.java
new file mode 100644
index 00000000..f26566cc
--- /dev/null
+++
b/flink-table-store-spark/src/main/java/org/apache/flink/table/store/spark/SparkScanBuilder.java
@@ -0,0 +1,85 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.table.store.spark;
+
+import org.apache.flink.table.store.file.predicate.Predicate;
+import org.apache.flink.table.store.table.FileStoreTable;
+
+import org.apache.spark.sql.connector.read.Scan;
+import org.apache.spark.sql.connector.read.ScanBuilder;
+import org.apache.spark.sql.connector.read.SupportsPushDownFilters;
+import org.apache.spark.sql.connector.read.SupportsPushDownRequiredColumns;
+import org.apache.spark.sql.sources.Filter;
+import org.apache.spark.sql.types.StructType;
+
+import java.util.ArrayList;
+import java.util.List;
+
+/** A Spark {@link ScanBuilder} for table store. */
+public class SparkScanBuilder
+ implements ScanBuilder, SupportsPushDownFilters,
SupportsPushDownRequiredColumns {
+
+ private final FileStoreTable table;
+
+ private List<Predicate> predicates = new ArrayList<>();
+ private Filter[] pushedFilters;
+ private int[] projectedFields;
+
+ public SparkScanBuilder(FileStoreTable table) {
+ this.table = table;
+ }
+
+ @Override
+ public Filter[] pushFilters(Filter[] filters) {
+ SparkFilterConverter converter = new
SparkFilterConverter(table.rowType());
+ List<Predicate> predicates = new ArrayList<>();
+ List<Filter> pushed = new ArrayList<>();
+ for (Filter filter : filters) {
+ try {
+ predicates.add(converter.convert(filter));
+ pushed.add(filter);
+ } catch (UnsupportedOperationException ignore) {
+ }
+ }
+ this.predicates = predicates;
+ this.pushedFilters = pushed.toArray(new Filter[0]);
+ return filters;
+ }
+
+ @Override
+ public Filter[] pushedFilters() {
+ return pushedFilters;
+ }
+
+ @Override
+ public void pruneColumns(StructType requiredSchema) {
+ String[] pruneFields = requiredSchema.fieldNames();
+ List<String> fieldNames = table.rowType().getFieldNames();
+ int[] projected = new int[pruneFields.length];
+ for (int i = 0; i < projected.length; i++) {
+ projected[i] = fieldNames.indexOf(pruneFields[i]);
+ }
+ this.projectedFields = projected;
+ }
+
+ @Override
+ public Scan build() {
+ return new SparkScan(table, predicates, projectedFields);
+ }
+}
diff --git
a/flink-table-store-spark/src/main/java/org/apache/flink/table/store/spark/SparkSource.java
b/flink-table-store-spark/src/main/java/org/apache/flink/table/store/spark/SparkSource.java
new file mode 100644
index 00000000..90468627
--- /dev/null
+++
b/flink-table-store-spark/src/main/java/org/apache/flink/table/store/spark/SparkSource.java
@@ -0,0 +1,68 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.table.store.spark;
+
+import org.apache.flink.configuration.Configuration;
+import org.apache.flink.table.store.table.FileStoreTable;
+import org.apache.flink.table.store.table.FileStoreTableFactory;
+
+import org.apache.spark.sql.connector.catalog.Table;
+import org.apache.spark.sql.connector.catalog.TableProvider;
+import org.apache.spark.sql.connector.expressions.Transform;
+import org.apache.spark.sql.sources.DataSourceRegister;
+import org.apache.spark.sql.types.StructType;
+import org.apache.spark.sql.util.CaseInsensitiveStringMap;
+
+import java.util.Map;
+
+/** The spark source for table store. */
+public class SparkSource implements DataSourceRegister, TableProvider {
+
+ @Override
+ public String shortName() {
+ // Not use 'table-store' here, the '-' is not allowed in SQL
+ return "tablestore";
+ }
+
+ @Override
+ public StructType inferSchema(CaseInsensitiveStringMap options) {
+ // ignore schema.
+ // getTable will get schema by itself.
+ return null;
+ }
+
+ @Override
+ public Transform[] inferPartitioning(CaseInsensitiveStringMap options) {
+ // ignore partition.
+ // getTable will get partition by itself.
+ return null;
+ }
+
+ @Override
+ public boolean supportsExternalMetadata() {
+ return true;
+ }
+
+ @Override
+ public Table getTable(
+ StructType schema, Transform[] partitioning, Map<String, String>
options) {
+ FileStoreTable table =
FileStoreTableFactory.create(Configuration.fromMap(options));
+ return new SparkTable(table);
+ }
+}
diff --git
a/flink-table-store-spark/src/main/java/org/apache/flink/table/store/spark/SparkTable.java
b/flink-table-store-spark/src/main/java/org/apache/flink/table/store/spark/SparkTable.java
new file mode 100644
index 00000000..ed4a1997
--- /dev/null
+++
b/flink-table-store-spark/src/main/java/org/apache/flink/table/store/spark/SparkTable.java
@@ -0,0 +1,64 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.table.store.spark;
+
+import org.apache.flink.table.store.table.FileStoreTable;
+
+import org.apache.spark.sql.catalog.Table;
+import org.apache.spark.sql.connector.catalog.SupportsRead;
+import org.apache.spark.sql.connector.catalog.TableCapability;
+import org.apache.spark.sql.connector.read.ScanBuilder;
+import org.apache.spark.sql.types.StructType;
+import org.apache.spark.sql.util.CaseInsensitiveStringMap;
+
+import java.util.HashSet;
+import java.util.Set;
+
/**
 * A Spark {@link org.apache.spark.sql.connector.catalog.Table} for table store.
 *
 * <p>Only batch reading is supported for now, see {@link #capabilities()}.
 */
public class SparkTable implements org.apache.spark.sql.connector.catalog.Table, SupportsRead {

    // the underlying table store table; all metadata and reads delegate to it
    private final FileStoreTable table;

    public SparkTable(FileStoreTable table) {
        this.table = table;
    }

    @Override
    public ScanBuilder newScanBuilder(CaseInsensitiveStringMap options) {
        // options is already merged into table
        return new SparkScanBuilder(table);
    }

    @Override
    public String name() {
        return table.name();
    }

    @Override
    public StructType schema() {
        // convert the table store row type into the Spark struct type
        return SparkTypeUtils.fromFlinkRowType(table.rowType());
    }

    @Override
    public Set<TableCapability> capabilities() {
        Set<TableCapability> capabilities = new HashSet<>();
        capabilities.add(TableCapability.BATCH_READ);
        return capabilities;
    }
}
diff --git
a/flink-table-store-spark/src/main/java/org/apache/flink/table/store/spark/SparkTypeUtils.java
b/flink-table-store-spark/src/main/java/org/apache/flink/table/store/spark/SparkTypeUtils.java
new file mode 100644
index 00000000..65481c21
--- /dev/null
+++
b/flink-table-store-spark/src/main/java/org/apache/flink/table/store/spark/SparkTypeUtils.java
@@ -0,0 +1,182 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.table.store.spark;
+
+import org.apache.flink.table.types.logical.ArrayType;
+import org.apache.flink.table.types.logical.BigIntType;
+import org.apache.flink.table.types.logical.BinaryType;
+import org.apache.flink.table.types.logical.BooleanType;
+import org.apache.flink.table.types.logical.CharType;
+import org.apache.flink.table.types.logical.DateType;
+import org.apache.flink.table.types.logical.DecimalType;
+import org.apache.flink.table.types.logical.DoubleType;
+import org.apache.flink.table.types.logical.FloatType;
+import org.apache.flink.table.types.logical.IntType;
+import org.apache.flink.table.types.logical.LocalZonedTimestampType;
+import org.apache.flink.table.types.logical.LogicalType;
+import org.apache.flink.table.types.logical.MapType;
+import org.apache.flink.table.types.logical.MultisetType;
+import org.apache.flink.table.types.logical.RowType;
+import org.apache.flink.table.types.logical.SmallIntType;
+import org.apache.flink.table.types.logical.TimestampType;
+import org.apache.flink.table.types.logical.TinyIntType;
+import org.apache.flink.table.types.logical.VarBinaryType;
+import org.apache.flink.table.types.logical.VarCharType;
+import org.apache.flink.table.types.logical.utils.LogicalTypeDefaultVisitor;
+
+import org.apache.spark.sql.types.DataType;
+import org.apache.spark.sql.types.DataTypes;
+import org.apache.spark.sql.types.StructField;
+import org.apache.spark.sql.types.StructType;
+
+import java.util.List;
+import java.util.stream.Collectors;
+
+/** Utils for spark {@link DataType}. */
+public class SparkTypeUtils {
+
+ private SparkTypeUtils() {}
+
+ public static StructType fromFlinkRowType(RowType type) {
+ return (StructType) fromFlinkType(type);
+ }
+
+ public static DataType fromFlinkType(LogicalType type) {
+ return type.accept(FlinkToSparkTypeVistor.INSTANCE);
+ }
+
+ private static class FlinkToSparkTypeVistor extends
LogicalTypeDefaultVisitor<DataType> {
+
+ private static final FlinkToSparkTypeVistor INSTANCE = new
FlinkToSparkTypeVistor();
+
+ @Override
+ public DataType visit(CharType charType) {
+ return DataTypes.StringType;
+ }
+
+ @Override
+ public DataType visit(VarCharType varCharType) {
+ return DataTypes.StringType;
+ }
+
+ @Override
+ public DataType visit(BooleanType booleanType) {
+ return DataTypes.BooleanType;
+ }
+
+ @Override
+ public DataType visit(BinaryType binaryType) {
+ return DataTypes.BinaryType;
+ }
+
+ @Override
+ public DataType visit(VarBinaryType varBinaryType) {
+ return DataTypes.BinaryType;
+ }
+
+ @Override
+ public DataType visit(DecimalType decimalType) {
+ return DataTypes.createDecimalType(decimalType.getPrecision(),
decimalType.getScale());
+ }
+
+ @Override
+ public DataType visit(TinyIntType tinyIntType) {
+ return DataTypes.ByteType;
+ }
+
+ @Override
+ public DataType visit(SmallIntType smallIntType) {
+ return DataTypes.ShortType;
+ }
+
+ @Override
+ public DataType visit(IntType intType) {
+ return DataTypes.IntegerType;
+ }
+
+ @Override
+ public DataType visit(BigIntType bigIntType) {
+ return DataTypes.LongType;
+ }
+
+ @Override
+ public DataType visit(FloatType floatType) {
+ return DataTypes.FloatType;
+ }
+
+ @Override
+ public DataType visit(DoubleType doubleType) {
+ return DataTypes.DoubleType;
+ }
+
+ @Override
+ public DataType visit(DateType dateType) {
+ return DataTypes.DateType;
+ }
+
+ @Override
+ public DataType visit(TimestampType timestampType) {
+ return DataTypes.TimestampType;
+ }
+
+ @Override
+ public DataType visit(LocalZonedTimestampType localZonedTimestampType)
{
+ return DataTypes.TimestampType;
+ }
+
+ @Override
+ public DataType visit(ArrayType arrayType) {
+ LogicalType elementType = arrayType.getElementType();
+ return DataTypes.createArrayType(elementType.accept(this),
elementType.isNullable());
+ }
+
+ @Override
+ public DataType visit(MultisetType multisetType) {
+ return DataTypes.createMapType(
+ multisetType.getElementType().accept(this),
DataTypes.IntegerType, false);
+ }
+
+ @Override
+ public DataType visit(MapType mapType) {
+ return DataTypes.createMapType(
+ mapType.getKeyType().accept(this),
+ mapType.getValueType().accept(this),
+ mapType.getValueType().isNullable());
+ }
+
+ @Override
+ public DataType visit(RowType rowType) {
+ List<StructField> fields =
+ rowType.getFields().stream()
+ .map(
+ field ->
+ DataTypes.createStructField(
+ field.getName(),
+
field.getType().accept(this),
+
field.getType().isNullable()))
+ .collect(Collectors.toList());
+ return DataTypes.createStructType(fields);
+ }
+
+ @Override
+ protected DataType defaultMethod(LogicalType logicalType) {
+ throw new UnsupportedOperationException("Unsupported type: " +
logicalType);
+ }
+ }
+}
diff --git
a/flink-table-store-spark/src/main/resources/META-INF/services/org.apache.spark.sql.sources.DataSourceRegister
b/flink-table-store-spark/src/main/resources/META-INF/services/org.apache.spark.sql.sources.DataSourceRegister
new file mode 100644
index 00000000..e0626103
--- /dev/null
+++
b/flink-table-store-spark/src/main/resources/META-INF/services/org.apache.spark.sql.sources.DataSourceRegister
@@ -0,0 +1,16 @@
+# Licensed to the Apache Software Foundation (ASF) under one or more
+# contributor license agreements. See the NOTICE file distributed with
+# this work for additional information regarding copyright ownership.
+# The ASF licenses this file to You under the Apache License, Version 2.0
+# (the "License"); you may not use this file except in compliance with
+# the License. You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+
+org.apache.flink.table.store.spark.SparkSource
diff --git
a/flink-table-store-spark/src/test/java/org/apache/flink/table/store/spark/SimpleTableTestHelper.java
b/flink-table-store-spark/src/test/java/org/apache/flink/table/store/spark/SimpleTableTestHelper.java
new file mode 100644
index 00000000..7231752f
--- /dev/null
+++
b/flink-table-store-spark/src/test/java/org/apache/flink/table/store/spark/SimpleTableTestHelper.java
@@ -0,0 +1,70 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.table.store.spark;
+
+import org.apache.flink.configuration.Configuration;
+import org.apache.flink.core.fs.Path;
+import org.apache.flink.table.data.RowData;
+import org.apache.flink.table.store.file.FileStoreOptions;
+import org.apache.flink.table.store.file.schema.SchemaManager;
+import org.apache.flink.table.store.file.schema.UpdateSchema;
+import org.apache.flink.table.store.table.FileStoreTable;
+import org.apache.flink.table.store.table.FileStoreTableFactory;
+import org.apache.flink.table.store.table.sink.TableCommit;
+import org.apache.flink.table.store.table.sink.TableWrite;
+import org.apache.flink.table.types.logical.RowType;
+
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.Map;
+import java.util.UUID;
+
/**
 * A simple table test helper that creates a table store table at a given path and lets tests
 * write rows and commit them.
 */
public class SimpleTableTestHelper {

    // buffers rows until commit() is called
    private final TableWrite writer;
    // publishes buffered rows as a new snapshot
    private final TableCommit commit;

    /**
     * Registers the schema at {@code path} and opens a writer/committer pair on the table.
     *
     * @param path directory where the table files are stored
     * @param rowType schema of the rows to write
     */
    public SimpleTableTestHelper(Path path, RowType rowType) throws Exception {
        Map<String, String> options = new HashMap<>();
        // use avro instead of the default orc: orc is shaded, can not find shaded classes in ide
        options.put(FileStoreOptions.FILE_FORMAT.key(), "avro");
        // the schema must be committed before the table can be opened
        // NOTE(review): the two empty lists are presumably partition keys and primary keys,
        // i.e. an unpartitioned table without primary keys — confirm against UpdateSchema
        new SchemaManager(path)
                .commitNewVersion(
                        new UpdateSchema(
                                rowType,
                                Collections.emptyList(),
                                Collections.emptyList(),
                                options,
                                ""));
        Configuration conf = Configuration.fromMap(options);
        conf.setString("path", path.toString());
        FileStoreTable table = FileStoreTableFactory.create(conf, "user");
        this.writer = table.newWrite();
        this.commit = table.newCommit();
    }

    /** Buffers one row; it becomes visible to readers only after {@link #commit()}. */
    public void write(RowData row) throws Exception {
        writer.write(row);
    }

    /** Commits everything written so far under a fresh random commit identifier. */
    public void commit() throws Exception {
        commit.commit(UUID.randomUUID().toString(), writer.prepareCommit());
    }
}
diff --git
a/flink-table-store-spark/src/test/java/org/apache/flink/table/store/spark/SparkFilterConverterTest.java
b/flink-table-store-spark/src/test/java/org/apache/flink/table/store/spark/SparkFilterConverterTest.java
new file mode 100644
index 00000000..dead1524
--- /dev/null
+++
b/flink-table-store-spark/src/test/java/org/apache/flink/table/store/spark/SparkFilterConverterTest.java
@@ -0,0 +1,141 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.table.store.spark;
+
+import org.apache.flink.table.data.TimestampData;
+import org.apache.flink.table.store.file.predicate.Literal;
+import org.apache.flink.table.store.file.predicate.Predicate;
+import org.apache.flink.table.store.file.predicate.PredicateBuilder;
+import org.apache.flink.table.types.logical.DateType;
+import org.apache.flink.table.types.logical.IntType;
+import org.apache.flink.table.types.logical.RowType;
+import org.apache.flink.table.types.logical.TimestampType;
+
+import org.apache.spark.sql.sources.EqualTo;
+import org.apache.spark.sql.sources.GreaterThan;
+import org.apache.spark.sql.sources.GreaterThanOrEqual;
+import org.apache.spark.sql.sources.IsNotNull;
+import org.apache.spark.sql.sources.IsNull;
+import org.apache.spark.sql.sources.LessThan;
+import org.apache.spark.sql.sources.LessThanOrEqual;
+import org.junit.jupiter.api.Test;
+
+import java.sql.Date;
+import java.sql.Timestamp;
+import java.time.Instant;
+import java.time.LocalDate;
+import java.time.LocalDateTime;
+import java.time.ZoneOffset;
+import java.util.Collections;
+
+import static org.assertj.core.api.Assertions.assertThat;
+
/** Test for {@link SparkFilterConverter}. */
public class SparkFilterConverterTest {

    /** Converts every supported comparison filter on a single INT column "id" (index 0). */
    @Test
    public void testAll() {
        SparkFilterConverter converter =
                new SparkFilterConverter(
                        new RowType(
                                Collections.singletonList(
                                        new RowType.RowField("id", new IntType()))));
        String field = "id";
        IsNull isNull = IsNull.apply(field);
        Predicate expectedIsNull = PredicateBuilder.isNull(0);
        Predicate actualIsNull = converter.convert(isNull);
        assertThat(actualIsNull).isEqualTo(expectedIsNull);

        IsNotNull isNotNull = IsNotNull.apply(field);
        Predicate expectedIsNotNull = PredicateBuilder.isNotNull(0);
        Predicate actualIsNotNull = converter.convert(isNotNull);
        assertThat(actualIsNotNull).isEqualTo(expectedIsNotNull);

        LessThan lt = LessThan.apply(field, 1);
        Predicate expectedLt = PredicateBuilder.lessThan(0, new Literal(new IntType(), 1));
        Predicate actualLt = converter.convert(lt);
        assertThat(actualLt).isEqualTo(expectedLt);

        LessThanOrEqual ltEq = LessThanOrEqual.apply(field, 1);
        Predicate expectedLtEq = PredicateBuilder.lessOrEqual(0, new Literal(new IntType(), 1));
        Predicate actualLtEq = converter.convert(ltEq);
        assertThat(actualLtEq).isEqualTo(expectedLtEq);

        GreaterThan gt = GreaterThan.apply(field, 1);
        Predicate expectedGt = PredicateBuilder.greaterThan(0, new Literal(new IntType(), 1));
        Predicate actualGt = converter.convert(gt);
        assertThat(actualGt).isEqualTo(expectedGt);

        GreaterThanOrEqual gtEq = GreaterThanOrEqual.apply(field, 1);
        Predicate expectedGtEq = PredicateBuilder.greaterOrEqual(0, new Literal(new IntType(), 1));
        Predicate actualGtEq = converter.convert(gtEq);
        assertThat(actualGtEq).isEqualTo(expectedGtEq);

        EqualTo eq = EqualTo.apply(field, 1);
        Predicate expectedEq = PredicateBuilder.equal(0, new Literal(new IntType(), 1));
        Predicate actualEq = converter.convert(eq);
        assertThat(actualEq).isEqualTo(expectedEq);
    }

    /**
     * A timestamp literal supplied as {@link Timestamp} and as {@link Instant} must both
     * normalize to the same internal {@link TimestampData} predicate.
     */
    @Test
    public void testTimestamp() {
        SparkFilterConverter converter =
                new SparkFilterConverter(
                        new RowType(
                                Collections.singletonList(
                                        new RowType.RowField("x", new TimestampType()))));

        // the same wall-clock instant spelled three ways; the Instant variant is anchored
        // at UTC, so the converter is expected to interpret Instants in UTC
        Timestamp timestamp = Timestamp.valueOf("2018-10-18 00:00:57.907");
        LocalDateTime localDateTime = LocalDateTime.parse("2018-10-18T00:00:57.907");
        Instant instant = localDateTime.toInstant(ZoneOffset.UTC);

        Predicate instantExpression = converter.convert(GreaterThan.apply("x", instant));
        Predicate timestampExpression = converter.convert(GreaterThan.apply("x", timestamp));
        Predicate rawExpression =
                PredicateBuilder.greaterThan(
                        0,
                        new Literal(
                                new TimestampType(),
                                TimestampData.fromLocalDateTime(localDateTime)));

        assertThat(timestampExpression).isEqualTo(rawExpression);
        assertThat(instantExpression).isEqualTo(rawExpression);
    }

    /**
     * A date literal supplied as {@link LocalDate} and as {@link Date} must both convert to the
     * same epoch-day integer literal.
     */
    @Test
    public void testDate() {
        SparkFilterConverter converter =
                new SparkFilterConverter(
                        new RowType(
                                Collections.singletonList(
                                        new RowType.RowField("x", new DateType()))));

        LocalDate localDate = LocalDate.parse("2018-10-18");
        Date date = Date.valueOf(localDate);
        // dates are stored internally as days since the epoch
        int epochDay = (int) localDate.toEpochDay();

        Predicate localDateExpression = converter.convert(GreaterThan.apply("x", localDate));
        Predicate dateExpression = converter.convert(GreaterThan.apply("x", date));
        Predicate rawExpression =
                PredicateBuilder.greaterThan(0, new Literal(new DateType(), epochDay));

        assertThat(dateExpression).isEqualTo(rawExpression);
        assertThat(localDateExpression).isEqualTo(rawExpression);
    }
}
diff --git
a/flink-table-store-spark/src/test/java/org/apache/flink/table/store/spark/SparkInternalRowTest.java
b/flink-table-store-spark/src/test/java/org/apache/flink/table/store/spark/SparkInternalRowTest.java
new file mode 100644
index 00000000..f1311dd0
--- /dev/null
+++
b/flink-table-store-spark/src/test/java/org/apache/flink/table/store/spark/SparkInternalRowTest.java
@@ -0,0 +1,116 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.table.store.spark;
+
+import org.apache.flink.table.data.RowData;
+import org.apache.flink.table.data.conversion.RowRowConverter;
+import org.apache.flink.table.types.utils.TypeConversions;
+import org.apache.flink.types.Row;
+
+import org.apache.spark.sql.catalyst.CatalystTypeConverters;
+import org.junit.jupiter.api.Test;
+
+import java.math.BigDecimal;
+import java.nio.charset.StandardCharsets;
+import java.time.Instant;
+import java.time.LocalDate;
+import java.time.LocalDateTime;
+import java.util.AbstractMap;
+import java.util.Map;
+import java.util.stream.Collectors;
+import java.util.stream.Stream;
+
+import scala.Function1;
+
+import static org.apache.flink.table.store.spark.SparkTypeTest.ALL_TYPES;
+import static org.assertj.core.api.Assertions.assertThat;
+
/** Test for {@link SparkInternalRow}. */
public class SparkInternalRowTest {

    /**
     * Round-trips one value of every type in {@code ALL_TYPES}: external Flink {@link Row} ->
     * internal {@link RowData} -> {@link SparkInternalRow} -> external Spark row, then compares
     * the Spark row's JSON rendering against a fixed expectation.
     */
    @Test
    public void test() {
        // one value per ALL_TYPES field, in schema order
        Row row =
                Row.of(
                        1,
                        "jingsong",
                        22.2,
                        Stream.of(
                                        new AbstractMap.SimpleEntry<>("key1", Row.of(1.2, 2.3)),
                                        new AbstractMap.SimpleEntry<>("key2", Row.of(2.4, 3.5)))
                                .collect(Collectors.toMap(Map.Entry::getKey, Map.Entry::getValue)),
                        new String[] {"v1", "v5"},
                        new Integer[] {10, 30},
                        "char_v",
                        "varchar_v",
                        true,
                        (byte) 22,
                        (short) 356,
                        23567222L,
                        "binary_v".getBytes(StandardCharsets.UTF_8),
                        "varbinary_v".getBytes(StandardCharsets.UTF_8),
                        LocalDateTime.parse("2007-12-03T10:15:30"),
                        Instant.parse("2007-12-03T10:15:30.00Z"),
                        LocalDate.parse("2022-05-02"),
                        BigDecimal.valueOf(0.21),
                        BigDecimal.valueOf(65782123123.01),
                        BigDecimal.valueOf(62123123.5),
                        Stream.of(
                                        new AbstractMap.SimpleEntry<>("key1", 5),
                                        new AbstractMap.SimpleEntry<>("key2", 2))
                                .collect(Collectors.toMap(Map.Entry::getKey, Map.Entry::getValue)));

        // external Row -> Flink internal RowData
        RowRowConverter flinkConverter =
                RowRowConverter.create(TypeConversions.fromLogicalToDataType(ALL_TYPES));
        flinkConverter.open(Thread.currentThread().getContextClassLoader());
        RowData rowData = flinkConverter.toInternal(row);

        // RowData wrapped in SparkInternalRow -> external Spark Row via Catalyst converter
        Function1<Object, Object> sparkConverter =
                CatalystTypeConverters.createToScalaConverter(
                        SparkTypeUtils.fromFlinkType(ALL_TYPES));
        org.apache.spark.sql.Row sparkRow =
                (org.apache.spark.sql.Row)
                        sparkConverter.apply(new SparkInternalRow(ALL_TYPES).replace(rowData));

        // NOTE(review): "YmluYXJ5X3Y=" is base64 for "binary_v" and "dmFyYmluYXJ5X3Y=" for
        // "varbinary_v", so the "varbinary" JSON key carries the bytes written for binary and
        // vice versa — presumably ALL_TYPES declares the "varbinary" field before "binary";
        // verify against SparkTypeTest.ALL_TYPES
        String expected =
                "{"
                        + "\"id\":1,"
                        + "\"name\":\"jingsong\","
                        + "\"salary\":22.2,"
                        + "\"locations\":{\"key1\":{\"posX\":1.2,\"posY\":2.3},\"key2\":{\"posX\":2.4,\"posY\":3.5}},"
                        + "\"strArray\":[\"v1\",\"v5\"],"
                        + "\"intArray\":[10,30],"
                        + "\"char\":\"char_v\","
                        + "\"varchar\":\"varchar_v\","
                        + "\"boolean\":true,"
                        + "\"tinyint\":22,"
                        + "\"smallint\":356,"
                        + "\"bigint\":23567222,"
                        + "\"varbinary\":\"YmluYXJ5X3Y=\","
                        + "\"binary\":\"dmFyYmluYXJ5X3Y=\","
                        + "\"timestampWithoutZone\":\"2007-12-03 10:15:30\","
                        + "\"timestampWithZone\":\"2007-12-03 10:15:30\","
                        + "\"date\":\"2022-05-02\","
                        + "\"decimal\":0.21,"
                        + "\"decimal2\":65782123123.01,"
                        + "\"decimal3\":62123123.5,"
                        + "\"multiset\":{\"key1\":5,\"key2\":2}}";
        assertThat(sparkRow.json()).isEqualTo(expected);
    }
}
diff --git
a/flink-table-store-spark/src/test/java/org/apache/flink/table/store/spark/SparkReadITCase.java
b/flink-table-store-spark/src/test/java/org/apache/flink/table/store/spark/SparkReadITCase.java
new file mode 100644
index 00000000..08de5ce5
--- /dev/null
+++
b/flink-table-store-spark/src/test/java/org/apache/flink/table/store/spark/SparkReadITCase.java
@@ -0,0 +1,115 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.table.store.spark;
+
+import org.apache.flink.core.fs.Path;
+import org.apache.flink.table.data.GenericRowData;
+import org.apache.flink.table.data.StringData;
+import org.apache.flink.table.types.logical.BigIntType;
+import org.apache.flink.table.types.logical.IntType;
+import org.apache.flink.table.types.logical.RowType;
+import org.apache.flink.table.types.logical.VarCharType;
+import org.apache.flink.types.RowKind;
+
+import org.apache.spark.sql.Dataset;
+import org.apache.spark.sql.Row;
+import org.apache.spark.sql.SparkSession;
+import org.junit.jupiter.api.AfterAll;
+import org.junit.jupiter.api.BeforeAll;
+import org.junit.jupiter.api.BeforeEach;
+import org.junit.jupiter.api.Test;
+import org.junit.jupiter.api.io.TempDir;
+
+import java.util.Arrays;
+import java.util.List;
+
+import static org.assertj.core.api.Assertions.assertThat;
+
+/** ITCase for spark reader. */
+public class SparkReadITCase {
+
+ private static SparkSession spark = null;
+
+ @TempDir java.nio.file.Path tempDir;
+
+ private Path path;
+
+ private SimpleTableTestHelper testHelper;
+
+ @BeforeEach
+ public void beforeEach() throws Exception {
+ this.path = new Path(tempDir.toUri().toString(), "my_table");
+ RowType rowType =
+ new RowType(
+ Arrays.asList(
+ new RowType.RowField("a", new IntType()),
+ new RowType.RowField("b", new BigIntType()),
+ new RowType.RowField("c", new VarCharType())));
+ testHelper = new SimpleTableTestHelper(path, rowType);
+ }
+
+ @BeforeAll
+ public static void startMetastoreAndSpark() {
+ spark = SparkSession.builder().master("local[2]").getOrCreate();
+ }
+
+ @AfterAll
+ public static void stopMetastoreAndSpark() {
+ spark.stop();
+ spark = null;
+ }
+
+ @Test
+ public void testNormal() throws Exception {
+ testHelper.write(GenericRowData.of(1, 2L, StringData.fromString("1")));
+ testHelper.write(GenericRowData.of(3, 4L, StringData.fromString("2")));
+ testHelper.write(GenericRowData.of(5, 6L, StringData.fromString("3")));
+ testHelper.write(GenericRowData.ofKind(RowKind.DELETE, 3, 4L,
StringData.fromString("2")));
+ testHelper.commit();
+
+ Dataset<Row> dataset =
+ spark.read().format("tablestore").option("path",
path.toString()).load();
+
+ List<Row> results = dataset.collectAsList();
+ assertThat(results.toString()).isEqualTo("[[1,2,1], [5,6,3]]");
+
+ results = dataset.select("a", "c").collectAsList();
+ assertThat(results.toString()).isEqualTo("[[1,1], [5,3]]");
+
+ results = dataset.groupBy().sum("b").collectAsList();
+ assertThat(results.toString()).isEqualTo("[[8]]");
+ }
+
+ @Test
+ public void testFilterPushDown() throws Exception {
+ testHelper.write(GenericRowData.of(1, 2L, StringData.fromString("1")));
+ testHelper.write(GenericRowData.of(3, 4L, StringData.fromString("2")));
+ testHelper.commit();
+
+ testHelper.write(GenericRowData.of(5, 6L, StringData.fromString("3")));
+ testHelper.write(GenericRowData.of(7, 8L, StringData.fromString("4")));
+ testHelper.commit();
+
+ Dataset<Row> dataset =
+ spark.read().format("tablestore").option("path",
path.toString()).load();
+
+ List<Row> results = dataset.filter("a < 4").select("a",
"c").collectAsList();
+ assertThat(results.toString()).isEqualTo("[[1,1], [3,2]]");
+ }
+}
diff --git
a/flink-table-store-spark/src/test/java/org/apache/flink/table/store/spark/SparkTypeTest.java
b/flink-table-store-spark/src/test/java/org/apache/flink/table/store/spark/SparkTypeTest.java
new file mode 100644
index 00000000..12c8694a
--- /dev/null
+++
b/flink-table-store-spark/src/test/java/org/apache/flink/table/store/spark/SparkTypeTest.java
@@ -0,0 +1,105 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.table.store.spark;
+
+import org.apache.flink.table.api.DataTypes;
+import org.apache.flink.table.api.TableSchema;
+import org.apache.flink.table.types.logical.RowType;
+
+import org.junit.jupiter.api.Test;
+
+import static org.assertj.core.api.Assertions.assertThat;
+
/** Test for {@link SparkTypeUtils}. */
public class SparkTypeTest {

    // A row type exercising every logical type this module maps to Spark:
    // primitives, nested ROW-in-MAP, arrays, char/varchar, binaries,
    // timestamps, decimals of varying precision/scale, and a MULTISET.
    public static final RowType ALL_TYPES =
            (RowType)
                    TableSchema.builder()
                            .field("id", DataTypes.INT().notNull())
                            .field("name", DataTypes.STRING()) /* optional by default */
                            .field("salary", DataTypes.DOUBLE().notNull())
                            .field(
                                    "locations",
                                    DataTypes.MAP(
                                            DataTypes.STRING(),
                                            DataTypes.ROW(
                                                    DataTypes.FIELD(
                                                            "posX",
                                                            DataTypes.DOUBLE().notNull(),
                                                            "X field"),
                                                    DataTypes.FIELD(
                                                            "posY",
                                                            DataTypes.DOUBLE().notNull(),
                                                            "Y field"))))
                            .field("strArray", DataTypes.ARRAY(DataTypes.STRING()).nullable())
                            .field("intArray", DataTypes.ARRAY(DataTypes.INT()).nullable())
                            .field("char", DataTypes.CHAR(10).notNull())
                            .field("varchar", DataTypes.VARCHAR(10).notNull())
                            .field("boolean", DataTypes.BOOLEAN().nullable())
                            .field("tinyint", DataTypes.TINYINT())
                            .field("smallint", DataTypes.SMALLINT())
                            .field("bigint", DataTypes.BIGINT())
                            .field("varbinary", DataTypes.VARBINARY(10))
                            .field("binary", DataTypes.BINARY(10))
                            .field("timestampWithoutZone", DataTypes.TIMESTAMP())
                            .field("timestampWithZone", DataTypes.TIMESTAMP_WITH_LOCAL_TIME_ZONE())
                            .field("date", DataTypes.DATE())
                            .field("decimal", DataTypes.DECIMAL(2, 2))
                            .field("decimal2", DataTypes.DECIMAL(38, 2))
                            .field("decimal3", DataTypes.DECIMAL(10, 1))
                            .field("multiset", DataTypes.MULTISET(DataTypes.STRING().notNull()))
                            .build()
                            .toRowDataType()
                            .getLogicalType();

    @Test
    public void testAllTypes() {
        // The expected value is Spark's StructType#toString rendering. Notable
        // mappings pinned by this string: CHAR/VARCHAR -> StringType,
        // VARBINARY/BINARY -> BinaryType, both timestamp flavors ->
        // TimestampType, and MULTISET -> MapType(element, IntegerType count).
        String nestedRowMapType =
                "StructField(locations,MapType("
                        + "StringType,"
                        + "StructType(StructField(posX,DoubleType,false), StructField(posY,DoubleType,false)),true),true)";
        String expected =
                "StructType("
                        + "StructField(id,IntegerType,false), "
                        + "StructField(name,StringType,true), "
                        + "StructField(salary,DoubleType,false), "
                        + nestedRowMapType
                        + ", "
                        + "StructField(strArray,ArrayType(StringType,true),true), "
                        + "StructField(intArray,ArrayType(IntegerType,true),true), "
                        + "StructField(char,StringType,false), "
                        + "StructField(varchar,StringType,false), "
                        + "StructField(boolean,BooleanType,true), "
                        + "StructField(tinyint,ByteType,true), "
                        + "StructField(smallint,ShortType,true), "
                        + "StructField(bigint,LongType,true), "
                        + "StructField(varbinary,BinaryType,true), "
                        + "StructField(binary,BinaryType,true), "
                        + "StructField(timestampWithoutZone,TimestampType,true), "
                        + "StructField(timestampWithZone,TimestampType,true), "
                        + "StructField(date,DateType,true), "
                        + "StructField(decimal,DecimalType(2,2),true), "
                        + "StructField(decimal2,DecimalType(38,2),true), "
                        + "StructField(decimal3,DecimalType(10,1),true), "
                        + "StructField(multiset,MapType(StringType,IntegerType,false),true))";

        assertThat(SparkTypeUtils.fromFlinkRowType(ALL_TYPES).toString()).isEqualTo(expected);
    }
}
diff --git a/flink-table-store-spark/src/test/resources/log4j2-test.properties
b/flink-table-store-spark/src/test/resources/log4j2-test.properties
new file mode 100644
index 00000000..863665cf
--- /dev/null
+++ b/flink-table-store-spark/src/test/resources/log4j2-test.properties
@@ -0,0 +1,38 @@
+################################################################################
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements. See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership. The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License. You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+################################################################################
+
# Set root logger level to OFF to not flood build logs
# set manually to INFO for debugging purposes
rootLogger.level = OFF
rootLogger.appenderRef.test.ref = TestLogger

# Single console appender; writes to stderr so log output does not mix with
# anything the tests print to stdout.
appender.testlogger.name = TestLogger
appender.testlogger.type = CONSOLE
appender.testlogger.target = SYSTEM_ERR
appender.testlogger.layout.type = PatternLayout
appender.testlogger.layout.pattern = %-4r [%t] %-5p %c %x - %m%n

# Silence noisy third-party loggers even when the root level is raised for
# debugging ("state.change" is a Kafka broker logger name).
logger.kafka.name = kafka
logger.kafka.level = OFF
logger.kafka2.name = state.change
logger.kafka2.level = OFF

logger.zookeeper.name = org.apache.zookeeper
logger.zookeeper.level = OFF
logger.I0Itec.name = org.I0Itec
logger.I0Itec.level = OFF
diff --git a/pom.xml b/pom.xml
index 0d0d5082..82eaf00c 100644
--- a/pom.xml
+++ b/pom.xml
@@ -62,6 +62,7 @@ under the License.
<module>flink-table-store-format</module>
<module>flink-table-store-hive</module>
<module>flink-table-store-kafka</module>
+ <module>flink-table-store-spark</module>
</modules>
<properties>
@@ -463,7 +464,7 @@ under the License.
<rules>
<bannedDependencies>
<excludes>
-
<exclude>com.fasterxml.jackson*:*:(,2.10.0]</exclude>
+
<exclude>com.fasterxml.jackson*:*:(,2.9.0]</exclude>
</excludes>
</bannedDependencies>
</rules>