This is an automated email from the ASF dual-hosted git repository.

voonhous pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/hudi.git


The following commit(s) were added to refs/heads/master by this push:
     new b6bc1658e8f2 chore: migrate the flink ITs run to flink2.1 (#18717)
b6bc1658e8f2 is described below

commit b6bc1658e8f2862583c22425eaa0665ae100f5a3
Author: Danny Chan <[email protected]>
AuthorDate: Tue May 12 17:09:39 2026 +0800

    chore: migrate the flink ITs run to flink2.1 (#18717)
    
    * chore: migrate the flink ITs to run with flink2.1
    
    * fix compile errors
    
    * fix test failures
    
    * fix test failure
    
    * address review comments
---
 .github/workflows/bot.yml                          | 30 ++++----
 azure-pipelines-20230430.yml                       |  4 +-
 .../hudi/util/TestHoodieSchemaConverter.java       | 23 +++---
 .../hudi/sink/bootstrap/RLIBootstrapOperator.java  | 27 ++++---
 .../sink/bootstrap/TestRLIBootstrapOperator.java   | 80 +++++++++++++++++++++
 .../ITTestVariantCrossEngineCompatibility.java     | 31 ++++----
 .../hudi/adapter/DataTypeAdapterTestUtils.java     | 28 ++++++++
 .../hudi/adapter/DataTypeAdapterTestUtils.java     | 28 ++++++++
 .../hudi/adapter/DataTypeAdapterTestUtils.java     | 28 ++++++++
 .../hudi/adapter/DataTypeAdapterTestUtils.java     | 28 ++++++++
 .../runtime/kryo/KryoSerializerSnapshot.java       |  3 +
 .../hudi/adapter/DataTypeAdapterTestUtils.java     | 28 ++++++++
 .../table/format/cow/ParquetSplitReaderUtil.java   | 83 ++++++++++++++++++++++
 .../runtime/kryo/KryoSerializerSnapshot.java       |  3 +
 .../hudi/adapter/DataTypeAdapterTestUtils.java     | 32 +++++++++
 pom.xml                                            | 14 ++--
 16 files changed, 404 insertions(+), 66 deletions(-)

diff --git a/.github/workflows/bot.yml b/.github/workflows/bot.yml
index 5fb047f174d4..2143edd92bcb 100644
--- a/.github/workflows/bot.yml
+++ b/.github/workflows/bot.yml
@@ -67,7 +67,7 @@ jobs:
         include:
           - scalaProfile: "scala-2.12"
             sparkProfile: "spark3.5"
-            flinkProfile: "flink1.18"
+            flinkProfile: "flink2.1"
 
     steps:
       - uses: actions/checkout@v5
@@ -126,7 +126,7 @@ jobs:
         include:
           - scalaProfile: "scala-2.12"
             sparkProfile: "spark3.5"
-            flinkProfile: "flink1.18"
+            flinkProfile: "flink2.1"
 
     steps:
       - uses: actions/checkout@v5
@@ -178,7 +178,7 @@ jobs:
         include:
           - scalaProfile: "scala-2.12"
             sparkProfile: "spark3.5"
-            flinkProfile: "flink1.18"
+            flinkProfile: "flink2.1"
 
     env:
       UT_MODULES: >-
@@ -563,7 +563,7 @@ jobs:
         include:
           - scalaProfile: "scala-2.12"
             sparkProfile: "spark3.5"
-            flinkProfile: "flink1.20"
+            flinkProfile: "flink2.1"
 
     steps:
       - uses: actions/checkout@v5
@@ -938,7 +938,7 @@ jobs:
           FLINK_PROFILE: ${{ matrix.flinkProfile }}
           FLINK_AVRO_VERSION: ${{ matrix.flinkAvroVersion }}
           FLINK_PARQUET_VERSION: ${{ matrix.flinkParquetVersion }}
-        if: ${{ endsWith(env.FLINK_PROFILE, '1.20') }}
+        if: ${{ endsWith(env.FLINK_PROFILE, '2.1') }}
         run: |
           mvn clean install -T 2 -Pintegration-tests -D"$SCALA_PROFILE" 
-D"$FLINK_PROFILE" -pl hudi-flink-datasource/hudi-flink -am 
-Davro.version="$FLINK_AVRO_VERSION" -Dparquet.version="$FLINK_PARQUET_VERSION" 
-DskipTests=true $MVN_ARGS
           mvn verify -Pintegration-tests -D"$SCALA_PROFILE" -D"$FLINK_PROFILE" 
-Davro.version="$FLINK_AVRO_VERSION" -Dparquet.version="$FLINK_PARQUET_VERSION" 
$FLINK_IT_FILTER1 -pl hudi-flink-datasource/hudi-flink $MVN_ARGS
@@ -948,9 +948,9 @@ jobs:
     strategy:
       matrix:
         include:
-          - flinkProfile: "flink1.20"
+          - flinkProfile: "flink2.1"
             flinkAvroVersion: "1.11.4"
-            flinkParquetVersion: '1.13.1'
+            flinkParquetVersion: '1.15.2'
     steps:
       - uses: actions/checkout@v5
       - name: Set up JDK 11
@@ -974,7 +974,7 @@ jobs:
           FLINK_PROFILE: ${{ matrix.flinkProfile }}
           FLINK_AVRO_VERSION: ${{ matrix.flinkAvroVersion }}
           FLINK_PARQUET_VERSION: ${{ matrix.flinkParquetVersion }}
-        if: ${{ endsWith(env.FLINK_PROFILE, '1.20') }}
+        if: ${{ endsWith(env.FLINK_PROFILE, '2.1') }}
         run: |
           mvn clean install -T 2 -Pintegration-tests -D"$SCALA_PROFILE" 
-D"$FLINK_PROFILE" -pl hudi-flink-datasource/hudi-flink -am 
-Davro.version="$FLINK_AVRO_VERSION" -Dparquet.version="$FLINK_PARQUET_VERSION" 
-DskipTests=true $MVN_ARGS
           mvn verify -Pintegration-tests -D"$SCALA_PROFILE" -D"$FLINK_PROFILE" 
-Davro.version="$FLINK_AVRO_VERSION" -Dparquet.version="$FLINK_PARQUET_VERSION" 
$FLINK_IT_FILTER2 -pl hudi-flink-datasource/hudi-flink $MVN_ARGS
@@ -985,19 +985,19 @@ jobs:
       matrix:
         include:
           - scalaProfile: 'scala-2.13'
-            flinkProfile: 'flink1.20'
+            flinkProfile: 'flink2.1'
             sparkProfile: 'spark3.5'
             sparkRuntime: 'spark3.5.0'
           - scalaProfile: 'scala-2.12'
-            flinkProfile: 'flink1.20'
+            flinkProfile: 'flink2.1'
             sparkProfile: 'spark3.5'
             sparkRuntime: 'spark3.5.0'
           - scalaProfile: 'scala-2.13'
-            flinkProfile: 'flink1.20'
+            flinkProfile: 'flink2.1'
             sparkProfile: 'spark4.0'
             sparkRuntime: 'spark4.0.0'
           - scalaProfile: 'scala-2.13'
-            flinkProfile: 'flink1.20'
+            flinkProfile: 'flink2.1'
             sparkProfile: 'spark4.1'
             sparkRuntime: 'spark4.1.1'
 
@@ -1201,7 +1201,7 @@ jobs:
       matrix:
         include:
           - sparkProfile: 'spark3.5'
-            flinkProfile: 'flink1.20'
+            flinkProfile: 'flink2.1'
             sparkArchive: 'spark-3.5.3/spark-3.5.3-bin-hadoop3.tgz'
     steps:
       - uses: actions/checkout@v5
@@ -1297,9 +1297,9 @@ jobs:
       matrix:
         include:
           - scalaProfile: "scala-2.12"
-            flinkProfile: "flink1.20"
+            flinkProfile: "flink2.1"
             flinkAvroVersion: '1.11.4'
-            flinkParquetVersion: '1.13.1'
+            flinkParquetVersion: '1.15.2'
     steps:
       - uses: actions/checkout@v5
       - name: Set up JDK 17
diff --git a/azure-pipelines-20230430.yml b/azure-pipelines-20230430.yml
index 4b1253a30963..cccd89ec6cc7 100644
--- a/azure-pipelines-20230430.yml
+++ b/azure-pipelines-20230430.yml
@@ -14,7 +14,7 @@
 # limitations under the License.
 
 # NOTE:
-# This config file defines how Azure CI runs tests with Spark 3.5 and Flink 1.18 profiles.
+# This config file defines how Azure CI runs tests with Spark 3.5 and Flink 2.1 profiles.
 # PRs will need to keep in sync with master's version to trigger the CI runs.
 # See scripts/jacoco/README.md for how aggregated code coverage report works
 # across multiple modules.
@@ -100,7 +100,7 @@ parameters:
       - '!packaging/hudi-utilities-slim-bundle'
 
 variables:
-  BUILD_PROFILES: '-Dscala-2.12 -Dspark3.5 -Dflink1.18'
+  BUILD_PROFILES: '-Dscala-2.12 -Dspark3.5 -Dflink2.1'
   PLUGIN_OPTS: '-Dcheckstyle.skip=true -Drat.skip=true -ntp -B -V -Pwarn-log 
-Dorg.slf4j.simpleLogger.log.org.apache.maven.plugins.shade=warn 
-Dorg.slf4j.simpleLogger.log.org.apache.maven.plugins.dependency=warn'
   MVN_OPTS_INSTALL: '-T 3 -Phudi-platform-service -DskipTests 
$(BUILD_PROFILES) $(PLUGIN_OPTS) 
-Dmaven.wagon.httpconnectionManager.ttlSeconds=25 
-Dmaven.wagon.http.retryHandler.count=5 -Dorg.eclipse.jetty.LEVEL=WARN'
   MVN_OPTS_TEST: '-fae -Pwarn-log $(BUILD_PROFILES) $(PLUGIN_OPTS)'
diff --git a/hudi-client/hudi-flink-client/src/test/java/org/apache/hudi/util/TestHoodieSchemaConverter.java b/hudi-client/hudi-flink-client/src/test/java/org/apache/hudi/util/TestHoodieSchemaConverter.java
index 51b8a15f4c7b..9a4f920b859e 100644
--- a/hudi-client/hudi-flink-client/src/test/java/org/apache/hudi/util/TestHoodieSchemaConverter.java
+++ b/hudi-client/hudi-flink-client/src/test/java/org/apache/hudi/util/TestHoodieSchemaConverter.java
@@ -36,11 +36,12 @@ import org.apache.flink.table.types.logical.RowType;
 import org.apache.flink.table.types.logical.TimestampType;
 import org.apache.flink.table.types.logical.VarCharType;
 import org.apache.flink.table.types.logical.VarBinaryType;
-import org.junit.jupiter.api.Disabled;
 import org.junit.jupiter.api.Test;
 
 import java.util.Arrays;
 
+import static org.hamcrest.CoreMatchers.is;
+import static org.hamcrest.MatcherAssert.assertThat;
 import static org.junit.jupiter.api.Assertions.assertEquals;
 import static org.junit.jupiter.api.Assertions.assertFalse;
 import static org.junit.jupiter.api.Assertions.assertInstanceOf;
@@ -693,24 +694,18 @@ public class TestHoodieSchemaConverter {
   }
 
   @Test
-  @Disabled("disabled and reopen the tests for 1.3")
   public void testVariantTypeConversion() {
     // Test direct Variant conversion
     HoodieSchema variantSchema = HoodieSchema.createVariant();
     DataType dataType = HoodieSchemaConverter.convertToDataType(variantSchema);
     assertNotNull(dataType);
 
-    // Verify it's a ROW with metadata and value binary fields
-    RowType rowType = (RowType) dataType.getLogicalType();
-    assertEquals(2, rowType.getFieldCount());
-    assertEquals("metadata", rowType.getFieldNames().get(0));
-    assertEquals("value", rowType.getFieldNames().get(1));
-    assertInstanceOf(VarBinaryType.class, rowType.getTypeAt(0));
-    assertInstanceOf(VarBinaryType.class, rowType.getTypeAt(1));
+    // Verify it's a Variant
+    assertThat("the return type should be variant",
+        dataType.getLogicalType().asSummaryString(), is("VARIANT NOT NULL"));
   }
 
   @Test
-  @Disabled("disabled and reopen the tests for 1.3")
   public void testVariantInRecordConversion() {
     // Test Variant field within a record
     HoodieSchema recordWithVariant = HoodieSchema.createRecord(
@@ -727,11 +722,9 @@ public class TestHoodieSchemaConverter {
     assertEquals(2, result.getFieldCount());
     assertEquals("data", result.getFieldNames().get(1));
 
-    // Verify variant field is a ROW<metadata BYTES, value BYTES>
-    RowType variantRowType = (RowType) result.getTypeAt(1);
-    assertEquals(2, variantRowType.getFieldCount());
-    assertEquals("metadata", variantRowType.getFieldNames().get(0));
-    assertEquals("value", variantRowType.getFieldNames().get(1));
+    // Verify variant field
+    assertThat("the return type should be variant",
+        result.getTypeAt(1).asSummaryString(), is("VARIANT NOT NULL"));
   }
 
   @Test
diff --git a/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/sink/bootstrap/RLIBootstrapOperator.java b/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/sink/bootstrap/RLIBootstrapOperator.java
index 515b180d3cad..d0cb16a789e5 100644
--- a/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/sink/bootstrap/RLIBootstrapOperator.java
+++ b/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/sink/bootstrap/RLIBootstrapOperator.java
@@ -24,6 +24,7 @@ import org.apache.hudi.common.data.HoodiePairData;
 import org.apache.hudi.common.function.SerializableFunctionUnchecked;
 import org.apache.hudi.common.model.FileSlice;
 import org.apache.hudi.common.model.HoodieRecordGlobalLocation;
+import org.apache.hudi.common.table.HoodieTableConfig;
 import org.apache.hudi.common.table.HoodieTableMetaClient;
 import org.apache.hudi.configuration.FlinkOptions;
 import org.apache.hudi.metadata.HoodieBackedTableMetadata;
@@ -52,7 +53,7 @@ import java.util.stream.Collectors;
 public class RLIBootstrapOperator
     extends AbstractBootstrapOperator {
 
-  private transient HoodieBackedTableMetadata metadataTable;
+  private transient HoodieBackedTableMetadata tableMetadata;
   private transient long loadedCnt;
 
   public RLIBootstrapOperator(Configuration conf) {
@@ -63,13 +64,13 @@ public class RLIBootstrapOperator
   public void initializeState(StateInitializationContext context) throws 
Exception {
     loadedCnt = 0;
     HoodieTableMetaClient metaClient = StreamerUtil.createMetaClient(conf);
-    this.metadataTable = (HoodieBackedTableMetadata) 
metaClient.getTableFormat().getMetadataFactory().create(
+    this.tableMetadata = new HoodieBackedTableMetadata(
         HoodieFlinkEngineContext.DEFAULT,
         metaClient.getStorage(),
         StreamerUtil.metadataConfig(conf),
         conf.get(FlinkOptions.PATH));
     // Load RLI records
-    preLoadRLIRecords();
+    preLoadRLIRecords(metaClient.getTableConfig());
   }
 
   @Override
@@ -82,10 +83,20 @@ public class RLIBootstrapOperator
   //  Utilities
   // -------------------------------------------------------------------------
 
-  private void preLoadRLIRecords() {
+  private void preLoadRLIRecords(HoodieTableConfig tableConfig) {
     int taskID = 
RuntimeContextUtils.getIndexOfThisSubtask(getRuntimeContext());
     int parallelism = 
RuntimeContextUtils.getNumberOfParallelSubtasks(getRuntimeContext());
 
+    if (!tableMetadata.enabled()) {
+      if (tableConfig.isMetadataTableAvailable()) {
+        throw new RuntimeException("Can not initialize the table metadata");
+      }
+      log.info("Skip loading RLI records because table metadata is not 
initialized, taskId = {}", taskID);
+      waitForBootstrapReady(taskID);
+      closeMetadataTable();
+      return;
+    }
+
     log.info("Start loading RLI records from metadata table, taskId = {}, 
parallelism = {}", taskID, parallelism);
 
     SerializableFunctionUnchecked<List<FileSlice>, List<FileSlice>> 
fileSlicesFilter = fileSlices -> {
@@ -102,7 +113,7 @@ public class RLIBootstrapOperator
 
     // Each subtask loads buckets assigned to it
     long startTime = System.currentTimeMillis();
-    HoodiePairData<String, HoodieRecordGlobalLocation> rliData = 
metadataTable.readRecordIndexLocations(fileSlicesFilter);
+    HoodiePairData<String, HoodieRecordGlobalLocation> rliData = 
tableMetadata.readRecordIndexLocations(fileSlicesFilter);
     rliData.forEach(locationPair -> emitIndexRecord(locationPair.getLeft(), 
locationPair.getRight()));
     long costMs = System.currentTimeMillis() - startTime;
     log.info("Finish loading RLI records, total records: {}, cost: {} ms, 
taskId = {}", loadedCnt, costMs, taskID);
@@ -133,13 +144,13 @@ public class RLIBootstrapOperator
   }
 
   private void closeMetadataTable() {
-    if (metadataTable != null) {
+    if (tableMetadata != null) {
       try {
-        metadataTable.close();
+        tableMetadata.close();
       } catch (Exception e) {
         log.warn("Failed to close metadata table", e);
       }
-      metadataTable = null;
+      tableMetadata = null;
     }
   }
 }
diff --git a/hudi-flink-datasource/hudi-flink/src/test/java/org/apache/hudi/sink/bootstrap/TestRLIBootstrapOperator.java b/hudi-flink-datasource/hudi-flink/src/test/java/org/apache/hudi/sink/bootstrap/TestRLIBootstrapOperator.java
new file mode 100644
index 000000000000..2e684d0652ae
--- /dev/null
+++ b/hudi-flink-datasource/hudi-flink/src/test/java/org/apache/hudi/sink/bootstrap/TestRLIBootstrapOperator.java
@@ -0,0 +1,80 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hudi.sink.bootstrap;
+
+import org.apache.hudi.client.model.HoodieFlinkInternalRow;
+import org.apache.hudi.common.table.HoodieTableMetaClient;
+import org.apache.hudi.configuration.FlinkOptions;
+import org.apache.hudi.index.HoodieIndex;
+import org.apache.hudi.metadata.MetadataPartitionType;
+import org.apache.hudi.util.StreamerUtil;
+import org.apache.hudi.utils.TestConfigurations;
+
+import org.apache.flink.configuration.Configuration;
+import org.apache.flink.streaming.util.OneInputStreamOperatorTestHarness;
+import org.junit.jupiter.api.Test;
+import org.junit.jupiter.api.io.TempDir;
+
+import java.io.File;
+
+import static org.junit.jupiter.api.Assertions.assertEquals;
+import static org.junit.jupiter.api.Assertions.assertThrows;
+
+/**
+ * Tests for {@link RLIBootstrapOperator}.
+ */
+public class TestRLIBootstrapOperator {
+
+  @TempDir
+  File tempFile;
+
+  @Test
+  void testSkipPreloadForFreshTableWithoutMetadataTable() throws Exception {
+    Configuration conf = getRLIConf();
+    StreamerUtil.initTableIfNotExists(conf);
+
+    try (OneInputStreamOperatorTestHarness<HoodieFlinkInternalRow, 
HoodieFlinkInternalRow> harness =
+             new OneInputStreamOperatorTestHarness<>(new 
RLIBootstrapOperator(conf), 1, 1, 0)) {
+      harness.open();
+
+      assertEquals(0, harness.getOutput().size());
+    }
+  }
+
+  @Test
+  void testFailFastWhenMetadataTableIsMarkedAvailableButCannotBeLoaded() 
throws Exception {
+    Configuration conf = getRLIConf();
+    HoodieTableMetaClient metaClient = StreamerUtil.initTableIfNotExists(conf);
+    metaClient.getTableConfig().setMetadataPartitionState(metaClient, 
MetadataPartitionType.FILES.getPartitionPath(), true);
+
+    try (OneInputStreamOperatorTestHarness<HoodieFlinkInternalRow, 
HoodieFlinkInternalRow> harness =
+             new OneInputStreamOperatorTestHarness<>(new 
RLIBootstrapOperator(conf), 1, 1, 0)) {
+      RuntimeException error = assertThrows(RuntimeException.class, 
harness::open);
+
+      assertEquals("Can not initialize the table metadata", 
error.getMessage());
+    }
+  }
+
+  private Configuration getRLIConf() {
+    Configuration conf = 
TestConfigurations.getDefaultConf(tempFile.getAbsolutePath());
+    conf.set(FlinkOptions.METADATA_ENABLED, true);
+    conf.set(FlinkOptions.INDEX_TYPE, 
HoodieIndex.IndexType.GLOBAL_RECORD_LEVEL_INDEX.name());
+    return conf;
+  }
+}
diff --git a/hudi-flink-datasource/hudi-flink/src/test/java/org/apache/hudi/table/ITTestVariantCrossEngineCompatibility.java b/hudi-flink-datasource/hudi-flink/src/test/java/org/apache/hudi/table/ITTestVariantCrossEngineCompatibility.java
index 03305d478ecd..de6e3835d120 100644
--- a/hudi-flink-datasource/hudi-flink/src/test/java/org/apache/hudi/table/ITTestVariantCrossEngineCompatibility.java
+++ b/hudi-flink-datasource/hudi-flink/src/test/java/org/apache/hudi/table/ITTestVariantCrossEngineCompatibility.java
@@ -18,6 +18,8 @@
 
 package org.apache.hudi.table;
 
+import org.apache.hudi.adapter.DataTypeAdapter;
+import org.apache.hudi.adapter.DataTypeAdapterTestUtils;
 import org.apache.hudi.common.testutils.HoodieTestUtils;
 import org.apache.hudi.common.util.StringUtils;
 import org.apache.hudi.utils.FlinkMiniCluster;
@@ -27,7 +29,6 @@ import org.apache.flink.table.api.TableEnvironment;
 import org.apache.flink.table.api.TableResult;
 import org.apache.flink.types.Row;
 import org.apache.flink.util.CollectionUtil;
-import org.junit.jupiter.api.Disabled;
 import org.junit.jupiter.api.Test;
 import org.junit.jupiter.api.extension.ExtendWith;
 import org.junit.jupiter.api.io.TempDir;
@@ -51,19 +52,16 @@ public class ITTestVariantCrossEngineCompatibility {
 
   /**
    * Helper method to verify that Flink can read Spark 4.0 Variant tables.
-   * Variant data is represented as ROW<metadata BYTES, value BYTES> in Flink.
    */
   private void verifyFlinkCanReadSparkVariantTable(String tablePath, String 
tableType, String testDescription) throws Exception {
     TableEnvironment tableEnv = TestTableEnvs.getBatchTableEnv();
 
     // Create a Hudi table pointing to the Spark-written data
-    // In Flink, Variant is represented as ROW<metadata BYTES, value BYTES>
-    // NOTE: value is a reserved keyword
     String createTableDdl = String.format(
         "CREATE TABLE variant_table ("
             + "  id INT,"
             + "  name STRING,"
-            + "  v ROW<metadata BYTES, `value` BYTES>,"
+            + "  v VARIANT,"
             + "  ts BIGINT,"
             + "  PRIMARY KEY (id) NOT ENFORCED"
             + ") WITH ("
@@ -87,35 +85,32 @@ public class ITTestVariantCrossEngineCompatibility {
     assertEquals("row1", row.getField(1), "Second column should be name=row1");
     assertEquals(1000L, row.getField(3), "Fourth column should be ts=1000");
 
-    // Verify the variant column is readable as a ROW with binary fields
-    Row variantRow = (Row) row.getField(2);
-    assertNotNull(variantRow, "Variant column should not be null");
-
-    byte[] metadataBytes = (byte[]) variantRow.getField(0);
-    byte[] valueBytes = (byte[]) variantRow.getField(1);
+    // Verify the variant column is readable as a native Flink Variant.
+    Object variantObject = row.getField(2);
+    assertNotNull(variantObject, "Variant column should not be null");
+    DataTypeAdapterTestUtils.assertAsBinaryVariant(variantObject);
 
     // Expected byte values from Spark 4.0 Variant representation: {"updated": 
true, "new_field": 123}
     byte[] expectedValueBytes = new byte[]{0x02, 0x02, 0x01, 0x00, 0x01, 0x00, 
0x03, 0x04, 0x0C, 0x7B};
     byte[] expectedMetadataBytes = new byte[]{0x01, 0x02, 0x00, 0x07, 0x10, 
0x75, 0x70, 0x64, 0x61,
         0x74, 0x65, 0x64, 0x6E, 0x65, 0x77, 0x5F, 0x66, 0x69, 0x65, 0x6C, 
0x64};
 
-    assertArrayEquals(expectedValueBytes, valueBytes,
+    assertArrayEquals(expectedValueBytes, 
DataTypeAdapter.getVariantValue(variantObject),
         String.format("Variant value bytes mismatch (%s). Expected: %s, Got: 
%s",
             testDescription,
             Arrays.toString(StringUtils.encodeHex(expectedValueBytes)),
-            Arrays.toString(StringUtils.encodeHex(valueBytes))));
+            
Arrays.toString(StringUtils.encodeHex(DataTypeAdapter.getVariantValue(variantObject)))));
 
-    assertArrayEquals(expectedMetadataBytes, metadataBytes,
+    assertArrayEquals(expectedMetadataBytes, 
DataTypeAdapter.getVariantMetadata(variantObject),
         String.format("Variant metadata bytes mismatch (%s). Expected: %s, 
Got: %s",
             testDescription,
             Arrays.toString(StringUtils.encodeHex(expectedMetadataBytes)),
-            Arrays.toString(StringUtils.encodeHex(metadataBytes))));
+            
Arrays.toString(StringUtils.encodeHex(DataTypeAdapter.getVariantMetadata(variantObject)))));
 
     tableEnv.executeSql("DROP TABLE variant_table");
   }
 
   @Test
-  @Disabled("disabled and reopen the tests for 1.3")
   public void testFlinkReadSparkVariantCOWTable() throws Exception {
     // Test that Flink can read a COW table with Variant data written by Spark 
4.0
     Path cowTargetDir = tempDir.resolve("cow");
@@ -125,7 +120,6 @@ public class ITTestVariantCrossEngineCompatibility {
   }
 
   @Test
-  @Disabled("disabled and reopen the tests for 1.3")
   public void testFlinkReadSparkVariantMORTableWithAvro() throws Exception {
     // Test that Flink can read a MOR table with AVRO record type and Variant 
data written by Spark 4.0
     Path morAvroTargetDir = tempDir.resolve("mor_avro");
@@ -135,7 +129,6 @@ public class ITTestVariantCrossEngineCompatibility {
   }
 
   @Test
-  @Disabled("disabled and reopen the tests for 1.3")
   public void testFlinkReadSparkVariantMORTableWithSpark() throws Exception {
     // Test that Flink can read a MOR table with SPARK record type and Variant 
data written by Spark 4.0
     Path morSparkTargetDir = tempDir.resolve("mor_spark");
@@ -143,4 +136,4 @@ public class ITTestVariantCrossEngineCompatibility {
     String morSparkPath = 
morSparkTargetDir.resolve("variant_mor_spark").toString();
     verifyFlinkCanReadSparkVariantTable(morSparkPath, "MERGE_ON_READ", "MOR 
table with SPARK record type");
   }
-}
\ No newline at end of file
+}
diff --git a/hudi-flink-datasource/hudi-flink1.17.x/src/test/java/org/apache/hudi/adapter/DataTypeAdapterTestUtils.java b/hudi-flink-datasource/hudi-flink1.17.x/src/test/java/org/apache/hudi/adapter/DataTypeAdapterTestUtils.java
new file mode 100644
index 000000000000..ae2e4107d6ea
--- /dev/null
+++ b/hudi-flink-datasource/hudi-flink1.17.x/src/test/java/org/apache/hudi/adapter/DataTypeAdapterTestUtils.java
@@ -0,0 +1,28 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hudi.adapter;
+
+/**
+ * Adapter utils.
+ */
+public class DataTypeAdapterTestUtils {
+  public static void assertAsBinaryVariant(Object variantObject) {
+    throw new UnsupportedOperationException("Variant is not supported yet.");
+  }
+}
diff --git a/hudi-flink-datasource/hudi-flink1.18.x/src/test/java/org/apache/hudi/adapter/DataTypeAdapterTestUtils.java b/hudi-flink-datasource/hudi-flink1.18.x/src/test/java/org/apache/hudi/adapter/DataTypeAdapterTestUtils.java
new file mode 100644
index 000000000000..ae2e4107d6ea
--- /dev/null
+++ b/hudi-flink-datasource/hudi-flink1.18.x/src/test/java/org/apache/hudi/adapter/DataTypeAdapterTestUtils.java
@@ -0,0 +1,28 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hudi.adapter;
+
+/**
+ * Adapter utils.
+ */
+public class DataTypeAdapterTestUtils {
+  public static void assertAsBinaryVariant(Object variantObject) {
+    throw new UnsupportedOperationException("Variant is not supported yet.");
+  }
+}
diff --git a/hudi-flink-datasource/hudi-flink1.19.x/src/test/java/org/apache/hudi/adapter/DataTypeAdapterTestUtils.java b/hudi-flink-datasource/hudi-flink1.19.x/src/test/java/org/apache/hudi/adapter/DataTypeAdapterTestUtils.java
new file mode 100644
index 000000000000..ae2e4107d6ea
--- /dev/null
+++ b/hudi-flink-datasource/hudi-flink1.19.x/src/test/java/org/apache/hudi/adapter/DataTypeAdapterTestUtils.java
@@ -0,0 +1,28 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hudi.adapter;
+
+/**
+ * Adapter utils.
+ */
+public class DataTypeAdapterTestUtils {
+  public static void assertAsBinaryVariant(Object variantObject) {
+    throw new UnsupportedOperationException("Variant is not supported yet.");
+  }
+}
diff --git a/hudi-flink-datasource/hudi-flink1.20.x/src/test/java/org/apache/hudi/adapter/DataTypeAdapterTestUtils.java b/hudi-flink-datasource/hudi-flink1.20.x/src/test/java/org/apache/hudi/adapter/DataTypeAdapterTestUtils.java
new file mode 100644
index 000000000000..ae2e4107d6ea
--- /dev/null
+++ b/hudi-flink-datasource/hudi-flink1.20.x/src/test/java/org/apache/hudi/adapter/DataTypeAdapterTestUtils.java
@@ -0,0 +1,28 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hudi.adapter;
+
+/**
+ * Adapter utils.
+ */
+public class DataTypeAdapterTestUtils {
+  public static void assertAsBinaryVariant(Object variantObject) {
+    throw new UnsupportedOperationException("Variant is not supported yet.");
+  }
+}
diff --git a/hudi-flink-datasource/hudi-flink2.0.x/src/test/java/org/apache/flink/api/java/typeutils/runtime/kryo/KryoSerializerSnapshot.java b/hudi-flink-datasource/hudi-flink2.0.x/src/test/java/org/apache/flink/api/java/typeutils/runtime/kryo/KryoSerializerSnapshot.java
index 1a256ffe7bba..aac9a4651aff 100644
--- a/hudi-flink-datasource/hudi-flink2.0.x/src/test/java/org/apache/flink/api/java/typeutils/runtime/kryo/KryoSerializerSnapshot.java
+++ b/hudi-flink-datasource/hudi-flink2.0.x/src/test/java/org/apache/flink/api/java/typeutils/runtime/kryo/KryoSerializerSnapshot.java
@@ -34,6 +34,9 @@ public class KryoSerializerSnapshot<T> implements TypeSerializerSnapshot<T> {
 
   private Class<T> type;
 
+  public KryoSerializerSnapshot() {
+  }
+
   public KryoSerializerSnapshot(Class<T> type) {
     this.type = type;
   }
diff --git a/hudi-flink-datasource/hudi-flink2.0.x/src/test/java/org/apache/hudi/adapter/DataTypeAdapterTestUtils.java b/hudi-flink-datasource/hudi-flink2.0.x/src/test/java/org/apache/hudi/adapter/DataTypeAdapterTestUtils.java
new file mode 100644
index 000000000000..ae2e4107d6ea
--- /dev/null
+++ b/hudi-flink-datasource/hudi-flink2.0.x/src/test/java/org/apache/hudi/adapter/DataTypeAdapterTestUtils.java
@@ -0,0 +1,28 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hudi.adapter;
+
+/**
+ * Test-side adapter for Variant-related assertions.
+ *
+ * <p>This implementation has no Variant support: every assertion entry point fails fast
+ * with an {@link UnsupportedOperationException}.
+ */
+public class DataTypeAdapterTestUtils {
+
+  /**
+   * Would check that {@code variantObject} is a binary Variant value; unsupported here.
+   *
+   * @param variantObject the value read from a Variant column (not inspected)
+   * @throws UnsupportedOperationException always
+   */
+  public static void assertAsBinaryVariant(Object variantObject) {
+    final String reason = "Variant is not supported yet.";
+    throw new UnsupportedOperationException(reason);
+  }
+}
diff --git 
a/hudi-flink-datasource/hudi-flink2.1.x/src/main/java/org/apache/hudi/table/format/cow/ParquetSplitReaderUtil.java
 
b/hudi-flink-datasource/hudi-flink2.1.x/src/main/java/org/apache/hudi/table/format/cow/ParquetSplitReaderUtil.java
index bb5d0c55b81d..3e16417ba541 100644
--- 
a/hudi-flink-datasource/hudi-flink2.1.x/src/main/java/org/apache/hudi/table/format/cow/ParquetSplitReaderUtil.java
+++ 
b/hudi-flink-datasource/hudi-flink2.1.x/src/main/java/org/apache/hudi/table/format/cow/ParquetSplitReaderUtil.java
@@ -18,6 +18,7 @@
 
 package org.apache.hudi.table.format.cow;
 
+import org.apache.hudi.common.schema.HoodieSchema;
 import org.apache.hudi.common.util.ValidationUtils;
 import org.apache.hudi.table.format.cow.vector.HeapArrayGroupColumnVector;
 import org.apache.hudi.table.format.cow.vector.HeapArrayVector;
@@ -320,6 +321,17 @@ public class ParquetSplitReaderUtil {
         } else {
           throw new UnsupportedOperationException("Unsupported create row with 
default value.");
         }
+      case VARIANT:
+        HeapRowColumnVector variantVector = new HeapRowColumnVector(
+            batchSize,
+            new HeapBytesVector(batchSize),
+            new HeapBytesVector(batchSize));
+        if (value == null) {
+          variantVector.fillWithNulls();
+          return variantVector;
+        } else {
+          throw new UnsupportedOperationException("Unsupported create variant 
with default value.");
+        }
       default:
         throw new UnsupportedOperationException("Unsupported type: " + type);
     }
@@ -498,6 +510,20 @@ public class ParquetSplitReaderUtil {
           }
         }
         return new RowColumnReader(fieldReaders);
+      case VARIANT:
+        ColumnDescriptor valueDescriptor = getVariantColumnDescriptor(
+            physicalType,
+            descriptors,
+            depth,
+            HoodieSchema.Variant.VARIANT_VALUE_FIELD);
+        ColumnDescriptor metadataDescriptor = getVariantColumnDescriptor(
+            physicalType,
+            descriptors,
+            depth,
+            HoodieSchema.Variant.VARIANT_METADATA_FIELD);
+        return new RowColumnReader(Arrays.asList(
+            new BytesColumnReader(valueDescriptor, 
pages.getPageReader(valueDescriptor)),
+            new BytesColumnReader(metadataDescriptor, 
pages.getPageReader(metadataDescriptor))));
       default:
         throw new UnsupportedOperationException(fieldType + " is not supported 
now.");
     }
@@ -675,11 +701,68 @@ public class ParquetSplitReaderUtil {
           }
         }
         return new HeapRowColumnVector(batchSize, columnVectors);
+      case VARIANT:
+        validateVariantType(physicalType);
+        return new HeapRowColumnVector(
+            batchSize,
+            new HeapBytesVector(batchSize),
+            new HeapBytesVector(batchSize));
       default:
         throw new UnsupportedOperationException(fieldType + " is not supported 
now.");
     }
   }
 
+  /**
+   * Finds the leaf column descriptor for one child field of a Variant group.
+   *
+   * @param physicalType the Parquet type of the Variant column; its name is reported in the
+   *                     error message when the child field cannot be found
+   * @param descriptors  candidate column descriptors to search
+   * @param depth        path depth of the Variant column; the child field name is matched
+   *                     against path element {@code depth + 1}
+   * @param fieldName    name of the child field to locate
+   * @return the first descriptor whose path element at {@code depth + 1} equals {@code fieldName}
+   * @throws IllegalArgumentException if no descriptor matches
+   */
+  private static ColumnDescriptor getVariantColumnDescriptor(
+      Type physicalType,
+      List<ColumnDescriptor> descriptors,
+      int depth,
+      String fieldName) {
+    int childIndex = depth + 1;
+    return descriptors.stream()
+        .filter(descriptor -> descriptor.getPath().length > childIndex
+            && fieldName.equals(descriptor.getPath()[childIndex]))
+        .findFirst()
+        // Name the offending Variant group so schema errors can be traced to a column.
+        .orElseThrow(() -> new IllegalArgumentException(
+            "Invalid Variant Parquet schema: missing binary field '" + fieldName
+                + "' in group '" + physicalType.getName() + "'."));
+  }
+
+  /**
+   * Checks that {@code physicalType} is an unshredded Variant group whose value and metadata
+   * fields are both primitive BINARY columns.
+   *
+   * @param physicalType the Parquet type backing a Variant column
+   * @throws IllegalArgumentException if the type is primitive or a required field is missing
+   *                                  or not BINARY
+   * @throws UnsupportedOperationException if the group uses the shredded layout
+   */
+  private static void validateVariantType(Type physicalType) {
+    // A Variant must be a group; a bare primitive is a schema mismatch.
+    if (physicalType.isPrimitive()) {
+      throw new IllegalArgumentException(
+          "Type mismatch, expected Variant but got '" + physicalType + "'.");
+    }
+    GroupType variantGroup = physicalType.asGroupType();
+    if (isShreddedVariant(variantGroup)) {
+      String message = "Shredded Variant is not supported in Flink. "
+          + "The Parquet group '" + variantGroup.getName() + "' contains a '"
+          + HoodieSchema.Variant.VARIANT_TYPED_VALUE_FIELD
+          + "' field indicating a shredded layout.";
+      throw new UnsupportedOperationException(message);
+    }
+    // Checked in this order: value first, then metadata.
+    String[] requiredFields = {
+        HoodieSchema.Variant.VARIANT_VALUE_FIELD,
+        HoodieSchema.Variant.VARIANT_METADATA_FIELD};
+    for (String requiredField : requiredFields) {
+      validateVariantField(variantGroup, requiredField);
+    }
+  }
+
+  /**
+   * Reports whether a Variant group uses the shredded layout, i.e. whether it declares a
+   * {@code typed_value} field.
+   *
+   * @param groupType the Parquet group backing a Variant column
+   * @return {@code true} if the typed-value field is present
+   */
+  private static boolean isShreddedVariant(GroupType groupType) {
+    String shreddedFieldName = HoodieSchema.Variant.VARIANT_TYPED_VALUE_FIELD;
+    return groupType.containsField(shreddedFieldName);
+  }
+
+  /**
+   * Ensures {@code groupType} has a field named {@code fieldName} that is a primitive BINARY.
+   *
+   * @param groupType the Parquet group backing a Variant column
+   * @param fieldName the required child field
+   * @throws IllegalArgumentException if the field is absent, is a group, or is not BINARY
+   */
+  private static void validateVariantField(GroupType groupType, String fieldName) {
+    boolean binaryLeafPresent = false;
+    if (groupType.containsField(fieldName)) {
+      Type candidate = groupType.getType(fieldName);
+      binaryLeafPresent = candidate.isPrimitive()
+          && PrimitiveType.PrimitiveTypeName.BINARY
+              == candidate.asPrimitiveType().getPrimitiveTypeName();
+    }
+    if (!binaryLeafPresent) {
+      throw new IllegalArgumentException(
+          "Invalid Variant Parquet schema: missing binary field '" + fieldName + "'.");
+    }
+  }
+
   /**
    * Returns the field index with given physical row type {@code groupType} 
and field name {@code fieldName}.
    *
diff --git 
a/hudi-flink-datasource/hudi-flink2.1.x/src/test/java/org/apache/flink/api/java/typeutils/runtime/kryo/KryoSerializerSnapshot.java
 
b/hudi-flink-datasource/hudi-flink2.1.x/src/test/java/org/apache/flink/api/java/typeutils/runtime/kryo/KryoSerializerSnapshot.java
index 1a256ffe7bba..aac9a4651aff 100644
--- 
a/hudi-flink-datasource/hudi-flink2.1.x/src/test/java/org/apache/flink/api/java/typeutils/runtime/kryo/KryoSerializerSnapshot.java
+++ 
b/hudi-flink-datasource/hudi-flink2.1.x/src/test/java/org/apache/flink/api/java/typeutils/runtime/kryo/KryoSerializerSnapshot.java
@@ -34,6 +34,9 @@ public class KryoSerializerSnapshot<T> implements 
TypeSerializerSnapshot<T> {
 
   private Class<T> type;
 
+  /**
+   * No-arg constructor; leaves {@code type} unset ({@code null}).
+   *
+   * <p>NOTE(review): presumably required so the snapshot can be instantiated reflectively
+   * when serializer state is restored (TypeSerializerSnapshot contract) — confirm.
+   */
+  public KryoSerializerSnapshot() {
+    this.type = null;
+  }
+
   public KryoSerializerSnapshot(Class<T> type) {
     this.type = type;
   }
diff --git 
a/hudi-flink-datasource/hudi-flink2.1.x/src/test/java/org/apache/hudi/adapter/DataTypeAdapterTestUtils.java
 
b/hudi-flink-datasource/hudi-flink2.1.x/src/test/java/org/apache/hudi/adapter/DataTypeAdapterTestUtils.java
new file mode 100644
index 000000000000..2727b40b1393
--- /dev/null
+++ 
b/hudi-flink-datasource/hudi-flink2.1.x/src/test/java/org/apache/hudi/adapter/DataTypeAdapterTestUtils.java
@@ -0,0 +1,32 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hudi.adapter;
+
+import org.apache.flink.types.variant.BinaryVariant;
+
+import static org.junit.jupiter.api.Assertions.assertInstanceOf;
+
+/**
+ * Adapter utils.
+ */
+public class DataTypeAdapterTestUtils {
+  public static void assertAsBinaryVariant(Object variantObject) {
+    assertInstanceOf(BinaryVariant.class, variantObject, "Variant column 
should be a BinaryVariant");
+  }
+}
diff --git a/pom.xml b/pom.xml
index 15d770ede08c..909c7771e6e8 100644
--- a/pom.xml
+++ b/pom.xml
@@ -153,14 +153,14 @@
     <flink1.19.version>1.19.2</flink1.19.version>
     <flink1.18.version>1.18.1</flink1.18.version>
     <flink1.17.version>1.17.1</flink1.17.version>
-    <flink.version>${flink1.20.version}</flink.version>
-    <hudi.flink.module>hudi-flink1.20.x</hudi.flink.module>
-    <flink.bundle.version>1.20</flink.bundle.version>
+    <flink.version>${flink2.1.version}</flink.version>
+    <hudi.flink.module>hudi-flink2.1.x</hudi.flink.module>
+    <flink.bundle.version>2.1</flink.bundle.version>
     <!-- This is fixed to match with version from flink-avro -->
     <flink.avro.version>1.11.4</flink.avro.version>
-    <flink.format.parquet.version>1.13.1</flink.format.parquet.version>
+    <flink.format.parquet.version>1.15.2</flink.format.parquet.version>
     <!-- check kafka version -->
-    <flink.connector.kafka.version>3.3.0-1.20</flink.connector.kafka.version>
+    <flink.connector.kafka.version>4.0.1-2.0</flink.connector.kafka.version>
     <flink.runtime.artifactId>flink-runtime</flink.runtime.artifactId>
     
<flink.table.runtime.artifactId>flink-table-runtime</flink.table.runtime.artifactId>
     
<flink.table.planner.artifactId>flink-table-planner_2.12</flink.table.planner.artifactId>
@@ -170,7 +170,7 @@
     
<flink.streaming.java.artifactId>flink-streaming-java</flink.streaming.java.artifactId>
     <flink.clients.artifactId>flink-clients</flink.clients.artifactId>
     
<flink.connector.kafka.artifactId>flink-connector-kafka</flink.connector.kafka.artifactId>
-    
<flink.hadoop.compatibility.artifactId>flink-hadoop-compatibility_2.12</flink.hadoop.compatibility.artifactId>
+    
<flink.hadoop.compatibility.artifactId>flink-hadoop-compatibility</flink.hadoop.compatibility.artifactId>
     <rocksdbjni.version>7.5.3</rocksdbjni.version>
     <spark33.version>3.3.4</spark33.version>
     <spark34.version>3.4.3</spark34.version>
@@ -2900,6 +2900,7 @@
         <module>hudi-flink-datasource/hudi-flink2.1.x</module>
       </modules>
       <activation>
+        <activeByDefault>true</activeByDefault>
         <property>
           <name>flink2.1</name>
         </property>
@@ -2943,7 +2944,6 @@
         <module>hudi-flink-datasource/hudi-flink1.20.x</module>
       </modules>
       <activation>
-        <activeByDefault>true</activeByDefault>
         <property>
           <name>flink1.20</name>
         </property>


Reply via email to