This is an automated email from the ASF dual-hosted git repository.

yunfengzhou pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/paimon.git


The following commit(s) were added to refs/heads/master by this push:
     new 39cba699d0 [flink] Bump Flink version to 2.1 (#6027)
39cba699d0 is described below

commit 39cba699d0861322c031868fcb590dd1a36f0b32
Author: Xuannan <suxuanna...@gmail.com>
AuthorDate: Wed Aug 20 13:52:25 2025 +0800

    [flink] Bump Flink version to 2.1 (#6027)
---
 .github/workflows/e2e-tests-flink-2.x-jdk11.yml    |   2 +-
 .github/workflows/utitcase-flink-2.x-jdk11.yml     |   2 +-
 paimon-e2e-tests/pom.xml                           |  10 +
 .../apache/flink/types/variant/BinaryVariant.java  |  32 +++
 .../org/apache/flink/types/variant/Variant.java    |  27 ++
 paimon-flink/paimon-flink-2.1/pom.xml              |  93 ++++++
 paimon-flink/paimon-flink-common/pom.xml           |  12 +-
 .../java/org/apache/paimon/flink/FlinkRowData.java |  12 +
 .../paimon/flink/NestedProjectedRowData.java       |   5 +
 .../org/apache/paimon/flink/ProjectedRowData.java  |   5 +
 .../org/apache/paimon/flink/ChangelogModeTest.java |  10 +-
 .../apache/paimon/flink/SchemaChangeITCase.java    |   3 -
 .../apache/paimon/flink/SerializableRowData.java   |   6 +
 .../java/org/apache/flink/table/data/RowData.java  | 314 +++++++++++++++++++++
 .../apache/flink/types/variant/BinaryVariant.java  |  32 +++
 .../org/apache/flink/types/variant/Variant.java    |  27 ++
 pom.xml                                            |   8 +-
 17 files changed, 585 insertions(+), 15 deletions(-)

diff --git a/.github/workflows/e2e-tests-flink-2.x-jdk11.yml 
b/.github/workflows/e2e-tests-flink-2.x-jdk11.yml
index 6333f9f4b9..5f97778467 100644
--- a/.github/workflows/e2e-tests-flink-2.x-jdk11.yml
+++ b/.github/workflows/e2e-tests-flink-2.x-jdk11.yml
@@ -39,7 +39,7 @@ jobs:
       fail-fast: true
       matrix:
         # Last element should be the current default flink version
-        flink_version: [ '2.0' ]
+        flink_version: [ '2.0', '2.1' ]
     steps:
       - name: Checkout code
         uses: actions/checkout@v4
diff --git a/.github/workflows/utitcase-flink-2.x-jdk11.yml 
b/.github/workflows/utitcase-flink-2.x-jdk11.yml
index 40cd479001..8e90e679cb 100644
--- a/.github/workflows/utitcase-flink-2.x-jdk11.yml
+++ b/.github/workflows/utitcase-flink-2.x-jdk11.yml
@@ -56,7 +56,7 @@ jobs:
           jvm_timezone=$(random_timezone)
           echo "JVM timezone is set to $jvm_timezone"
           test_modules=""
-          for suffix in 2.0 common; do
+          for suffix in 2.0 2.1 common; do
           test_modules+="org.apache.paimon:paimon-flink-${suffix},"
           done
           test_modules="${test_modules%,}"
diff --git a/paimon-e2e-tests/pom.xml b/paimon-e2e-tests/pom.xml
index 6e5a305a22..c46668e953 100644
--- a/paimon-e2e-tests/pom.xml
+++ b/paimon-e2e-tests/pom.xml
@@ -294,6 +294,16 @@ under the License.
 
     <profiles>
         <!-- Activate these profiles with -Pflink-x.xx to build and test 
against different Flink versions -->
+
+        <profile>
+            <id>flink-2.0</id>
+            <properties>
+                <test.flink.main.version>2.0</test.flink.main.version>
+                <test.flink.version>2.0.0</test.flink.version>
+                
<test.flink.connector.kafka.version>4.0.0-${test.flink.main.version}</test.flink.connector.kafka.version>
+            </properties>
+        </profile>
+
         <profile>
             <id>flink-1.19</id>
             <properties>
diff --git 
a/paimon-flink/paimon-flink-2.0/src/main/java/org/apache/flink/types/variant/BinaryVariant.java
 
b/paimon-flink/paimon-flink-2.0/src/main/java/org/apache/flink/types/variant/BinaryVariant.java
new file mode 100644
index 0000000000..1311188617
--- /dev/null
+++ 
b/paimon-flink/paimon-flink-2.0/src/main/java/org/apache/flink/types/variant/BinaryVariant.java
@@ -0,0 +1,32 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.types.variant;
+
+/**
+ * A data structure that represents a semi-structured value. It consists of 
two binary values: value
+ * and metadata. The value encodes types and values, but not field names. The 
metadata currently
+ * contains a version flag and a list of field names. We can extend/modify the 
detailed binary
+ * format given the version flag.
+ *
+ * @see <a 
href="https://github.com/apache/parquet-format/blob/master/VariantEncoding.md";>Variant
+ *     Binary Encoding</a> for the detail layout of the data structure.
+ */
+public class BinaryVariant implements Variant {
+    public BinaryVariant(byte[] value, byte[] metadata) {}
+}
diff --git 
a/paimon-flink/paimon-flink-2.0/src/main/java/org/apache/flink/types/variant/Variant.java
 
b/paimon-flink/paimon-flink-2.0/src/main/java/org/apache/flink/types/variant/Variant.java
new file mode 100644
index 0000000000..9f6f970b69
--- /dev/null
+++ 
b/paimon-flink/paimon-flink-2.0/src/main/java/org/apache/flink/types/variant/Variant.java
@@ -0,0 +1,27 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.types.variant;
+
+import org.apache.flink.annotation.PublicEvolving;
+
+import java.io.Serializable;
+
+/** Variant represent a semi-structured data. */
+@PublicEvolving
+public interface Variant extends Serializable {}
diff --git a/paimon-flink/paimon-flink-2.1/pom.xml 
b/paimon-flink/paimon-flink-2.1/pom.xml
new file mode 100644
index 0000000000..062154d9fe
--- /dev/null
+++ b/paimon-flink/paimon-flink-2.1/pom.xml
@@ -0,0 +1,93 @@
+<?xml version="1.0" encoding="UTF-8"?>
+<!--
+Licensed to the Apache Software Foundation (ASF) under one
+or more contributor license agreements.  See the NOTICE file
+distributed with this work for additional information
+regarding copyright ownership.  The ASF licenses this file
+to you under the Apache License, Version 2.0 (the
+"License"); you may not use this file except in compliance
+with the License.  You may obtain a copy of the License at
+
+  http://www.apache.org/licenses/LICENSE-2.0
+
+Unless required by applicable law or agreed to in writing,
+software distributed under the License is distributed on an
+"AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+KIND, either express or implied.  See the License for the
+specific language governing permissions and limitations
+under the License.
+-->
+<project xmlns="http://maven.apache.org/POM/4.0.0";
+         xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance";
+         xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 
http://maven.apache.org/xsd/maven-4.0.0.xsd";>
+    <modelVersion>4.0.0</modelVersion>
+
+    <parent>
+        <groupId>org.apache.paimon</groupId>
+        <artifactId>paimon-flink</artifactId>
+        <version>1.3-SNAPSHOT</version>
+    </parent>
+
+    <packaging>jar</packaging>
+
+    <artifactId>paimon-flink-2.1</artifactId>
+    <name>Paimon : Flink : 2.1</name>
+
+    <properties>
+        <flink.version>2.1.0</flink.version>
+    </properties>
+
+    <dependencies>
+        <dependency>
+            <groupId>org.apache.paimon</groupId>
+            <artifactId>paimon-flink-common</artifactId>
+            <version>${project.version}</version>
+        </dependency>
+
+        <dependency>
+            <groupId>org.apache.paimon</groupId>
+            <artifactId>paimon-flink2-common</artifactId>
+            <version>${project.version}</version>
+        </dependency>
+
+        <dependency>
+            <groupId>org.apache.flink</groupId>
+            <artifactId>flink-streaming-java</artifactId>
+            <version>${flink.version}</version>
+            <scope>provided</scope>
+        </dependency>
+
+        <dependency>
+            <groupId>org.apache.flink</groupId>
+            <artifactId>flink-table-common</artifactId>
+            <version>${flink.version}</version>
+            <scope>provided</scope>
+        </dependency>
+    </dependencies>
+
+    <build>
+        <plugins>
+            <plugin>
+                <groupId>org.apache.maven.plugins</groupId>
+                <artifactId>maven-shade-plugin</artifactId>
+                <executions>
+                    <execution>
+                        <id>shade-paimon</id>
+                        <phase>package</phase>
+                        <goals>
+                            <goal>shade</goal>
+                        </goals>
+                        <configuration>
+                            <artifactSet>
+                                <includes combine.children="append">
+                                    
<include>org.apache.paimon:paimon-flink-common</include>
+                                    
<include>org.apache.paimon:paimon-flink2-common</include>
+                                </includes>
+                            </artifactSet>
+                        </configuration>
+                    </execution>
+                </executions>
+            </plugin>
+        </plugins>
+    </build>
+</project>
diff --git a/paimon-flink/paimon-flink-common/pom.xml 
b/paimon-flink/paimon-flink-common/pom.xml
index 28ce295453..715718b9cb 100644
--- a/paimon-flink/paimon-flink-common/pom.xml
+++ b/paimon-flink/paimon-flink-common/pom.xml
@@ -38,6 +38,12 @@ under the License.
     </properties>
 
     <dependencies>
+        <dependency>
+            <groupId>org.apache.paimon</groupId>
+            <artifactId>${paimon-flinkx-common}</artifactId>
+            <version>${project.version}</version>
+        </dependency>
+
         <dependency>
             <groupId>org.apache.flink</groupId>
             <artifactId>flink-core</artifactId>
@@ -81,12 +87,6 @@ under the License.
           <scope>test</scope>
         </dependency>
 
-        <dependency>
-            <groupId>org.apache.paimon</groupId>
-            <artifactId>${paimon-flinkx-common}</artifactId>
-            <version>${project.version}</version>
-        </dependency>
-
         <dependency>
             <groupId>org.apache.paimon</groupId>
             <artifactId>${paimon-flinkx-common}</artifactId>
diff --git 
a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/FlinkRowData.java
 
b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/FlinkRowData.java
index 6e71b32922..a4d81364b4 100644
--- 
a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/FlinkRowData.java
+++ 
b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/FlinkRowData.java
@@ -33,6 +33,8 @@ import org.apache.flink.table.data.RowData;
 import org.apache.flink.table.data.StringData;
 import org.apache.flink.table.data.TimestampData;
 import org.apache.flink.types.RowKind;
+import org.apache.flink.types.variant.BinaryVariant;
+import org.apache.flink.types.variant.Variant;
 
 import static org.apache.paimon.flink.FlinkRowWrapper.fromFlinkRowKind;
 
@@ -145,6 +147,11 @@ public class FlinkRowData implements RowData {
         return new FlinkRowData(row.getRow(pos, numFields));
     }
 
+    public Variant getVariant(int pos) {
+        org.apache.paimon.data.variant.Variant variant = row.getVariant(pos);
+        return new BinaryVariant(variant.value(), variant.metadata());
+    }
+
     private static class FlinkArrayData implements ArrayData {
 
         private final InternalArray array;
@@ -218,6 +225,11 @@ public class FlinkRowData implements RowData {
             throw new UnsupportedOperationException();
         }
 
+        public Variant getVariant(int pos) {
+            org.apache.paimon.data.variant.Variant variant = 
array.getVariant(pos);
+            return new BinaryVariant(variant.value(), variant.metadata());
+        }
+
         @Override
         public byte[] getBinary(int pos) {
             return array.getBinary(pos);
diff --git 
a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/NestedProjectedRowData.java
 
b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/NestedProjectedRowData.java
index 810cc1ae42..b9fa8b9d6b 100644
--- 
a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/NestedProjectedRowData.java
+++ 
b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/NestedProjectedRowData.java
@@ -28,6 +28,7 @@ import org.apache.flink.table.data.StringData;
 import org.apache.flink.table.data.TimestampData;
 import org.apache.flink.table.types.logical.RowType;
 import org.apache.flink.types.RowKind;
+import org.apache.flink.types.variant.Variant;
 
 import javax.annotation.Nullable;
 
@@ -195,6 +196,10 @@ public class NestedProjectedRowData implements RowData, 
Serializable {
         return getFieldAs(pos, (rowData, internalPos) -> 
rowData.getRow(internalPos, numFields));
     }
 
+    public Variant getVariant(int pos) {
+        return getFieldAs(pos, RowData::getVariant);
+    }
+
     private @Nullable RowData extractInternalRow(int pos) {
         int[] projectedField = projectedFields[pos];
         RowData rowData = this.row;
diff --git 
a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/ProjectedRowData.java
 
b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/ProjectedRowData.java
index 6bdbe58801..03d8420cef 100644
--- 
a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/ProjectedRowData.java
+++ 
b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/ProjectedRowData.java
@@ -27,6 +27,7 @@ import org.apache.flink.table.data.StringData;
 import org.apache.flink.table.data.TimestampData;
 import org.apache.flink.table.types.DataType;
 import org.apache.flink.types.RowKind;
+import org.apache.flink.types.variant.Variant;
 
 import java.util.Arrays;
 
@@ -156,6 +157,10 @@ public class ProjectedRowData implements RowData {
         return row.getRow(indexMapping[pos], numFields);
     }
 
+    public Variant getVariant(int pos) {
+        return row.getVariant(indexMapping[pos]);
+    }
+
     @Override
     public boolean equals(Object o) {
         throw new UnsupportedOperationException("Projected row data cannot be 
compared");
diff --git 
a/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/ChangelogModeTest.java
 
b/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/ChangelogModeTest.java
index 9022bbb505..7a24efa0fa 100644
--- 
a/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/ChangelogModeTest.java
+++ 
b/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/ChangelogModeTest.java
@@ -33,6 +33,7 @@ import org.apache.paimon.types.RowType;
 
 import org.apache.flink.table.catalog.ObjectIdentifier;
 import org.apache.flink.table.connector.ChangelogMode;
+import org.apache.flink.types.RowKind;
 import org.junit.jupiter.api.BeforeEach;
 import org.junit.jupiter.api.Test;
 import org.junit.jupiter.api.io.TempDir;
@@ -76,7 +77,14 @@ public class ChangelogModeTest {
 
     @Test
     public void testDefault() throws Exception {
-        test(new Options(), ChangelogMode.upsert(), ChangelogMode.upsert());
+        test(
+                new Options(),
+                ChangelogMode.upsert(),
+                ChangelogMode.newBuilder()
+                        .addContainedKind(RowKind.INSERT)
+                        .addContainedKind(RowKind.UPDATE_AFTER)
+                        .addContainedKind(RowKind.DELETE)
+                        .build());
     }
 
     @Test
diff --git 
a/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/SchemaChangeITCase.java
 
b/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/SchemaChangeITCase.java
index da260851a8..8edc85a3c2 100644
--- 
a/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/SchemaChangeITCase.java
+++ 
b/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/SchemaChangeITCase.java
@@ -21,7 +21,6 @@ package org.apache.paimon.flink;
 import org.apache.paimon.data.BinaryString;
 import org.apache.paimon.utils.DateTimeUtils;
 
-import org.apache.flink.table.api.TableException;
 import org.apache.flink.table.api.config.ExecutionConfigOptions;
 import org.apache.flink.types.Row;
 import org.junit.jupiter.api.Test;
@@ -685,7 +684,6 @@ public class SchemaChangeITCase extends CatalogITCaseBase {
                                         "INSERT INTO T VALUES('aaa', 'bbb', 
'ccc', 1, CAST(NULL AS FLOAT))"))
                 .satisfies(
                         anyCauseMatches(
-                                TableException.class,
                                 "Column 'e' is NOT NULL, however, a null value 
is being written into it."));
 
         // Not null -> nullable
@@ -718,7 +716,6 @@ public class SchemaChangeITCase extends CatalogITCaseBase {
                                         "INSERT INTO T VALUES('aaa', 'bbb', 
CAST(NULL AS STRING), 1, CAST(NULL AS FLOAT))"))
                 .satisfies(
                         anyCauseMatches(
-                                TableException.class,
                                 "Column 'c' is NOT NULL, however, a null value 
is being written into it."));
 
         // Insert a null value
diff --git 
a/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/SerializableRowData.java
 
b/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/SerializableRowData.java
index 75b96cbe02..3b0db92a02 100644
--- 
a/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/SerializableRowData.java
+++ 
b/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/SerializableRowData.java
@@ -29,6 +29,7 @@ import org.apache.flink.table.data.RowData;
 import org.apache.flink.table.data.StringData;
 import org.apache.flink.table.data.TimestampData;
 import org.apache.flink.types.RowKind;
+import org.apache.flink.types.variant.Variant;
 
 import java.io.IOException;
 import java.io.ObjectInputStream;
@@ -153,4 +154,9 @@ public class SerializableRowData implements RowData, 
Serializable {
     public RowData getRow(int i, int rowArity) {
         return row.getRow(i, rowArity);
     }
+
+    @Override
+    public Variant getVariant(int i) {
+        return row.getVariant(i);
+    }
 }
diff --git 
a/paimon-flink/paimon-flink1-common/src/main/java/org/apache/flink/table/data/RowData.java
 
b/paimon-flink/paimon-flink1-common/src/main/java/org/apache/flink/table/data/RowData.java
new file mode 100644
index 0000000000..f0dd820066
--- /dev/null
+++ 
b/paimon-flink/paimon-flink1-common/src/main/java/org/apache/flink/table/data/RowData.java
@@ -0,0 +1,314 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.table.data;
+
+import org.apache.flink.annotation.PublicEvolving;
+import org.apache.flink.core.memory.MemorySegment;
+import org.apache.flink.table.types.logical.DistinctType;
+import org.apache.flink.table.types.logical.LogicalType;
+import org.apache.flink.table.types.logical.RowType;
+import org.apache.flink.table.types.logical.StructuredType;
+import org.apache.flink.types.RowKind;
+import org.apache.flink.types.variant.Variant;
+
+import javax.annotation.Nullable;
+
+import java.io.Serializable;
+
+import static 
org.apache.flink.table.types.logical.utils.LogicalTypeChecks.getFieldCount;
+import static 
org.apache.flink.table.types.logical.utils.LogicalTypeChecks.getPrecision;
+import static 
org.apache.flink.table.types.logical.utils.LogicalTypeChecks.getScale;
+
+/**
+ * Base interface for an internal data structure representing data of {@link 
RowType} and other
+ * (possibly nested) structured types such as {@link StructuredType} in the 
table ecosystem.
+ *
+ * <p>All top-level records that are travelling through Table API or SQL 
pipelines during runtime
+ * are instances of this interface. Each {@link RowData} contains a {@link 
RowKind} which represents
+ * the kind of change that a row describes in a changelog. The {@link RowKind} 
is just metadata
+ * information of row and thus not part of the table's schema, i.e., not a 
dedicated field.
+ *
+ * <p>Note: All fields of this data structure must be internal data structures.
+ *
+ * <p>The {@link RowData} interface has different implementations which are 
designed for different
+ * scenarios:
+ *
+ * <ul>
+ *   <li>The binary-oriented implementation {@code BinaryRowData} is backed by 
references to {@link
+ *       MemorySegment} instead of using Java objects to reduce the 
serialization/deserialization
+ *       overhead.
+ *   <li>The object-oriented implementation {@link GenericRowData} is backed 
by an array of Java
+ *       {@link Object} which is easy to construct and efficient to update.
+ * </ul>
+ *
+ * <p>{@link GenericRowData} is intended for public use and has stable 
behavior. It is recommended
+ * to construct instances of {@link RowData} with this class if internal data 
structures are
+ * required.
+ *
+ * <p>The mappings from Flink's Table API and SQL data types to the internal 
data structures are
+ * listed in the following table:
+ *
+ * <pre>
+ * +--------------------------------+-----------------------------------------+
+ * | SQL Data Types                 | Internal Data Structures                |
+ * +--------------------------------+-----------------------------------------+
+ * | BOOLEAN                        | boolean                                 |
+ * +--------------------------------+-----------------------------------------+
+ * | CHAR / VARCHAR / STRING        | {@link StringData}                      |
+ * +--------------------------------+-----------------------------------------+
+ * | BINARY / VARBINARY / BYTES     | byte[]                                  |
+ * +--------------------------------+-----------------------------------------+
+ * | DECIMAL                        | {@link DecimalData}                     |
+ * +--------------------------------+-----------------------------------------+
+ * | TINYINT                        | byte                                    |
+ * +--------------------------------+-----------------------------------------+
+ * | SMALLINT                       | short                                   |
+ * +--------------------------------+-----------------------------------------+
+ * | INT                            | int                                     |
+ * +--------------------------------+-----------------------------------------+
+ * | BIGINT                         | long                                    |
+ * +--------------------------------+-----------------------------------------+
+ * | FLOAT                          | float                                   |
+ * +--------------------------------+-----------------------------------------+
+ * | DOUBLE                         | double                                  |
+ * +--------------------------------+-----------------------------------------+
+ * | DATE                           | int (number of days since epoch)        |
+ * +--------------------------------+-----------------------------------------+
+ * | TIME                           | int (number of milliseconds of the day) |
+ * +--------------------------------+-----------------------------------------+
+ * | TIMESTAMP                      | {@link TimestampData}                   |
+ * +--------------------------------+-----------------------------------------+
+ * | TIMESTAMP WITH LOCAL TIME ZONE | {@link TimestampData}                   |
+ * +--------------------------------+-----------------------------------------+
+ * | INTERVAL YEAR TO MONTH         | int (number of months)                  |
+ * +--------------------------------+-----------------------------------------+
+ * | INTERVAL DAY TO MONTH          | long (number of milliseconds)           |
+ * +--------------------------------+-----------------------------------------+
+ * | ROW / structured types         | {@link RowData}                         |
+ * +--------------------------------+-----------------------------------------+
+ * | ARRAY                          | {@link ArrayData}                       |
+ * +--------------------------------+-----------------------------------------+
+ * | MAP / MULTISET                 | {@link MapData}                         |
+ * +--------------------------------+-----------------------------------------+
+ * | RAW                            | {@link RawValueData}                    |
+ * +--------------------------------+-----------------------------------------+
+ * </pre>
+ *
+ * <p>Nullability is always handled by the container data structure.
+ */
+@PublicEvolving
+public interface RowData {
+
+    /**
+     * Returns the number of fields in this row.
+     *
+     * <p>The number does not include {@link RowKind}. It is kept separately.
+     */
+    int getArity();
+
+    /**
+     * Returns the kind of change that this row describes in a changelog.
+     *
+     * @see RowKind
+     */
+    RowKind getRowKind();
+
+    /**
+     * Sets the kind of change that this row describes in a changelog.
+     *
+     * @see RowKind
+     */
+    void setRowKind(RowKind kind);
+
+    // 
------------------------------------------------------------------------------------------
+    // Read-only accessor methods
+    // 
------------------------------------------------------------------------------------------
+
+    /** Returns true if the field is null at the given position. */
+    boolean isNullAt(int pos);
+
+    /** Returns the boolean value at the given position. */
+    boolean getBoolean(int pos);
+
+    /** Returns the byte value at the given position. */
+    byte getByte(int pos);
+
+    /** Returns the short value at the given position. */
+    short getShort(int pos);
+
+    /** Returns the integer value at the given position. */
+    int getInt(int pos);
+
+    /** Returns the long value at the given position. */
+    long getLong(int pos);
+
+    /** Returns the float value at the given position. */
+    float getFloat(int pos);
+
+    /** Returns the double value at the given position. */
+    double getDouble(int pos);
+
+    /** Returns the string value at the given position. */
+    StringData getString(int pos);
+
+    /**
+     * Returns the decimal value at the given position.
+     *
+     * <p>The precision and scale are required to determine whether the 
decimal value was stored in
+     * a compact representation (see {@link DecimalData}).
+     */
+    DecimalData getDecimal(int pos, int precision, int scale);
+
+    /**
+     * Returns the timestamp value at the given position.
+     *
+     * <p>The precision is required to determine whether the timestamp value 
was stored in a compact
+     * representation (see {@link TimestampData}).
+     */
+    TimestampData getTimestamp(int pos, int precision);
+
+    /** Returns the raw value at the given position. */
+    <T> RawValueData<T> getRawValue(int pos);
+
+    /** Returns the binary value at the given position. */
+    byte[] getBinary(int pos);
+
+    /** Returns the array value at the given position. */
+    ArrayData getArray(int pos);
+
+    /** Returns the map value at the given position. */
+    MapData getMap(int pos);
+
+    /**
+     * Returns the row value at the given position.
+     *
+     * <p>The number of fields is required to correctly extract the row.
+     */
+    RowData getRow(int pos, int numFields);
+
+    /** Returns the variant value at the given position. */
+    Variant getVariant(int pos);
+
+    // 
------------------------------------------------------------------------------------------
+    // Access Utilities
+    // 
------------------------------------------------------------------------------------------
+
+    /**
+     * Creates an accessor for getting elements in an internal row data 
structure at the given
+     * position.
+     *
+     * @param fieldType the element type of the row
+     * @param fieldPos the element position of the row
+     */
+    static FieldGetter createFieldGetter(LogicalType fieldType, int fieldPos) {
+        final FieldGetter fieldGetter;
+        // ordered by type root definition
+        switch (fieldType.getTypeRoot()) {
+            case CHAR:
+            case VARCHAR:
+                fieldGetter = row -> row.getString(fieldPos);
+                break;
+            case BOOLEAN:
+                fieldGetter = row -> row.getBoolean(fieldPos);
+                break;
+            case BINARY:
+            case VARBINARY:
+                fieldGetter = row -> row.getBinary(fieldPos);
+                break;
+            case DECIMAL:
+                final int decimalPrecision = getPrecision(fieldType);
+                final int decimalScale = getScale(fieldType);
+                fieldGetter = row -> row.getDecimal(fieldPos, 
decimalPrecision, decimalScale);
+                break;
+            case TINYINT:
+                fieldGetter = row -> row.getByte(fieldPos);
+                break;
+            case SMALLINT:
+                fieldGetter = row -> row.getShort(fieldPos);
+                break;
+            case INTEGER:
+            case DATE:
+            case TIME_WITHOUT_TIME_ZONE:
+            case INTERVAL_YEAR_MONTH:
+                fieldGetter = row -> row.getInt(fieldPos);
+                break;
+            case BIGINT:
+            case INTERVAL_DAY_TIME:
+                fieldGetter = row -> row.getLong(fieldPos);
+                break;
+            case FLOAT:
+                fieldGetter = row -> row.getFloat(fieldPos);
+                break;
+            case DOUBLE:
+                fieldGetter = row -> row.getDouble(fieldPos);
+                break;
+            case TIMESTAMP_WITHOUT_TIME_ZONE:
+            case TIMESTAMP_WITH_LOCAL_TIME_ZONE:
+                final int timestampPrecision = getPrecision(fieldType);
+                fieldGetter = row -> row.getTimestamp(fieldPos, 
timestampPrecision);
+                break;
+            case TIMESTAMP_WITH_TIME_ZONE:
+                throw new UnsupportedOperationException();
+            case ARRAY:
+                fieldGetter = row -> row.getArray(fieldPos);
+                break;
+            case MULTISET:
+            case MAP:
+                fieldGetter = row -> row.getMap(fieldPos);
+                break;
+            case ROW:
+            case STRUCTURED_TYPE:
+                final int rowFieldCount = getFieldCount(fieldType);
+                fieldGetter = row -> row.getRow(fieldPos, rowFieldCount);
+                break;
+            case DISTINCT_TYPE:
+                fieldGetter =
+                        createFieldGetter(((DistinctType) 
fieldType).getSourceType(), fieldPos);
+                break;
+            case RAW:
+                fieldGetter = row -> row.getRawValue(fieldPos);
+                break;
+            case NULL:
+            case SYMBOL:
+            case UNRESOLVED:
+            default:
+                throw new IllegalArgumentException();
+        }
+        if (!fieldType.isNullable()) {
+            return fieldGetter;
+        }
+        return row -> {
+            if (row.isNullAt(fieldPos)) {
+                return null;
+            }
+            return fieldGetter.getFieldOrNull(row);
+        };
+    }
+
+    /**
+     * Accessor for getting the field of a row during runtime.
+     *
+     * @see #createFieldGetter(LogicalType, int)
+     */
+    @PublicEvolving
+    interface FieldGetter extends Serializable {
+        @Nullable
+        Object getFieldOrNull(RowData row);
+    }
+}
diff --git 
a/paimon-flink/paimon-flink1-common/src/main/java/org/apache/flink/types/variant/BinaryVariant.java
 
b/paimon-flink/paimon-flink1-common/src/main/java/org/apache/flink/types/variant/BinaryVariant.java
new file mode 100644
index 0000000000..1311188617
--- /dev/null
+++ 
b/paimon-flink/paimon-flink1-common/src/main/java/org/apache/flink/types/variant/BinaryVariant.java
@@ -0,0 +1,32 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.types.variant;
+
+/**
+ * A data structure that represents a semi-structured value. It consists of 
two binary values: value
+ * and metadata. The value encodes types and values, but not field names. The 
metadata currently
+ * contains a version flag and a list of field names. We can extend/modify the 
detailed binary
+ * format given the version flag.
+ *
+ * @see <a 
href="https://github.com/apache/parquet-format/blob/master/VariantEncoding.md";>Variant
+ *     Binary Encoding</a> for the detail layout of the data structure.
+ */
+public class BinaryVariant implements Variant {
+    public BinaryVariant(byte[] value, byte[] metadata) {}
+}
diff --git 
a/paimon-flink/paimon-flink1-common/src/main/java/org/apache/flink/types/variant/Variant.java
 
b/paimon-flink/paimon-flink1-common/src/main/java/org/apache/flink/types/variant/Variant.java
new file mode 100644
index 0000000000..9f6f970b69
--- /dev/null
+++ 
b/paimon-flink/paimon-flink1-common/src/main/java/org/apache/flink/types/variant/Variant.java
@@ -0,0 +1,27 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.types.variant;
+
+import org.apache.flink.annotation.PublicEvolving;
+
+import java.io.Serializable;
+
+/** Variant represent a semi-structured data. */
+@PublicEvolving
+public interface Variant extends Serializable {}
diff --git a/pom.xml b/pom.xml
index bace1e5165..65453e7d3e 100644
--- a/pom.xml
+++ b/pom.xml
@@ -466,13 +466,15 @@ under the License.
             <id>flink2</id>
             <properties>
                 
<paimon-flinkx-common>paimon-flink2-common</paimon-flinkx-common>
-                
<paimon-flink-common.flink.version>2.0.0</paimon-flink-common.flink.version>
-                <test.flink.main.version>2.0</test.flink.main.version>
-                <test.flink.version>2.0.0</test.flink.version>
+                
<paimon-flink-common.flink.version>2.1.0</paimon-flink-common.flink.version>
+                <test.flink.main.version>2.1</test.flink.main.version>
+                <test.flink.version>2.1.0</test.flink.version>
+                <target.java.version>11</target.java.version>
             </properties>
             <modules>
                 <module>paimon-flink/paimon-flink2-common</module>
                 <module>paimon-flink/paimon-flink-2.0</module>
+                <module>paimon-flink/paimon-flink-2.1</module>
             </modules>
             <activation>
                 <property>


Reply via email to