JingsongLi commented on code in PR #213: URL: https://github.com/apache/flink-table-store/pull/213#discussion_r920695866
########## flink-table-store-spark2/pom.xml: ########## @@ -0,0 +1,135 @@ +<?xml version="1.0" encoding="UTF-8"?> +<!-- +Licensed to the Apache Software Foundation (ASF) under one +or more contributor license agreements. See the NOTICE file +distributed with this work for additional information +regarding copyright ownership. The ASF licenses this file +to you under the Apache License, Version 2.0 (the +"License"); you may not use this file except in compliance +with the License. You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + +Unless required by applicable law or agreed to in writing, +software distributed under the License is distributed on an +"AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +KIND, either express or implied. See the License for the +specific language governing permissions and limitations +under the License. +--> +<project xmlns="http://maven.apache.org/POM/4.0.0" + xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" + xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd"> + <modelVersion>4.0.0</modelVersion> + + <parent> + <artifactId>flink-table-store-parent</artifactId> + <groupId>org.apache.flink</groupId> + <version>0.2-SNAPSHOT</version> + </parent> + + <artifactId>flink-table-store-spark2</artifactId> + <name>Flink Table Store : Spark2</name> + + <packaging>jar</packaging> + + <properties> + <spark.version>2.4.8</spark.version> Review Comment: minor: `spark2.version` ########## flink-table-store-spark2/pom.xml: ########## @@ -0,0 +1,135 @@ +<?xml version="1.0" encoding="UTF-8"?> +<!-- +Licensed to the Apache Software Foundation (ASF) under one +or more contributor license agreements. See the NOTICE file +distributed with this work for additional information +regarding copyright ownership. The ASF licenses this file +to you under the Apache License, Version 2.0 (the +"License"); you may not use this file except in compliance +with the License. 
You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + +Unless required by applicable law or agreed to in writing, +software distributed under the License is distributed on an +"AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +KIND, either express or implied. See the License for the +specific language governing permissions and limitations +under the License. +--> +<project xmlns="http://maven.apache.org/POM/4.0.0" + xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" + xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd"> + <modelVersion>4.0.0</modelVersion> + + <parent> + <artifactId>flink-table-store-parent</artifactId> + <groupId>org.apache.flink</groupId> + <version>0.2-SNAPSHOT</version> + </parent> + + <artifactId>flink-table-store-spark2</artifactId> + <name>Flink Table Store : Spark2</name> + + <packaging>jar</packaging> + + <properties> + <spark.version>2.4.8</spark.version> + </properties> + + <dependencies> + <!-- Flink All dependencies --> + <dependency> + <groupId>org.apache.flink</groupId> + <artifactId>flink-table-store-shade</artifactId> + <version>${project.version}</version> + </dependency> + + <dependency> Review Comment: We don't need to bundle hive catalog. Spark2 has no catalog. ########## flink-table-store-spark2/pom.xml: ########## @@ -0,0 +1,135 @@ +<?xml version="1.0" encoding="UTF-8"?> +<!-- +Licensed to the Apache Software Foundation (ASF) under one +or more contributor license agreements. See the NOTICE file +distributed with this work for additional information +regarding copyright ownership. The ASF licenses this file +to you under the Apache License, Version 2.0 (the +"License"); you may not use this file except in compliance +with the License. 
You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + +Unless required by applicable law or agreed to in writing, +software distributed under the License is distributed on an +"AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +KIND, either express or implied. See the License for the +specific language governing permissions and limitations +under the License. +--> +<project xmlns="http://maven.apache.org/POM/4.0.0" + xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" + xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd"> + <modelVersion>4.0.0</modelVersion> + + <parent> + <artifactId>flink-table-store-parent</artifactId> + <groupId>org.apache.flink</groupId> + <version>0.2-SNAPSHOT</version> + </parent> + + <artifactId>flink-table-store-spark2</artifactId> + <name>Flink Table Store : Spark2</name> + + <packaging>jar</packaging> + + <properties> + <spark.version>2.4.8</spark.version> + </properties> + + <dependencies> + <!-- Flink All dependencies --> + <dependency> + <groupId>org.apache.flink</groupId> + <artifactId>flink-table-store-shade</artifactId> + <version>${project.version}</version> + </dependency> + + <dependency> + <groupId>org.apache.flink</groupId> + <artifactId>flink-table-store-hive-catalog</artifactId> + <version>${project.version}</version> + <exclusions> + <exclusion> + <groupId>*</groupId> + <artifactId>*</artifactId> + </exclusion> + </exclusions> + </dependency> + + <dependency> + <groupId>org.apache.spark</groupId> + <artifactId>spark-sql_2.12</artifactId> + <version>${spark.version}</version> + <exclusions> + <exclusion> + <groupId>org.apache.avro</groupId> + <artifactId>avro</artifactId> + </exclusion> + <exclusion> + <groupId>log4j</groupId> + <artifactId>log4j</artifactId> + </exclusion> + <exclusion> + <groupId>org.slf4j</groupId> + <artifactId>slf4j-log4j12</artifactId> + </exclusion> + </exclusions> + </dependency> + </dependencies> + + <!-- 
Activate these profiles with -Pspark-x.x to build and test against different Spark versions --> Review Comment: Remove this, this is for multiple spark versions. ########## flink-table-store-spark2/pom.xml: ########## @@ -0,0 +1,135 @@ +<?xml version="1.0" encoding="UTF-8"?> +<!-- +Licensed to the Apache Software Foundation (ASF) under one +or more contributor license agreements. See the NOTICE file +distributed with this work for additional information +regarding copyright ownership. The ASF licenses this file +to you under the Apache License, Version 2.0 (the +"License"); you may not use this file except in compliance +with the License. You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + +Unless required by applicable law or agreed to in writing, +software distributed under the License is distributed on an +"AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +KIND, either express or implied. See the License for the +specific language governing permissions and limitations +under the License. 
+--> +<project xmlns="http://maven.apache.org/POM/4.0.0" + xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" + xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd"> + <modelVersion>4.0.0</modelVersion> + + <parent> + <artifactId>flink-table-store-parent</artifactId> + <groupId>org.apache.flink</groupId> + <version>0.2-SNAPSHOT</version> + </parent> + + <artifactId>flink-table-store-spark2</artifactId> + <name>Flink Table Store : Spark2</name> + + <packaging>jar</packaging> + + <properties> + <spark.version>2.4.8</spark.version> + </properties> + + <dependencies> + <!-- Flink All dependencies --> + <dependency> + <groupId>org.apache.flink</groupId> + <artifactId>flink-table-store-shade</artifactId> + <version>${project.version}</version> + </dependency> + + <dependency> + <groupId>org.apache.flink</groupId> + <artifactId>flink-table-store-hive-catalog</artifactId> + <version>${project.version}</version> + <exclusions> + <exclusion> + <groupId>*</groupId> + <artifactId>*</artifactId> + </exclusion> + </exclusions> + </dependency> + + <dependency> + <groupId>org.apache.spark</groupId> + <artifactId>spark-sql_2.12</artifactId> + <version>${spark.version}</version> + <exclusions> + <exclusion> + <groupId>org.apache.avro</groupId> + <artifactId>avro</artifactId> + </exclusion> + <exclusion> + <groupId>log4j</groupId> + <artifactId>log4j</artifactId> + </exclusion> + <exclusion> + <groupId>org.slf4j</groupId> + <artifactId>slf4j-log4j12</artifactId> + </exclusion> + </exclusions> + </dependency> + </dependencies> + + <!-- Activate these profiles with -Pspark-x.x to build and test against different Spark versions --> + <profiles> + <profile> + <id>spark-2.4</id> + <properties> + <spark.version>2.4.8</spark.version> + </properties> + </profile> + </profiles> + + <build> + <plugins> + <plugin> + <groupId>org.codehaus.mojo</groupId> + <artifactId>build-helper-maven-plugin</artifactId> + <executions> + <execution> + 
<id>add-sources</id> + <phase>generate-sources</phase> + <goals> + <goal>add-source</goal> + </goals> + <configuration> + <sources> + <source>src/main/${spark.version}</source> + </sources> + </configuration> + </execution> + </executions> + </plugin> + + <plugin> + <groupId>org.apache.maven.plugins</groupId> + <artifactId>maven-shade-plugin</artifactId> + <executions> + <execution> + <id>shade-flink</id> + <phase>package</phase> + <goals> + <goal>shade</goal> + </goals> + <configuration> + <artifactSet> + <includes combine.children="append"> + <include>org.apache.flink:flink-table-store-shade</include> + <include>org.apache.flink:flink-table-store-hive-catalog</include> Review Comment: ditto ########## flink-table-store-spark2/pom.xml: ########## @@ -0,0 +1,135 @@ +<?xml version="1.0" encoding="UTF-8"?> +<!-- +Licensed to the Apache Software Foundation (ASF) under one +or more contributor license agreements. See the NOTICE file +distributed with this work for additional information +regarding copyright ownership. The ASF licenses this file +to you under the Apache License, Version 2.0 (the +"License"); you may not use this file except in compliance +with the License. You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + +Unless required by applicable law or agreed to in writing, +software distributed under the License is distributed on an +"AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +KIND, either express or implied. See the License for the +specific language governing permissions and limitations +under the License. 
+--> +<project xmlns="http://maven.apache.org/POM/4.0.0" + xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" + xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd"> + <modelVersion>4.0.0</modelVersion> + + <parent> + <artifactId>flink-table-store-parent</artifactId> + <groupId>org.apache.flink</groupId> + <version>0.2-SNAPSHOT</version> + </parent> + + <artifactId>flink-table-store-spark2</artifactId> + <name>Flink Table Store : Spark2</name> + + <packaging>jar</packaging> + + <properties> + <spark.version>2.4.8</spark.version> + </properties> + + <dependencies> + <!-- Flink All dependencies --> + <dependency> + <groupId>org.apache.flink</groupId> + <artifactId>flink-table-store-shade</artifactId> + <version>${project.version}</version> + </dependency> + + <dependency> + <groupId>org.apache.flink</groupId> + <artifactId>flink-table-store-hive-catalog</artifactId> + <version>${project.version}</version> + <exclusions> + <exclusion> + <groupId>*</groupId> + <artifactId>*</artifactId> + </exclusion> + </exclusions> + </dependency> + + <dependency> + <groupId>org.apache.spark</groupId> + <artifactId>spark-sql_2.12</artifactId> + <version>${spark.version}</version> + <exclusions> + <exclusion> + <groupId>org.apache.avro</groupId> + <artifactId>avro</artifactId> + </exclusion> + <exclusion> + <groupId>log4j</groupId> + <artifactId>log4j</artifactId> + </exclusion> + <exclusion> + <groupId>org.slf4j</groupId> + <artifactId>slf4j-log4j12</artifactId> + </exclusion> + </exclusions> + </dependency> + </dependencies> + + <!-- Activate these profiles with -Pspark-x.x to build and test against different Spark versions --> + <profiles> + <profile> + <id>spark-2.4</id> + <properties> + <spark.version>2.4.8</spark.version> + </properties> + </profile> + </profiles> + + <build> + <plugins> + <plugin> + <groupId>org.codehaus.mojo</groupId> Review Comment: Remove this, this is for multiple spark versions. 
########## flink-table-store-spark2/src/main/java/org/apache/flink/table/store/spark/SparkInputPartition.java: ########## @@ -0,0 +1,117 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.table.store.spark; + +import org.apache.flink.core.memory.DataInputViewStreamWrapper; +import org.apache.flink.core.memory.DataOutputViewStreamWrapper; +import org.apache.flink.table.data.RowData; +import org.apache.flink.table.store.file.utils.RecordReader; +import org.apache.flink.table.store.file.utils.RecordReaderIterator; +import org.apache.flink.table.store.table.FileStoreTable; +import org.apache.flink.table.store.table.source.Split; +import org.apache.flink.table.store.table.source.TableRead; +import org.apache.flink.table.store.utils.TypeUtils; +import org.apache.flink.table.types.logical.RowType; + +import org.apache.spark.sql.catalyst.InternalRow; +import org.apache.spark.sql.sources.v2.reader.InputPartition; +import org.apache.spark.sql.sources.v2.reader.InputPartitionReader; + +import java.io.IOException; +import java.io.ObjectInputStream; +import java.io.ObjectOutputStream; +import java.io.UncheckedIOException; + +/** A Spark {@link InputPartition} for table store. 
*/ +public class SparkInputPartition implements InputPartition<InternalRow> { + + private static final long serialVersionUID = 1L; + + private final FileStoreTable table; + private final int[] projectedFields; + + private transient Split split; + + public SparkInputPartition(FileStoreTable table, int[] projectedFields, Split split) { + this.table = table; + this.projectedFields = projectedFields; + this.split = split; + } + + @Override + public InputPartitionReader<InternalRow> createPartitionReader() { + RecordReader<RowData> recordReader; + try { + TableRead tableRead = table.newRead(); Review Comment: Copy filter push down after https://github.com/apache/flink-table-store/pull/212 merged. ########## pom.xml: ########## @@ -514,7 +515,7 @@ under the License. <rules> <bannedDependencies> <excludes> - <exclude>com.fasterxml.jackson*:*:(,2.9.0]</exclude> + <exclude>com.fasterxml.jackson*:*:(,2.5.0]</exclude> Review Comment: Can we exclude `com.fasterxml.jackson` for `spark-sql`? ########## flink-table-store-spark2/src/main/java/org/apache/flink/table/store/spark/SparkTypeUtils.java: ########## @@ -0,0 +1,288 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.flink.table.store.spark; + +import org.apache.flink.table.types.logical.ArrayType; +import org.apache.flink.table.types.logical.BigIntType; +import org.apache.flink.table.types.logical.BinaryType; +import org.apache.flink.table.types.logical.BooleanType; +import org.apache.flink.table.types.logical.CharType; +import org.apache.flink.table.types.logical.DateType; +import org.apache.flink.table.types.logical.DecimalType; +import org.apache.flink.table.types.logical.DoubleType; +import org.apache.flink.table.types.logical.FloatType; +import org.apache.flink.table.types.logical.IntType; +import org.apache.flink.table.types.logical.LocalZonedTimestampType; +import org.apache.flink.table.types.logical.LogicalType; +import org.apache.flink.table.types.logical.MapType; +import org.apache.flink.table.types.logical.MultisetType; +import org.apache.flink.table.types.logical.RowType; +import org.apache.flink.table.types.logical.RowType.RowField; +import org.apache.flink.table.types.logical.SmallIntType; +import org.apache.flink.table.types.logical.TimestampType; +import org.apache.flink.table.types.logical.TinyIntType; +import org.apache.flink.table.types.logical.VarBinaryType; +import org.apache.flink.table.types.logical.VarCharType; +import org.apache.flink.table.types.logical.utils.LogicalTypeDefaultVisitor; + +import org.apache.spark.sql.types.DataType; +import org.apache.spark.sql.types.DataTypes; +import org.apache.spark.sql.types.LongType; +import org.apache.spark.sql.types.StructField; +import org.apache.spark.sql.types.StructType; +import org.apache.spark.sql.types.UserDefinedType; + +import java.util.ArrayList; +import java.util.List; + +/** Utils for Spark {@link DataType}. 
*/ +public class SparkTypeUtils { + + private SparkTypeUtils() {} + + public static StructType fromFlinkRowType(RowType type) { + return (StructType) fromFlinkType(type); + } + + public static DataType fromFlinkType(LogicalType type) { + return type.accept(FlinkToSparkTypeVisitor.INSTANCE); + } + + public static LogicalType toFlinkType(DataType dataType) { + return SparkToFlinkTypeVisitor.visit(dataType); + } + + private static class FlinkToSparkTypeVisitor extends LogicalTypeDefaultVisitor<DataType> { + + private static final FlinkToSparkTypeVisitor INSTANCE = new FlinkToSparkTypeVisitor(); + + @Override + public DataType visit(CharType charType) { + return DataTypes.StringType; + } + + @Override + public DataType visit(VarCharType varCharType) { + return DataTypes.StringType; + } + + @Override + public DataType visit(BooleanType booleanType) { + return DataTypes.BooleanType; + } + + @Override + public DataType visit(BinaryType binaryType) { + return DataTypes.BinaryType; + } + + @Override + public DataType visit(VarBinaryType varBinaryType) { + return DataTypes.BinaryType; + } + + @Override + public DataType visit(DecimalType decimalType) { + return DataTypes.createDecimalType(decimalType.getPrecision(), decimalType.getScale()); + } + + @Override + public DataType visit(TinyIntType tinyIntType) { + return DataTypes.ByteType; + } + + @Override + public DataType visit(SmallIntType smallIntType) { + return DataTypes.ShortType; + } + + @Override + public DataType visit(IntType intType) { + return DataTypes.IntegerType; + } + + @Override + public DataType visit(BigIntType bigIntType) { + return DataTypes.LongType; + } + + @Override + public DataType visit(FloatType floatType) { + return DataTypes.FloatType; + } + + @Override + public DataType visit(DoubleType doubleType) { + return DataTypes.DoubleType; + } + + @Override + public DataType visit(DateType dateType) { + return DataTypes.DateType; + } + + @Override + public DataType visit(TimestampType timestampType) { + 
return DataTypes.TimestampType; + } + + @Override + public DataType visit(LocalZonedTimestampType localZonedTimestampType) { + return DataTypes.TimestampType; + } + + @Override + public DataType visit(ArrayType arrayType) { + LogicalType elementType = arrayType.getElementType(); + return DataTypes.createArrayType(elementType.accept(this), elementType.isNullable()); + } + + @Override + public DataType visit(MultisetType multisetType) { + return DataTypes.createMapType( + multisetType.getElementType().accept(this), DataTypes.IntegerType, false); + } + + @Override + public DataType visit(MapType mapType) { + return DataTypes.createMapType( + mapType.getKeyType().accept(this), + mapType.getValueType().accept(this), + mapType.getValueType().isNullable()); + } + + @Override + public DataType visit(RowType rowType) { + List<StructField> fields = new ArrayList<>(rowType.getFieldCount()); + for (RowField field : rowType.getFields()) { + StructField structField = + DataTypes.createStructField( + field.getName(), + field.getType().accept(this), + field.getType().isNullable()); + structField = + field.getDescription().map(structField::withComment).orElse(structField); + fields.add(structField); + } + return DataTypes.createStructType(fields); + } + + @Override + protected DataType defaultMethod(LogicalType logicalType) { + throw new UnsupportedOperationException("Unsupported type: " + logicalType); + } + } + + private static class SparkToFlinkTypeVisitor { Review Comment: Remove this, no usage. ########## flink-table-store-spark2/src/main/java/org/apache/flink/table/store/spark/SpecializedGettersReader.java: ########## @@ -0,0 +1,111 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. 
The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.table.store.spark; + +import org.apache.spark.sql.catalyst.expressions.SpecializedGetters; +import org.apache.spark.sql.types.ArrayType; +import org.apache.spark.sql.types.BinaryType; +import org.apache.spark.sql.types.BooleanType; +import org.apache.spark.sql.types.ByteType; +import org.apache.spark.sql.types.CalendarIntervalType; +import org.apache.spark.sql.types.DataType; +import org.apache.spark.sql.types.DateType; +import org.apache.spark.sql.types.DecimalType; +import org.apache.spark.sql.types.DoubleType; +import org.apache.spark.sql.types.FloatType; +import org.apache.spark.sql.types.IntegerType; +import org.apache.spark.sql.types.LongType; +import org.apache.spark.sql.types.MapType; +import org.apache.spark.sql.types.NullType; +import org.apache.spark.sql.types.ShortType; +import org.apache.spark.sql.types.StringType; +import org.apache.spark.sql.types.StructType; +import org.apache.spark.sql.types.TimestampType; +import org.apache.spark.sql.types.UserDefinedType; + +/** Reader of Spark {@link SpecializedGetters}. */ +public final class SpecializedGettersReader { + + private SpecializedGettersReader() {} + + public static Object read( Review Comment: Can we remove `handleNull` and `handleUserDefinedType`? -- This is an automated message from the Apache Git Service. 
To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: [email protected]. For queries about this service, please contact Infrastructure at: [email protected]
