This is an automated email from the ASF dual-hosted git repository. yunfengzhou pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/paimon.git
The following commit(s) were added to refs/heads/master by this push: new 39cba699d0 [flink] Bump Flink version to 2.1 (#6027) 39cba699d0 is described below commit 39cba699d0861322c031868fcb590dd1a36f0b32 Author: Xuannan <suxuanna...@gmail.com> AuthorDate: Wed Aug 20 13:52:25 2025 +0800 [flink] Bump Flink version to 2.1 (#6027) --- .github/workflows/e2e-tests-flink-2.x-jdk11.yml | 2 +- .github/workflows/utitcase-flink-2.x-jdk11.yml | 2 +- paimon-e2e-tests/pom.xml | 10 + .../apache/flink/types/variant/BinaryVariant.java | 32 +++ .../org/apache/flink/types/variant/Variant.java | 27 ++ paimon-flink/paimon-flink-2.1/pom.xml | 93 ++++++ paimon-flink/paimon-flink-common/pom.xml | 12 +- .../java/org/apache/paimon/flink/FlinkRowData.java | 12 + .../paimon/flink/NestedProjectedRowData.java | 5 + .../org/apache/paimon/flink/ProjectedRowData.java | 5 + .../org/apache/paimon/flink/ChangelogModeTest.java | 10 +- .../apache/paimon/flink/SchemaChangeITCase.java | 3 - .../apache/paimon/flink/SerializableRowData.java | 6 + .../java/org/apache/flink/table/data/RowData.java | 314 +++++++++++++++++++++ .../apache/flink/types/variant/BinaryVariant.java | 32 +++ .../org/apache/flink/types/variant/Variant.java | 27 ++ pom.xml | 8 +- 17 files changed, 585 insertions(+), 15 deletions(-) diff --git a/.github/workflows/e2e-tests-flink-2.x-jdk11.yml b/.github/workflows/e2e-tests-flink-2.x-jdk11.yml index 6333f9f4b9..5f97778467 100644 --- a/.github/workflows/e2e-tests-flink-2.x-jdk11.yml +++ b/.github/workflows/e2e-tests-flink-2.x-jdk11.yml @@ -39,7 +39,7 @@ jobs: fail-fast: true matrix: # Last element should be the current default flink version - flink_version: [ '2.0' ] + flink_version: [ '2.0', '2.1' ] steps: - name: Checkout code uses: actions/checkout@v4 diff --git a/.github/workflows/utitcase-flink-2.x-jdk11.yml b/.github/workflows/utitcase-flink-2.x-jdk11.yml index 40cd479001..8e90e679cb 100644 --- a/.github/workflows/utitcase-flink-2.x-jdk11.yml +++ b/.github/workflows/utitcase-flink-2.x-jdk11.yml @@ -56,7 +56,7 @@ jobs: jvm_timezone=$(random_timezone) echo "JVM timezone is set to $jvm_timezone" test_modules="" - for suffix in 2.0 common; do + for suffix in 2.0 2.1 common; do test_modules+="org.apache.paimon:paimon-flink-${suffix}," done test_modules="${test_modules%,}" diff --git a/paimon-e2e-tests/pom.xml b/paimon-e2e-tests/pom.xml index 6e5a305a22..c46668e953 100644 --- a/paimon-e2e-tests/pom.xml +++ b/paimon-e2e-tests/pom.xml @@ -294,6 +294,16 @@ under the License. <profiles> <!-- Activate these profiles with -Pflink-x.xx to build and test against different Flink versions --> + + <profile> + <id>flink-2.0</id> + <properties> + <test.flink.main.version>2.0</test.flink.main.version> + <test.flink.version>2.0.0</test.flink.version> + <test.flink.connector.kafka.version>4.0.0-${test.flink.main.version}</test.flink.connector.kafka.version> + </properties> + </profile> + <profile> <id>flink-1.19</id> <properties> diff --git a/paimon-flink/paimon-flink-2.0/src/main/java/org/apache/flink/types/variant/BinaryVariant.java b/paimon-flink/paimon-flink-2.0/src/main/java/org/apache/flink/types/variant/BinaryVariant.java new file mode 100644 index 0000000000..1311188617 --- /dev/null +++ b/paimon-flink/paimon-flink-2.0/src/main/java/org/apache/flink/types/variant/BinaryVariant.java @@ -0,0 +1,32 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.types.variant; + +/** + * A data structure that represents a semi-structured value. It consists of two binary values: value + * and metadata. The value encodes types and values, but not field names. The metadata currently + * contains a version flag and a list of field names. We can extend/modify the detailed binary + * format given the version flag. + * + * @see <a href="https://github.com/apache/parquet-format/blob/master/VariantEncoding.md">Variant + * Binary Encoding</a> for the detail layout of the data structure. + */ +public class BinaryVariant implements Variant { + public BinaryVariant(byte[] value, byte[] metadata) {} +} diff --git a/paimon-flink/paimon-flink-2.0/src/main/java/org/apache/flink/types/variant/Variant.java b/paimon-flink/paimon-flink-2.0/src/main/java/org/apache/flink/types/variant/Variant.java new file mode 100644 index 0000000000..9f6f970b69 --- /dev/null +++ b/paimon-flink/paimon-flink-2.0/src/main/java/org/apache/flink/types/variant/Variant.java @@ -0,0 +1,27 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.types.variant; + +import org.apache.flink.annotation.PublicEvolving; + +import java.io.Serializable; + +/** Variant represent a semi-structured data. */ +@PublicEvolving +public interface Variant extends Serializable {} diff --git a/paimon-flink/paimon-flink-2.1/pom.xml b/paimon-flink/paimon-flink-2.1/pom.xml new file mode 100644 index 0000000000..062154d9fe --- /dev/null +++ b/paimon-flink/paimon-flink-2.1/pom.xml @@ -0,0 +1,93 @@ +<?xml version="1.0" encoding="UTF-8"?> +<!-- +Licensed to the Apache Software Foundation (ASF) under one +or more contributor license agreements. See the NOTICE file +distributed with this work for additional information +regarding copyright ownership. The ASF licenses this file +to you under the Apache License, Version 2.0 (the +"License"); you may not use this file except in compliance +with the License. You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + +Unless required by applicable law or agreed to in writing, +software distributed under the License is distributed on an +"AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +KIND, either express or implied. See the License for the +specific language governing permissions and limitations +under the License. +--> +<project xmlns="http://maven.apache.org/POM/4.0.0" + xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" + xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd"> + <modelVersion>4.0.0</modelVersion> + + <parent> + <groupId>org.apache.paimon</groupId> + <artifactId>paimon-flink</artifactId> + <version>1.3-SNAPSHOT</version> + </parent> + + <packaging>jar</packaging> + + <artifactId>paimon-flink-2.1</artifactId> + <name>Paimon : Flink : 2.1</name> + + <properties> + <flink.version>2.1.0</flink.version> + </properties> + + <dependencies> + <dependency> + <groupId>org.apache.paimon</groupId> + <artifactId>paimon-flink-common</artifactId> + <version>${project.version}</version> + </dependency> + + <dependency> + <groupId>org.apache.paimon</groupId> + <artifactId>paimon-flink2-common</artifactId> + <version>${project.version}</version> + </dependency> + + <dependency> + <groupId>org.apache.flink</groupId> + <artifactId>flink-streaming-java</artifactId> + <version>${flink.version}</version> + <scope>provided</scope> + </dependency> + + <dependency> + <groupId>org.apache.flink</groupId> + <artifactId>flink-table-common</artifactId> + <version>${flink.version}</version> + <scope>provided</scope> + </dependency> + </dependencies> + + <build> + <plugins> + <plugin> + <groupId>org.apache.maven.plugins</groupId> + <artifactId>maven-shade-plugin</artifactId> + <executions> + <execution> + <id>shade-paimon</id> + <phase>package</phase> + <goals> + <goal>shade</goal> + </goals> + <configuration> + <artifactSet> + <includes combine.children="append"> + <include>org.apache.paimon:paimon-flink-common</include> + <include>org.apache.paimon:paimon-flink2-common</include> + </includes> + </artifactSet> + </configuration> + </execution> + </executions> + </plugin> + </plugins> + </build> +</project> diff --git a/paimon-flink/paimon-flink-common/pom.xml b/paimon-flink/paimon-flink-common/pom.xml index 28ce295453..715718b9cb 100644 --- a/paimon-flink/paimon-flink-common/pom.xml +++ b/paimon-flink/paimon-flink-common/pom.xml @@ -38,6 +38,12 @@ under the License. </properties> <dependencies> + <dependency> + <groupId>org.apache.paimon</groupId> + <artifactId>${paimon-flinkx-common}</artifactId> + <version>${project.version}</version> + </dependency> + <dependency> <groupId>org.apache.flink</groupId> <artifactId>flink-core</artifactId> @@ -81,12 +87,6 @@ under the License. <scope>test</scope> </dependency> - <dependency> - <groupId>org.apache.paimon</groupId> - <artifactId>${paimon-flinkx-common}</artifactId> - <version>${project.version}</version> - </dependency> - <dependency> <groupId>org.apache.paimon</groupId> <artifactId>${paimon-flinkx-common}</artifactId> diff --git a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/FlinkRowData.java b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/FlinkRowData.java index 6e71b32922..a4d81364b4 100644 --- a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/FlinkRowData.java +++ b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/FlinkRowData.java @@ -33,6 +33,8 @@ import org.apache.flink.table.data.RowData; import org.apache.flink.table.data.StringData; import org.apache.flink.table.data.TimestampData; import org.apache.flink.types.RowKind; +import org.apache.flink.types.variant.BinaryVariant; +import org.apache.flink.types.variant.Variant; import static org.apache.paimon.flink.FlinkRowWrapper.fromFlinkRowKind; @@ -145,6 +147,11 @@ public class FlinkRowData implements RowData { return new FlinkRowData(row.getRow(pos, numFields)); } + public Variant getVariant(int pos) { + org.apache.paimon.data.variant.Variant variant = row.getVariant(pos); + return new BinaryVariant(variant.value(), variant.metadata()); + } + private static class FlinkArrayData implements ArrayData { private final InternalArray array; @@ -218,6 +225,11 @@ public class FlinkRowData implements RowData { throw new UnsupportedOperationException(); } + public Variant getVariant(int pos) { + org.apache.paimon.data.variant.Variant variant = array.getVariant(pos); + return new BinaryVariant(variant.value(), variant.metadata()); + } + @Override public byte[] getBinary(int pos) { return array.getBinary(pos); diff --git a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/NestedProjectedRowData.java b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/NestedProjectedRowData.java index 810cc1ae42..b9fa8b9d6b 100644 --- a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/NestedProjectedRowData.java +++ b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/NestedProjectedRowData.java @@ -28,6 +28,7 @@ import org.apache.flink.table.data.StringData; import org.apache.flink.table.data.TimestampData; import org.apache.flink.table.types.logical.RowType; import org.apache.flink.types.RowKind; +import org.apache.flink.types.variant.Variant; import javax.annotation.Nullable; @@ -195,6 +196,10 @@ public class NestedProjectedRowData implements RowData, Serializable { return getFieldAs(pos, (rowData, internalPos) -> rowData.getRow(internalPos, numFields)); } + public Variant getVariant(int pos) { + return getFieldAs(pos, RowData::getVariant); + } + private @Nullable RowData extractInternalRow(int pos) { int[] projectedField = projectedFields[pos]; RowData rowData = this.row; diff --git a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/ProjectedRowData.java b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/ProjectedRowData.java index 6bdbe58801..03d8420cef 100644 --- a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/ProjectedRowData.java +++ b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/ProjectedRowData.java @@ -27,6 +27,7 @@ import org.apache.flink.table.data.StringData; import org.apache.flink.table.data.TimestampData; import org.apache.flink.table.types.DataType; import org.apache.flink.types.RowKind; +import org.apache.flink.types.variant.Variant; import java.util.Arrays; @@ -156,6 +157,10 @@ public class ProjectedRowData implements RowData { return row.getRow(indexMapping[pos], numFields); } + public Variant getVariant(int pos) { + return row.getVariant(indexMapping[pos]); + } + @Override public boolean equals(Object o) { throw new UnsupportedOperationException("Projected row data cannot be compared"); diff --git a/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/ChangelogModeTest.java b/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/ChangelogModeTest.java index 9022bbb505..7a24efa0fa 100644 --- a/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/ChangelogModeTest.java +++ b/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/ChangelogModeTest.java @@ -33,6 +33,7 @@ import org.apache.paimon.types.RowType; import org.apache.flink.table.catalog.ObjectIdentifier; import org.apache.flink.table.connector.ChangelogMode; +import org.apache.flink.types.RowKind; import org.junit.jupiter.api.BeforeEach; import org.junit.jupiter.api.Test; import org.junit.jupiter.api.io.TempDir; @@ -76,7 +77,14 @@ public class ChangelogModeTest { @Test public void testDefault() throws Exception { - test(new Options(), ChangelogMode.upsert(), ChangelogMode.upsert()); + test( + new Options(), + ChangelogMode.upsert(), + ChangelogMode.newBuilder() + .addContainedKind(RowKind.INSERT) + .addContainedKind(RowKind.UPDATE_AFTER) + .addContainedKind(RowKind.DELETE) + .build()); } @Test diff --git a/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/SchemaChangeITCase.java b/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/SchemaChangeITCase.java index da260851a8..8edc85a3c2 100644 --- a/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/SchemaChangeITCase.java +++ b/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/SchemaChangeITCase.java @@ -21,7 +21,6 @@ package org.apache.paimon.flink; import org.apache.paimon.data.BinaryString; import org.apache.paimon.utils.DateTimeUtils; -import org.apache.flink.table.api.TableException; import org.apache.flink.table.api.config.ExecutionConfigOptions; import org.apache.flink.types.Row; import org.junit.jupiter.api.Test; @@ -685,7 +684,6 @@ public class SchemaChangeITCase extends CatalogITCaseBase { "INSERT INTO T VALUES('aaa', 'bbb', 'ccc', 1, CAST(NULL AS FLOAT))")) .satisfies( anyCauseMatches( - TableException.class, "Column 'e' is NOT NULL, however, a null value is being written into it.")); // Not null -> nullable @@ -718,7 +716,6 @@ public class SchemaChangeITCase extends CatalogITCaseBase { "INSERT INTO T VALUES('aaa', 'bbb', CAST(NULL AS STRING), 1, CAST(NULL AS FLOAT))")) .satisfies( anyCauseMatches( - TableException.class, "Column 'c' is NOT NULL, however, a null value is being written into it.")); // Insert a null value diff --git a/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/SerializableRowData.java b/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/SerializableRowData.java index 75b96cbe02..3b0db92a02 100644 --- a/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/SerializableRowData.java +++ b/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/SerializableRowData.java @@ -29,6 +29,7 @@ import org.apache.flink.table.data.RowData; import org.apache.flink.table.data.StringData; import org.apache.flink.table.data.TimestampData; import org.apache.flink.types.RowKind; +import org.apache.flink.types.variant.Variant; import java.io.IOException; import java.io.ObjectInputStream; @@ -153,4 +154,9 @@ public class SerializableRowData implements RowData, Serializable { public RowData getRow(int i, int rowArity) { return row.getRow(i, rowArity); } + + @Override + public Variant getVariant(int i) { + return row.getVariant(i); + } } diff --git a/paimon-flink/paimon-flink1-common/src/main/java/org/apache/flink/table/data/RowData.java b/paimon-flink/paimon-flink1-common/src/main/java/org/apache/flink/table/data/RowData.java new file mode 100644 index 0000000000..f0dd820066 --- /dev/null +++ b/paimon-flink/paimon-flink1-common/src/main/java/org/apache/flink/table/data/RowData.java @@ -0,0 +1,314 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.table.data; + +import org.apache.flink.annotation.PublicEvolving; +import org.apache.flink.core.memory.MemorySegment; +import org.apache.flink.table.types.logical.DistinctType; +import org.apache.flink.table.types.logical.LogicalType; +import org.apache.flink.table.types.logical.RowType; +import org.apache.flink.table.types.logical.StructuredType; +import org.apache.flink.types.RowKind; +import org.apache.flink.types.variant.Variant; + +import javax.annotation.Nullable; + +import java.io.Serializable; + +import static org.apache.flink.table.types.logical.utils.LogicalTypeChecks.getFieldCount; +import static org.apache.flink.table.types.logical.utils.LogicalTypeChecks.getPrecision; +import static org.apache.flink.table.types.logical.utils.LogicalTypeChecks.getScale; + +/** + * Base interface for an internal data structure representing data of {@link RowType} and other + * (possibly nested) structured types such as {@link StructuredType} in the table ecosystem. + * + * <p>All top-level records that are travelling through Table API or SQL pipelines during runtime + * are instances of this interface. Each {@link RowData} contains a {@link RowKind} which represents + * the kind of change that a row describes in a changelog. The {@link RowKind} is just metadata + * information of row and thus not part of the table's schema, i.e., not a dedicated field. + * + * <p>Note: All fields of this data structure must be internal data structures. + * + * <p>The {@link RowData} interface has different implementations which are designed for different + * scenarios: + * + * <ul> + * <li>The binary-oriented implementation {@code BinaryRowData} is backed by references to {@link + * MemorySegment} instead of using Java objects to reduce the serialization/deserialization + * overhead. + * <li>The object-oriented implementation {@link GenericRowData} is backed by an array of Java + * {@link Object} which is easy to construct and efficient to update. + * </ul> + * + * <p>{@link GenericRowData} is intended for public use and has stable behavior. It is recommended + * to construct instances of {@link RowData} with this class if internal data structures are + * required. + * + * <p>The mappings from Flink's Table API and SQL data types to the internal data structures are + * listed in the following table: + * + * <pre> + * +--------------------------------+-----------------------------------------+ + * | SQL Data Types | Internal Data Structures | + * +--------------------------------+-----------------------------------------+ + * | BOOLEAN | boolean | + * +--------------------------------+-----------------------------------------+ + * | CHAR / VARCHAR / STRING | {@link StringData} | + * +--------------------------------+-----------------------------------------+ + * | BINARY / VARBINARY / BYTES | byte[] | + * +--------------------------------+-----------------------------------------+ + * | DECIMAL | {@link DecimalData} | + * +--------------------------------+-----------------------------------------+ + * | TINYINT | byte | + * +--------------------------------+-----------------------------------------+ + * | SMALLINT | short | + * +--------------------------------+-----------------------------------------+ + * | INT | int | + * +--------------------------------+-----------------------------------------+ + * | BIGINT | long | + * +--------------------------------+-----------------------------------------+ + * | FLOAT | float | + * +--------------------------------+-----------------------------------------+ + * | DOUBLE | double | + * +--------------------------------+-----------------------------------------+ + * | DATE | int (number of days since epoch) | + * +--------------------------------+-----------------------------------------+ + * | TIME | int (number of milliseconds of the day) | + * +--------------------------------+-----------------------------------------+ + * | TIMESTAMP | {@link TimestampData} | + * +--------------------------------+-----------------------------------------+ + * | TIMESTAMP WITH LOCAL TIME ZONE | {@link TimestampData} | + * +--------------------------------+-----------------------------------------+ + * | INTERVAL YEAR TO MONTH | int (number of months) | + * +--------------------------------+-----------------------------------------+ + * | INTERVAL DAY TO MONTH | long (number of milliseconds) | + * +--------------------------------+-----------------------------------------+ + * | ROW / structured types | {@link RowData} | + * +--------------------------------+-----------------------------------------+ + * | ARRAY | {@link ArrayData} | + * +--------------------------------+-----------------------------------------+ + * | MAP / MULTISET | {@link MapData} | + * +--------------------------------+-----------------------------------------+ + * | RAW | {@link RawValueData} | + * +--------------------------------+-----------------------------------------+ + * </pre> + * + * <p>Nullability is always handled by the container data structure. + */ +@PublicEvolving +public interface RowData { + + /** + * Returns the number of fields in this row. + * + * <p>The number does not include {@link RowKind}. It is kept separately. + */ + int getArity(); + + /** + * Returns the kind of change that this row describes in a changelog. + * + * @see RowKind + */ + RowKind getRowKind(); + + /** + * Sets the kind of change that this row describes in a changelog. + * + * @see RowKind + */ + void setRowKind(RowKind kind); + + // ------------------------------------------------------------------------------------------ + // Read-only accessor methods + // ------------------------------------------------------------------------------------------ + + /** Returns true if the field is null at the given position. */ + boolean isNullAt(int pos); + + /** Returns the boolean value at the given position. */ + boolean getBoolean(int pos); + + /** Returns the byte value at the given position. */ + byte getByte(int pos); + + /** Returns the short value at the given position. */ + short getShort(int pos); + + /** Returns the integer value at the given position. */ + int getInt(int pos); + + /** Returns the long value at the given position. */ + long getLong(int pos); + + /** Returns the float value at the given position. */ + float getFloat(int pos); + + /** Returns the double value at the given position. */ + double getDouble(int pos); + + /** Returns the string value at the given position. */ + StringData getString(int pos); + + /** + * Returns the decimal value at the given position. + * + * <p>The precision and scale are required to determine whether the decimal value was stored in + * a compact representation (see {@link DecimalData}). + */ + DecimalData getDecimal(int pos, int precision, int scale); + + /** + * Returns the timestamp value at the given position. + * + * <p>The precision is required to determine whether the timestamp value was stored in a compact + * representation (see {@link TimestampData}). + */ + TimestampData getTimestamp(int pos, int precision); + + /** Returns the raw value at the given position. */ + <T> RawValueData<T> getRawValue(int pos); + + /** Returns the binary value at the given position. */ + byte[] getBinary(int pos); + + /** Returns the array value at the given position. */ + ArrayData getArray(int pos); + + /** Returns the map value at the given position. */ + MapData getMap(int pos); + + /** + * Returns the row value at the given position. + * + * <p>The number of fields is required to correctly extract the row. + */ + RowData getRow(int pos, int numFields); + + /** Returns the variant value at the given position. */ + Variant getVariant(int pos); + + // ------------------------------------------------------------------------------------------ + // Access Utilities + // ------------------------------------------------------------------------------------------ + + /** + * Creates an accessor for getting elements in an internal row data structure at the given + * position. + * + * @param fieldType the element type of the row + * @param fieldPos the element position of the row + */ + static FieldGetter createFieldGetter(LogicalType fieldType, int fieldPos) { + final FieldGetter fieldGetter; + // ordered by type root definition + switch (fieldType.getTypeRoot()) { + case CHAR: + case VARCHAR: + fieldGetter = row -> row.getString(fieldPos); + break; + case BOOLEAN: + fieldGetter = row -> row.getBoolean(fieldPos); + break; + case BINARY: + case VARBINARY: + fieldGetter = row -> row.getBinary(fieldPos); + break; + case DECIMAL: + final int decimalPrecision = getPrecision(fieldType); + final int decimalScale = getScale(fieldType); + fieldGetter = row -> row.getDecimal(fieldPos, decimalPrecision, decimalScale); + break; + case TINYINT: + fieldGetter = row -> row.getByte(fieldPos); + break; + case SMALLINT: + fieldGetter = row -> row.getShort(fieldPos); + break; + case INTEGER: + case DATE: + case TIME_WITHOUT_TIME_ZONE: + case INTERVAL_YEAR_MONTH: + fieldGetter = row -> row.getInt(fieldPos); + break; + case BIGINT: + case INTERVAL_DAY_TIME: + fieldGetter = row -> row.getLong(fieldPos); + break; + case FLOAT: + fieldGetter = row -> row.getFloat(fieldPos); + break; + case DOUBLE: + fieldGetter = row -> row.getDouble(fieldPos); + break; + case TIMESTAMP_WITHOUT_TIME_ZONE: + case TIMESTAMP_WITH_LOCAL_TIME_ZONE: + final int timestampPrecision = getPrecision(fieldType); + fieldGetter = row -> row.getTimestamp(fieldPos, timestampPrecision); + break; + case TIMESTAMP_WITH_TIME_ZONE: + throw new UnsupportedOperationException(); + case ARRAY: + fieldGetter = row -> row.getArray(fieldPos); + break; + case MULTISET: + case MAP: + fieldGetter = row -> row.getMap(fieldPos); + break; + case ROW: + case STRUCTURED_TYPE: + final int rowFieldCount = getFieldCount(fieldType); + fieldGetter = row -> row.getRow(fieldPos, rowFieldCount); + break; + case DISTINCT_TYPE: + fieldGetter = + createFieldGetter(((DistinctType) fieldType).getSourceType(), fieldPos); + break; + case RAW: + fieldGetter = row -> row.getRawValue(fieldPos); + break; + case NULL: + case SYMBOL: + case UNRESOLVED: + default: + throw new IllegalArgumentException(); + } + if (!fieldType.isNullable()) { + return fieldGetter; + } + return row -> { + if (row.isNullAt(fieldPos)) { + return null; + } + return fieldGetter.getFieldOrNull(row); + }; + } + + /** + * Accessor for getting the field of a row during runtime. + * + * @see #createFieldGetter(LogicalType, int) + */ + @PublicEvolving + interface FieldGetter extends Serializable { + @Nullable + Object getFieldOrNull(RowData row); + } +} diff --git a/paimon-flink/paimon-flink1-common/src/main/java/org/apache/flink/types/variant/BinaryVariant.java b/paimon-flink/paimon-flink1-common/src/main/java/org/apache/flink/types/variant/BinaryVariant.java new file mode 100644 index 0000000000..1311188617 --- /dev/null +++ b/paimon-flink/paimon-flink1-common/src/main/java/org/apache/flink/types/variant/BinaryVariant.java @@ -0,0 +1,32 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.types.variant; + +/** + * A data structure that represents a semi-structured value. It consists of two binary values: value + * and metadata. The value encodes types and values, but not field names. The metadata currently + * contains a version flag and a list of field names. We can extend/modify the detailed binary + * format given the version flag. + * + * @see <a href="https://github.com/apache/parquet-format/blob/master/VariantEncoding.md">Variant + * Binary Encoding</a> for the detail layout of the data structure. + */ +public class BinaryVariant implements Variant { + public BinaryVariant(byte[] value, byte[] metadata) {} +} diff --git a/paimon-flink/paimon-flink1-common/src/main/java/org/apache/flink/types/variant/Variant.java b/paimon-flink/paimon-flink1-common/src/main/java/org/apache/flink/types/variant/Variant.java new file mode 100644 index 0000000000..9f6f970b69 --- /dev/null +++ b/paimon-flink/paimon-flink1-common/src/main/java/org/apache/flink/types/variant/Variant.java @@ -0,0 +1,27 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.types.variant; + +import org.apache.flink.annotation.PublicEvolving; + +import java.io.Serializable; + +/** Variant represent a semi-structured data. */ +@PublicEvolving +public interface Variant extends Serializable {} diff --git a/pom.xml b/pom.xml index bace1e5165..65453e7d3e 100644 --- a/pom.xml +++ b/pom.xml @@ -466,13 +466,15 @@ under the License. <id>flink2</id> <properties> <paimon-flinkx-common>paimon-flink2-common</paimon-flinkx-common> - <paimon-flink-common.flink.version>2.0.0</paimon-flink-common.flink.version> - <test.flink.main.version>2.0</test.flink.main.version> - <test.flink.version>2.0.0</test.flink.version> + <paimon-flink-common.flink.version>2.1.0</paimon-flink-common.flink.version> + <test.flink.main.version>2.1</test.flink.main.version> + <test.flink.version>2.1.0</test.flink.version> + <target.java.version>11</target.java.version> </properties> <modules> <module>paimon-flink/paimon-flink2-common</module> <module>paimon-flink/paimon-flink-2.0</module> + <module>paimon-flink/paimon-flink-2.1</module> </modules> <activation> <property>