This is an automated email from the ASF dual-hosted git repository.
voonhous pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/hudi.git
The following commit(s) were added to refs/heads/master by this push:
new b6bc1658e8f2 chore: migrate the flink ITs run to flink2.1 (#18717)
b6bc1658e8f2 is described below
commit b6bc1658e8f2862583c22425eaa0665ae100f5a3
Author: Danny Chan <[email protected]>
AuthorDate: Tue May 12 17:09:39 2026 +0800
chore: migrate the flink ITs run to flink2.1 (#18717)
* chore: migrate the flink ITs to run with flink2.1
* fix compile errors
* fix test failures
* fix test failure
* address review comments
---
.github/workflows/bot.yml | 30 ++++----
azure-pipelines-20230430.yml | 4 +-
.../hudi/util/TestHoodieSchemaConverter.java | 23 +++---
.../hudi/sink/bootstrap/RLIBootstrapOperator.java | 27 ++++---
.../sink/bootstrap/TestRLIBootstrapOperator.java | 80 +++++++++++++++++++++
.../ITTestVariantCrossEngineCompatibility.java | 31 ++++----
.../hudi/adapter/DataTypeAdapterTestUtils.java | 28 ++++++++
.../hudi/adapter/DataTypeAdapterTestUtils.java | 28 ++++++++
.../hudi/adapter/DataTypeAdapterTestUtils.java | 28 ++++++++
.../hudi/adapter/DataTypeAdapterTestUtils.java | 28 ++++++++
.../runtime/kryo/KryoSerializerSnapshot.java | 3 +
.../hudi/adapter/DataTypeAdapterTestUtils.java | 28 ++++++++
.../table/format/cow/ParquetSplitReaderUtil.java | 83 ++++++++++++++++++++++
.../runtime/kryo/KryoSerializerSnapshot.java | 3 +
.../hudi/adapter/DataTypeAdapterTestUtils.java | 32 +++++++++
pom.xml | 14 ++--
16 files changed, 404 insertions(+), 66 deletions(-)
diff --git a/.github/workflows/bot.yml b/.github/workflows/bot.yml
index 5fb047f174d4..2143edd92bcb 100644
--- a/.github/workflows/bot.yml
+++ b/.github/workflows/bot.yml
@@ -67,7 +67,7 @@ jobs:
include:
- scalaProfile: "scala-2.12"
sparkProfile: "spark3.5"
- flinkProfile: "flink1.18"
+ flinkProfile: "flink2.1"
steps:
- uses: actions/checkout@v5
@@ -126,7 +126,7 @@ jobs:
include:
- scalaProfile: "scala-2.12"
sparkProfile: "spark3.5"
- flinkProfile: "flink1.18"
+ flinkProfile: "flink2.1"
steps:
- uses: actions/checkout@v5
@@ -178,7 +178,7 @@ jobs:
include:
- scalaProfile: "scala-2.12"
sparkProfile: "spark3.5"
- flinkProfile: "flink1.18"
+ flinkProfile: "flink2.1"
env:
UT_MODULES: >-
@@ -563,7 +563,7 @@ jobs:
include:
- scalaProfile: "scala-2.12"
sparkProfile: "spark3.5"
- flinkProfile: "flink1.20"
+ flinkProfile: "flink2.1"
steps:
- uses: actions/checkout@v5
@@ -938,7 +938,7 @@ jobs:
FLINK_PROFILE: ${{ matrix.flinkProfile }}
FLINK_AVRO_VERSION: ${{ matrix.flinkAvroVersion }}
FLINK_PARQUET_VERSION: ${{ matrix.flinkParquetVersion }}
- if: ${{ endsWith(env.FLINK_PROFILE, '1.20') }}
+ if: ${{ endsWith(env.FLINK_PROFILE, '2.1') }}
run: |
mvn clean install -T 2 -Pintegration-tests -D"$SCALA_PROFILE"
-D"$FLINK_PROFILE" -pl hudi-flink-datasource/hudi-flink -am
-Davro.version="$FLINK_AVRO_VERSION" -Dparquet.version="$FLINK_PARQUET_VERSION"
-DskipTests=true $MVN_ARGS
mvn verify -Pintegration-tests -D"$SCALA_PROFILE" -D"$FLINK_PROFILE"
-Davro.version="$FLINK_AVRO_VERSION" -Dparquet.version="$FLINK_PARQUET_VERSION"
$FLINK_IT_FILTER1 -pl hudi-flink-datasource/hudi-flink $MVN_ARGS
@@ -948,9 +948,9 @@ jobs:
strategy:
matrix:
include:
- - flinkProfile: "flink1.20"
+ - flinkProfile: "flink2.1"
flinkAvroVersion: "1.11.4"
- flinkParquetVersion: '1.13.1'
+ flinkParquetVersion: '1.15.2'
steps:
- uses: actions/checkout@v5
- name: Set up JDK 11
@@ -974,7 +974,7 @@ jobs:
FLINK_PROFILE: ${{ matrix.flinkProfile }}
FLINK_AVRO_VERSION: ${{ matrix.flinkAvroVersion }}
FLINK_PARQUET_VERSION: ${{ matrix.flinkParquetVersion }}
- if: ${{ endsWith(env.FLINK_PROFILE, '1.20') }}
+ if: ${{ endsWith(env.FLINK_PROFILE, '2.1') }}
run: |
mvn clean install -T 2 -Pintegration-tests -D"$SCALA_PROFILE"
-D"$FLINK_PROFILE" -pl hudi-flink-datasource/hudi-flink -am
-Davro.version="$FLINK_AVRO_VERSION" -Dparquet.version="$FLINK_PARQUET_VERSION"
-DskipTests=true $MVN_ARGS
mvn verify -Pintegration-tests -D"$SCALA_PROFILE" -D"$FLINK_PROFILE"
-Davro.version="$FLINK_AVRO_VERSION" -Dparquet.version="$FLINK_PARQUET_VERSION"
$FLINK_IT_FILTER2 -pl hudi-flink-datasource/hudi-flink $MVN_ARGS
@@ -985,19 +985,19 @@ jobs:
matrix:
include:
- scalaProfile: 'scala-2.13'
- flinkProfile: 'flink1.20'
+ flinkProfile: 'flink2.1'
sparkProfile: 'spark3.5'
sparkRuntime: 'spark3.5.0'
- scalaProfile: 'scala-2.12'
- flinkProfile: 'flink1.20'
+ flinkProfile: 'flink2.1'
sparkProfile: 'spark3.5'
sparkRuntime: 'spark3.5.0'
- scalaProfile: 'scala-2.13'
- flinkProfile: 'flink1.20'
+ flinkProfile: 'flink2.1'
sparkProfile: 'spark4.0'
sparkRuntime: 'spark4.0.0'
- scalaProfile: 'scala-2.13'
- flinkProfile: 'flink1.20'
+ flinkProfile: 'flink2.1'
sparkProfile: 'spark4.1'
sparkRuntime: 'spark4.1.1'
@@ -1201,7 +1201,7 @@ jobs:
matrix:
include:
- sparkProfile: 'spark3.5'
- flinkProfile: 'flink1.20'
+ flinkProfile: 'flink2.1'
sparkArchive: 'spark-3.5.3/spark-3.5.3-bin-hadoop3.tgz'
steps:
- uses: actions/checkout@v5
@@ -1297,9 +1297,9 @@ jobs:
matrix:
include:
- scalaProfile: "scala-2.12"
- flinkProfile: "flink1.20"
+ flinkProfile: "flink2.1"
flinkAvroVersion: '1.11.4'
- flinkParquetVersion: '1.13.1'
+ flinkParquetVersion: '1.15.2'
steps:
- uses: actions/checkout@v5
- name: Set up JDK 17
diff --git a/azure-pipelines-20230430.yml b/azure-pipelines-20230430.yml
index 4b1253a30963..cccd89ec6cc7 100644
--- a/azure-pipelines-20230430.yml
+++ b/azure-pipelines-20230430.yml
@@ -14,7 +14,7 @@
# limitations under the License.
# NOTE:
-# This config file defines how Azure CI runs tests with Spark 3.5 and Flink
1.18 profiles.
+# This config file defines how Azure CI runs tests with Spark 3.5 and Flink
2.1 profiles.
# PRs will need to keep in sync with master's version to trigger the CI runs.
# See scripts/jacoco/README.md for how aggregated code coverage report works
# across multiple modules.
@@ -100,7 +100,7 @@ parameters:
- '!packaging/hudi-utilities-slim-bundle'
variables:
- BUILD_PROFILES: '-Dscala-2.12 -Dspark3.5 -Dflink1.18'
+ BUILD_PROFILES: '-Dscala-2.12 -Dspark3.5 -Dflink2.1'
PLUGIN_OPTS: '-Dcheckstyle.skip=true -Drat.skip=true -ntp -B -V -Pwarn-log
-Dorg.slf4j.simpleLogger.log.org.apache.maven.plugins.shade=warn
-Dorg.slf4j.simpleLogger.log.org.apache.maven.plugins.dependency=warn'
MVN_OPTS_INSTALL: '-T 3 -Phudi-platform-service -DskipTests
$(BUILD_PROFILES) $(PLUGIN_OPTS)
-Dmaven.wagon.httpconnectionManager.ttlSeconds=25
-Dmaven.wagon.http.retryHandler.count=5 -Dorg.eclipse.jetty.LEVEL=WARN'
MVN_OPTS_TEST: '-fae -Pwarn-log $(BUILD_PROFILES) $(PLUGIN_OPTS)'
diff --git
a/hudi-client/hudi-flink-client/src/test/java/org/apache/hudi/util/TestHoodieSchemaConverter.java
b/hudi-client/hudi-flink-client/src/test/java/org/apache/hudi/util/TestHoodieSchemaConverter.java
index 51b8a15f4c7b..9a4f920b859e 100644
---
a/hudi-client/hudi-flink-client/src/test/java/org/apache/hudi/util/TestHoodieSchemaConverter.java
+++
b/hudi-client/hudi-flink-client/src/test/java/org/apache/hudi/util/TestHoodieSchemaConverter.java
@@ -36,11 +36,12 @@ import org.apache.flink.table.types.logical.RowType;
import org.apache.flink.table.types.logical.TimestampType;
import org.apache.flink.table.types.logical.VarCharType;
import org.apache.flink.table.types.logical.VarBinaryType;
-import org.junit.jupiter.api.Disabled;
import org.junit.jupiter.api.Test;
import java.util.Arrays;
+import static org.hamcrest.CoreMatchers.is;
+import static org.hamcrest.MatcherAssert.assertThat;
import static org.junit.jupiter.api.Assertions.assertEquals;
import static org.junit.jupiter.api.Assertions.assertFalse;
import static org.junit.jupiter.api.Assertions.assertInstanceOf;
@@ -693,24 +694,18 @@ public class TestHoodieSchemaConverter {
}
@Test
- @Disabled("disabled and reopen the tests for 1.3")
public void testVariantTypeConversion() {
// Test direct Variant conversion
HoodieSchema variantSchema = HoodieSchema.createVariant();
DataType dataType = HoodieSchemaConverter.convertToDataType(variantSchema);
assertNotNull(dataType);
- // Verify it's a ROW with metadata and value binary fields
- RowType rowType = (RowType) dataType.getLogicalType();
- assertEquals(2, rowType.getFieldCount());
- assertEquals("metadata", rowType.getFieldNames().get(0));
- assertEquals("value", rowType.getFieldNames().get(1));
- assertInstanceOf(VarBinaryType.class, rowType.getTypeAt(0));
- assertInstanceOf(VarBinaryType.class, rowType.getTypeAt(1));
+ // Verify it's a Variant
+ assertThat("the return type should be variant",
+ dataType.getLogicalType().asSummaryString(), is("VARIANT NOT NULL"));
}
@Test
- @Disabled("disabled and reopen the tests for 1.3")
public void testVariantInRecordConversion() {
// Test Variant field within a record
HoodieSchema recordWithVariant = HoodieSchema.createRecord(
@@ -727,11 +722,9 @@ public class TestHoodieSchemaConverter {
assertEquals(2, result.getFieldCount());
assertEquals("data", result.getFieldNames().get(1));
- // Verify variant field is a ROW<metadata BYTES, value BYTES>
- RowType variantRowType = (RowType) result.getTypeAt(1);
- assertEquals(2, variantRowType.getFieldCount());
- assertEquals("metadata", variantRowType.getFieldNames().get(0));
- assertEquals("value", variantRowType.getFieldNames().get(1));
+ // Verify variant field
+ assertThat("the return type should be variant",
+ result.getTypeAt(1).asSummaryString(), is("VARIANT NOT NULL"));
}
@Test
diff --git
a/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/sink/bootstrap/RLIBootstrapOperator.java
b/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/sink/bootstrap/RLIBootstrapOperator.java
index 515b180d3cad..d0cb16a789e5 100644
---
a/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/sink/bootstrap/RLIBootstrapOperator.java
+++
b/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/sink/bootstrap/RLIBootstrapOperator.java
@@ -24,6 +24,7 @@ import org.apache.hudi.common.data.HoodiePairData;
import org.apache.hudi.common.function.SerializableFunctionUnchecked;
import org.apache.hudi.common.model.FileSlice;
import org.apache.hudi.common.model.HoodieRecordGlobalLocation;
+import org.apache.hudi.common.table.HoodieTableConfig;
import org.apache.hudi.common.table.HoodieTableMetaClient;
import org.apache.hudi.configuration.FlinkOptions;
import org.apache.hudi.metadata.HoodieBackedTableMetadata;
@@ -52,7 +53,7 @@ import java.util.stream.Collectors;
public class RLIBootstrapOperator
extends AbstractBootstrapOperator {
- private transient HoodieBackedTableMetadata metadataTable;
+ private transient HoodieBackedTableMetadata tableMetadata;
private transient long loadedCnt;
public RLIBootstrapOperator(Configuration conf) {
@@ -63,13 +64,13 @@ public class RLIBootstrapOperator
public void initializeState(StateInitializationContext context) throws
Exception {
loadedCnt = 0;
HoodieTableMetaClient metaClient = StreamerUtil.createMetaClient(conf);
- this.metadataTable = (HoodieBackedTableMetadata)
metaClient.getTableFormat().getMetadataFactory().create(
+ this.tableMetadata = new HoodieBackedTableMetadata(
HoodieFlinkEngineContext.DEFAULT,
metaClient.getStorage(),
StreamerUtil.metadataConfig(conf),
conf.get(FlinkOptions.PATH));
// Load RLI records
- preLoadRLIRecords();
+ preLoadRLIRecords(metaClient.getTableConfig());
}
@Override
@@ -82,10 +83,20 @@ public class RLIBootstrapOperator
// Utilities
// -------------------------------------------------------------------------
- private void preLoadRLIRecords() {
+ private void preLoadRLIRecords(HoodieTableConfig tableConfig) {
int taskID =
RuntimeContextUtils.getIndexOfThisSubtask(getRuntimeContext());
int parallelism =
RuntimeContextUtils.getNumberOfParallelSubtasks(getRuntimeContext());
+ if (!tableMetadata.enabled()) {
+ if (tableConfig.isMetadataTableAvailable()) {
+ throw new RuntimeException("Can not initialize the table metadata");
+ }
+ log.info("Skip loading RLI records because table metadata is not
initialized, taskId = {}", taskID);
+ waitForBootstrapReady(taskID);
+ closeMetadataTable();
+ return;
+ }
+
log.info("Start loading RLI records from metadata table, taskId = {},
parallelism = {}", taskID, parallelism);
SerializableFunctionUnchecked<List<FileSlice>, List<FileSlice>>
fileSlicesFilter = fileSlices -> {
@@ -102,7 +113,7 @@ public class RLIBootstrapOperator
// Each subtask loads buckets assigned to it
long startTime = System.currentTimeMillis();
- HoodiePairData<String, HoodieRecordGlobalLocation> rliData =
metadataTable.readRecordIndexLocations(fileSlicesFilter);
+ HoodiePairData<String, HoodieRecordGlobalLocation> rliData =
tableMetadata.readRecordIndexLocations(fileSlicesFilter);
rliData.forEach(locationPair -> emitIndexRecord(locationPair.getLeft(),
locationPair.getRight()));
long costMs = System.currentTimeMillis() - startTime;
log.info("Finish loading RLI records, total records: {}, cost: {} ms,
taskId = {}", loadedCnt, costMs, taskID);
@@ -133,13 +144,13 @@ public class RLIBootstrapOperator
}
private void closeMetadataTable() {
- if (metadataTable != null) {
+ if (tableMetadata != null) {
try {
- metadataTable.close();
+ tableMetadata.close();
} catch (Exception e) {
log.warn("Failed to close metadata table", e);
}
- metadataTable = null;
+ tableMetadata = null;
}
}
}
diff --git
a/hudi-flink-datasource/hudi-flink/src/test/java/org/apache/hudi/sink/bootstrap/TestRLIBootstrapOperator.java
b/hudi-flink-datasource/hudi-flink/src/test/java/org/apache/hudi/sink/bootstrap/TestRLIBootstrapOperator.java
new file mode 100644
index 000000000000..2e684d0652ae
--- /dev/null
+++
b/hudi-flink-datasource/hudi-flink/src/test/java/org/apache/hudi/sink/bootstrap/TestRLIBootstrapOperator.java
@@ -0,0 +1,80 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hudi.sink.bootstrap;
+
+import org.apache.hudi.client.model.HoodieFlinkInternalRow;
+import org.apache.hudi.common.table.HoodieTableMetaClient;
+import org.apache.hudi.configuration.FlinkOptions;
+import org.apache.hudi.index.HoodieIndex;
+import org.apache.hudi.metadata.MetadataPartitionType;
+import org.apache.hudi.util.StreamerUtil;
+import org.apache.hudi.utils.TestConfigurations;
+
+import org.apache.flink.configuration.Configuration;
+import org.apache.flink.streaming.util.OneInputStreamOperatorTestHarness;
+import org.junit.jupiter.api.Test;
+import org.junit.jupiter.api.io.TempDir;
+
+import java.io.File;
+
+import static org.junit.jupiter.api.Assertions.assertEquals;
+import static org.junit.jupiter.api.Assertions.assertThrows;
+
+/**
+ * Tests for {@link RLIBootstrapOperator}.
+ */
+public class TestRLIBootstrapOperator {
+
+ @TempDir
+ File tempFile;
+
+ @Test
+ void testSkipPreloadForFreshTableWithoutMetadataTable() throws Exception {
+ Configuration conf = getRLIConf();
+ StreamerUtil.initTableIfNotExists(conf);
+
+ try (OneInputStreamOperatorTestHarness<HoodieFlinkInternalRow,
HoodieFlinkInternalRow> harness =
+ new OneInputStreamOperatorTestHarness<>(new
RLIBootstrapOperator(conf), 1, 1, 0)) {
+ harness.open();
+
+ assertEquals(0, harness.getOutput().size());
+ }
+ }
+
+ @Test
+ void testFailFastWhenMetadataTableIsMarkedAvailableButCannotBeLoaded()
throws Exception {
+ Configuration conf = getRLIConf();
+ HoodieTableMetaClient metaClient = StreamerUtil.initTableIfNotExists(conf);
+ metaClient.getTableConfig().setMetadataPartitionState(metaClient,
MetadataPartitionType.FILES.getPartitionPath(), true);
+
+ try (OneInputStreamOperatorTestHarness<HoodieFlinkInternalRow,
HoodieFlinkInternalRow> harness =
+ new OneInputStreamOperatorTestHarness<>(new
RLIBootstrapOperator(conf), 1, 1, 0)) {
+ RuntimeException error = assertThrows(RuntimeException.class,
harness::open);
+
+ assertEquals("Can not initialize the table metadata",
error.getMessage());
+ }
+ }
+
+ private Configuration getRLIConf() {
+ Configuration conf =
TestConfigurations.getDefaultConf(tempFile.getAbsolutePath());
+ conf.set(FlinkOptions.METADATA_ENABLED, true);
+ conf.set(FlinkOptions.INDEX_TYPE,
HoodieIndex.IndexType.GLOBAL_RECORD_LEVEL_INDEX.name());
+ return conf;
+ }
+}
diff --git
a/hudi-flink-datasource/hudi-flink/src/test/java/org/apache/hudi/table/ITTestVariantCrossEngineCompatibility.java
b/hudi-flink-datasource/hudi-flink/src/test/java/org/apache/hudi/table/ITTestVariantCrossEngineCompatibility.java
index 03305d478ecd..de6e3835d120 100644
---
a/hudi-flink-datasource/hudi-flink/src/test/java/org/apache/hudi/table/ITTestVariantCrossEngineCompatibility.java
+++
b/hudi-flink-datasource/hudi-flink/src/test/java/org/apache/hudi/table/ITTestVariantCrossEngineCompatibility.java
@@ -18,6 +18,8 @@
package org.apache.hudi.table;
+import org.apache.hudi.adapter.DataTypeAdapter;
+import org.apache.hudi.adapter.DataTypeAdapterTestUtils;
import org.apache.hudi.common.testutils.HoodieTestUtils;
import org.apache.hudi.common.util.StringUtils;
import org.apache.hudi.utils.FlinkMiniCluster;
@@ -27,7 +29,6 @@ import org.apache.flink.table.api.TableEnvironment;
import org.apache.flink.table.api.TableResult;
import org.apache.flink.types.Row;
import org.apache.flink.util.CollectionUtil;
-import org.junit.jupiter.api.Disabled;
import org.junit.jupiter.api.Test;
import org.junit.jupiter.api.extension.ExtendWith;
import org.junit.jupiter.api.io.TempDir;
@@ -51,19 +52,16 @@ public class ITTestVariantCrossEngineCompatibility {
/**
* Helper method to verify that Flink can read Spark 4.0 Variant tables.
- * Variant data is represented as ROW<metadata BYTES, value BYTES> in Flink.
*/
private void verifyFlinkCanReadSparkVariantTable(String tablePath, String
tableType, String testDescription) throws Exception {
TableEnvironment tableEnv = TestTableEnvs.getBatchTableEnv();
// Create a Hudi table pointing to the Spark-written data
- // In Flink, Variant is represented as ROW<metadata BYTES, value BYTES>
- // NOTE: value is a reserved keyword
String createTableDdl = String.format(
"CREATE TABLE variant_table ("
+ " id INT,"
+ " name STRING,"
- + " v ROW<metadata BYTES, `value` BYTES>,"
+ + " v VARIANT,"
+ " ts BIGINT,"
+ " PRIMARY KEY (id) NOT ENFORCED"
+ ") WITH ("
@@ -87,35 +85,32 @@ public class ITTestVariantCrossEngineCompatibility {
assertEquals("row1", row.getField(1), "Second column should be name=row1");
assertEquals(1000L, row.getField(3), "Fourth column should be ts=1000");
- // Verify the variant column is readable as a ROW with binary fields
- Row variantRow = (Row) row.getField(2);
- assertNotNull(variantRow, "Variant column should not be null");
-
- byte[] metadataBytes = (byte[]) variantRow.getField(0);
- byte[] valueBytes = (byte[]) variantRow.getField(1);
+ // Verify the variant column is readable as a native Flink Variant.
+ Object variantObject = row.getField(2);
+ assertNotNull(variantObject, "Variant column should not be null");
+ DataTypeAdapterTestUtils.assertAsBinaryVariant(variantObject);
// Expected byte values from Spark 4.0 Variant representation: {"updated":
true, "new_field": 123}
byte[] expectedValueBytes = new byte[]{0x02, 0x02, 0x01, 0x00, 0x01, 0x00,
0x03, 0x04, 0x0C, 0x7B};
byte[] expectedMetadataBytes = new byte[]{0x01, 0x02, 0x00, 0x07, 0x10,
0x75, 0x70, 0x64, 0x61,
0x74, 0x65, 0x64, 0x6E, 0x65, 0x77, 0x5F, 0x66, 0x69, 0x65, 0x6C,
0x64};
- assertArrayEquals(expectedValueBytes, valueBytes,
+ assertArrayEquals(expectedValueBytes,
DataTypeAdapter.getVariantValue(variantObject),
String.format("Variant value bytes mismatch (%s). Expected: %s, Got:
%s",
testDescription,
Arrays.toString(StringUtils.encodeHex(expectedValueBytes)),
- Arrays.toString(StringUtils.encodeHex(valueBytes))));
+
Arrays.toString(StringUtils.encodeHex(DataTypeAdapter.getVariantValue(variantObject)))));
- assertArrayEquals(expectedMetadataBytes, metadataBytes,
+ assertArrayEquals(expectedMetadataBytes,
DataTypeAdapter.getVariantMetadata(variantObject),
String.format("Variant metadata bytes mismatch (%s). Expected: %s,
Got: %s",
testDescription,
Arrays.toString(StringUtils.encodeHex(expectedMetadataBytes)),
- Arrays.toString(StringUtils.encodeHex(metadataBytes))));
+
Arrays.toString(StringUtils.encodeHex(DataTypeAdapter.getVariantMetadata(variantObject)))));
tableEnv.executeSql("DROP TABLE variant_table");
}
@Test
- @Disabled("disabled and reopen the tests for 1.3")
public void testFlinkReadSparkVariantCOWTable() throws Exception {
// Test that Flink can read a COW table with Variant data written by Spark
4.0
Path cowTargetDir = tempDir.resolve("cow");
@@ -125,7 +120,6 @@ public class ITTestVariantCrossEngineCompatibility {
}
@Test
- @Disabled("disabled and reopen the tests for 1.3")
public void testFlinkReadSparkVariantMORTableWithAvro() throws Exception {
// Test that Flink can read a MOR table with AVRO record type and Variant
data written by Spark 4.0
Path morAvroTargetDir = tempDir.resolve("mor_avro");
@@ -135,7 +129,6 @@ public class ITTestVariantCrossEngineCompatibility {
}
@Test
- @Disabled("disabled and reopen the tests for 1.3")
public void testFlinkReadSparkVariantMORTableWithSpark() throws Exception {
// Test that Flink can read a MOR table with SPARK record type and Variant
data written by Spark 4.0
Path morSparkTargetDir = tempDir.resolve("mor_spark");
@@ -143,4 +136,4 @@ public class ITTestVariantCrossEngineCompatibility {
String morSparkPath =
morSparkTargetDir.resolve("variant_mor_spark").toString();
verifyFlinkCanReadSparkVariantTable(morSparkPath, "MERGE_ON_READ", "MOR
table with SPARK record type");
}
-}
\ No newline at end of file
+}
diff --git
a/hudi-flink-datasource/hudi-flink1.17.x/src/test/java/org/apache/hudi/adapter/DataTypeAdapterTestUtils.java
b/hudi-flink-datasource/hudi-flink1.17.x/src/test/java/org/apache/hudi/adapter/DataTypeAdapterTestUtils.java
new file mode 100644
index 000000000000..ae2e4107d6ea
--- /dev/null
+++
b/hudi-flink-datasource/hudi-flink1.17.x/src/test/java/org/apache/hudi/adapter/DataTypeAdapterTestUtils.java
@@ -0,0 +1,28 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hudi.adapter;
+
+/**
+ * Adapter utils.
+ */
+public class DataTypeAdapterTestUtils {
+ public static void assertAsBinaryVariant(Object variantObject) {
+ throw new UnsupportedOperationException("Variant is not supported yet.");
+ }
+}
diff --git
a/hudi-flink-datasource/hudi-flink1.18.x/src/test/java/org/apache/hudi/adapter/DataTypeAdapterTestUtils.java
b/hudi-flink-datasource/hudi-flink1.18.x/src/test/java/org/apache/hudi/adapter/DataTypeAdapterTestUtils.java
new file mode 100644
index 000000000000..ae2e4107d6ea
--- /dev/null
+++
b/hudi-flink-datasource/hudi-flink1.18.x/src/test/java/org/apache/hudi/adapter/DataTypeAdapterTestUtils.java
@@ -0,0 +1,28 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hudi.adapter;
+
+/**
+ * Adapter utils.
+ */
+public class DataTypeAdapterTestUtils {
+ public static void assertAsBinaryVariant(Object variantObject) {
+ throw new UnsupportedOperationException("Variant is not supported yet.");
+ }
+}
diff --git
a/hudi-flink-datasource/hudi-flink1.19.x/src/test/java/org/apache/hudi/adapter/DataTypeAdapterTestUtils.java
b/hudi-flink-datasource/hudi-flink1.19.x/src/test/java/org/apache/hudi/adapter/DataTypeAdapterTestUtils.java
new file mode 100644
index 000000000000..ae2e4107d6ea
--- /dev/null
+++
b/hudi-flink-datasource/hudi-flink1.19.x/src/test/java/org/apache/hudi/adapter/DataTypeAdapterTestUtils.java
@@ -0,0 +1,28 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hudi.adapter;
+
+/**
+ * Adapter utils.
+ */
+public class DataTypeAdapterTestUtils {
+ public static void assertAsBinaryVariant(Object variantObject) {
+ throw new UnsupportedOperationException("Variant is not supported yet.");
+ }
+}
diff --git
a/hudi-flink-datasource/hudi-flink1.20.x/src/test/java/org/apache/hudi/adapter/DataTypeAdapterTestUtils.java
b/hudi-flink-datasource/hudi-flink1.20.x/src/test/java/org/apache/hudi/adapter/DataTypeAdapterTestUtils.java
new file mode 100644
index 000000000000..ae2e4107d6ea
--- /dev/null
+++
b/hudi-flink-datasource/hudi-flink1.20.x/src/test/java/org/apache/hudi/adapter/DataTypeAdapterTestUtils.java
@@ -0,0 +1,28 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hudi.adapter;
+
+/**
+ * Adapter utils.
+ */
+public class DataTypeAdapterTestUtils {
+ public static void assertAsBinaryVariant(Object variantObject) {
+ throw new UnsupportedOperationException("Variant is not supported yet.");
+ }
+}
diff --git
a/hudi-flink-datasource/hudi-flink2.0.x/src/test/java/org/apache/flink/api/java/typeutils/runtime/kryo/KryoSerializerSnapshot.java
b/hudi-flink-datasource/hudi-flink2.0.x/src/test/java/org/apache/flink/api/java/typeutils/runtime/kryo/KryoSerializerSnapshot.java
index 1a256ffe7bba..aac9a4651aff 100644
---
a/hudi-flink-datasource/hudi-flink2.0.x/src/test/java/org/apache/flink/api/java/typeutils/runtime/kryo/KryoSerializerSnapshot.java
+++
b/hudi-flink-datasource/hudi-flink2.0.x/src/test/java/org/apache/flink/api/java/typeutils/runtime/kryo/KryoSerializerSnapshot.java
@@ -34,6 +34,9 @@ public class KryoSerializerSnapshot<T> implements
TypeSerializerSnapshot<T> {
private Class<T> type;
+ public KryoSerializerSnapshot() {
+ }
+
public KryoSerializerSnapshot(Class<T> type) {
this.type = type;
}
diff --git
a/hudi-flink-datasource/hudi-flink2.0.x/src/test/java/org/apache/hudi/adapter/DataTypeAdapterTestUtils.java
b/hudi-flink-datasource/hudi-flink2.0.x/src/test/java/org/apache/hudi/adapter/DataTypeAdapterTestUtils.java
new file mode 100644
index 000000000000..ae2e4107d6ea
--- /dev/null
+++
b/hudi-flink-datasource/hudi-flink2.0.x/src/test/java/org/apache/hudi/adapter/DataTypeAdapterTestUtils.java
@@ -0,0 +1,28 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hudi.adapter;
+
+/**
+ * Adapter utils.
+ */
+public class DataTypeAdapterTestUtils {
+ public static void assertAsBinaryVariant(Object variantObject) {
+ throw new UnsupportedOperationException("Variant is not supported yet.");
+ }
+}
diff --git
a/hudi-flink-datasource/hudi-flink2.1.x/src/main/java/org/apache/hudi/table/format/cow/ParquetSplitReaderUtil.java
b/hudi-flink-datasource/hudi-flink2.1.x/src/main/java/org/apache/hudi/table/format/cow/ParquetSplitReaderUtil.java
index bb5d0c55b81d..3e16417ba541 100644
---
a/hudi-flink-datasource/hudi-flink2.1.x/src/main/java/org/apache/hudi/table/format/cow/ParquetSplitReaderUtil.java
+++
b/hudi-flink-datasource/hudi-flink2.1.x/src/main/java/org/apache/hudi/table/format/cow/ParquetSplitReaderUtil.java
@@ -18,6 +18,7 @@
package org.apache.hudi.table.format.cow;
+import org.apache.hudi.common.schema.HoodieSchema;
import org.apache.hudi.common.util.ValidationUtils;
import org.apache.hudi.table.format.cow.vector.HeapArrayGroupColumnVector;
import org.apache.hudi.table.format.cow.vector.HeapArrayVector;
@@ -320,6 +321,17 @@ public class ParquetSplitReaderUtil {
} else {
throw new UnsupportedOperationException("Unsupported create row with
default value.");
}
+ case VARIANT:
+ HeapRowColumnVector variantVector = new HeapRowColumnVector(
+ batchSize,
+ new HeapBytesVector(batchSize),
+ new HeapBytesVector(batchSize));
+ if (value == null) {
+ variantVector.fillWithNulls();
+ return variantVector;
+ } else {
+ throw new UnsupportedOperationException("Unsupported create variant
with default value.");
+ }
default:
throw new UnsupportedOperationException("Unsupported type: " + type);
}
@@ -498,6 +510,20 @@ public class ParquetSplitReaderUtil {
}
}
return new RowColumnReader(fieldReaders);
+ case VARIANT:
+ ColumnDescriptor valueDescriptor = getVariantColumnDescriptor(
+ physicalType,
+ descriptors,
+ depth,
+ HoodieSchema.Variant.VARIANT_VALUE_FIELD);
+ ColumnDescriptor metadataDescriptor = getVariantColumnDescriptor(
+ physicalType,
+ descriptors,
+ depth,
+ HoodieSchema.Variant.VARIANT_METADATA_FIELD);
+ return new RowColumnReader(Arrays.asList(
+ new BytesColumnReader(valueDescriptor,
pages.getPageReader(valueDescriptor)),
+ new BytesColumnReader(metadataDescriptor,
pages.getPageReader(metadataDescriptor))));
default:
throw new UnsupportedOperationException(fieldType + " is not supported
now.");
}
@@ -675,11 +701,68 @@ public class ParquetSplitReaderUtil {
}
}
return new HeapRowColumnVector(batchSize, columnVectors);
+ case VARIANT:
+ validateVariantType(physicalType);
+ return new HeapRowColumnVector(
+ batchSize,
+ new HeapBytesVector(batchSize),
+ new HeapBytesVector(batchSize));
default:
throw new UnsupportedOperationException(fieldType + " is not supported
now.");
}
}
+ private static ColumnDescriptor getVariantColumnDescriptor(
+ Type physicalType,
+ List<ColumnDescriptor> descriptors,
+ int depth,
+ String fieldName) {
+ return descriptors.stream()
+ .filter(descriptor -> descriptor.getPath().length > depth + 1
+ && fieldName.equals(descriptor.getPath()[depth + 1]))
+ .findFirst()
+ .orElseThrow(() -> new IllegalArgumentException(
+ "Invalid Variant Parquet schema: missing binary field '" +
fieldName + "'."));
+ }
+
+  /**
+   * Ensures {@code physicalType} is an unshredded Variant group with binary {@code value} and
+   * {@code metadata} children.
+   *
+   * @param physicalType the Parquet type expected to hold a Variant
+   * @throws IllegalArgumentException if the type is primitive or a required binary child is missing
+   * @throws UnsupportedOperationException if the group contains a {@code typed_value} field
+   */
+  private static void validateVariantType(Type physicalType) {
+    if (physicalType.isPrimitive()) {
+      throw new IllegalArgumentException(
+          "Type mismatch, expected Variant but got '" + physicalType + "'.");
+    }
+    GroupType variantGroup = physicalType.asGroupType();
+    if (isShreddedVariant(variantGroup)) {
+      String message = "Shredded Variant is not supported in Flink. "
+          + "The Parquet group '" + variantGroup.getName() + "' contains a '"
+          + HoodieSchema.Variant.VARIANT_TYPED_VALUE_FIELD
+          + "' field indicating a shredded layout.";
+      throw new UnsupportedOperationException(message);
+    }
+    validateVariantField(variantGroup, HoodieSchema.Variant.VARIANT_VALUE_FIELD);
+    validateVariantField(variantGroup, HoodieSchema.Variant.VARIANT_METADATA_FIELD);
+  }
+
+ /**
+ * Checks whether a variant group contains a {@code typed_value} field,
indicating a shredded
+ * layout.
+ */
+ private static boolean isShreddedVariant(GroupType groupType) {
+ return
groupType.containsField(HoodieSchema.Variant.VARIANT_TYPED_VALUE_FIELD);
+ }
+
+  /**
+   * Ensures {@code groupType} has a child named {@code fieldName} whose type is primitive BINARY.
+   *
+   * @param groupType the Variant group to inspect
+   * @param fieldName the required child field name
+   * @throws IllegalArgumentException if the child is absent, non-primitive, or not BINARY
+   */
+  private static void validateVariantField(GroupType groupType, String fieldName) {
+    boolean isBinaryField = false;
+    if (groupType.containsField(fieldName)) {
+      Type child = groupType.getType(fieldName);
+      isBinaryField = child.isPrimitive()
+          && child.asPrimitiveType().getPrimitiveTypeName() == PrimitiveType.PrimitiveTypeName.BINARY;
+    }
+    if (!isBinaryField) {
+      throw new IllegalArgumentException(
+          "Invalid Variant Parquet schema: missing binary field '" + fieldName + "'.");
+    }
+  }
+
/**
* Returns the field index with given physical row type {@code groupType}
and field name {@code fieldName}.
*
diff --git
a/hudi-flink-datasource/hudi-flink2.1.x/src/test/java/org/apache/flink/api/java/typeutils/runtime/kryo/KryoSerializerSnapshot.java
b/hudi-flink-datasource/hudi-flink2.1.x/src/test/java/org/apache/flink/api/java/typeutils/runtime/kryo/KryoSerializerSnapshot.java
index 1a256ffe7bba..aac9a4651aff 100644
---
a/hudi-flink-datasource/hudi-flink2.1.x/src/test/java/org/apache/flink/api/java/typeutils/runtime/kryo/KryoSerializerSnapshot.java
+++
b/hudi-flink-datasource/hudi-flink2.1.x/src/test/java/org/apache/flink/api/java/typeutils/runtime/kryo/KryoSerializerSnapshot.java
@@ -34,6 +34,9 @@ public class KryoSerializerSnapshot<T> implements
TypeSerializerSnapshot<T> {
private Class<T> type;
+  /**
+   * Creates a snapshot whose {@code type} is not set yet.
+   *
+   * <p>NOTE(review): presumably required so Flink can instantiate the snapshot reflectively
+   * when restoring; confirm against the {@code TypeSerializerSnapshot} contract.
+   */
+  public KryoSerializerSnapshot() {
+    this.type = null;
+  }
+
public KryoSerializerSnapshot(Class<T> type) {
this.type = type;
}
diff --git
a/hudi-flink-datasource/hudi-flink2.1.x/src/test/java/org/apache/hudi/adapter/DataTypeAdapterTestUtils.java
b/hudi-flink-datasource/hudi-flink2.1.x/src/test/java/org/apache/hudi/adapter/DataTypeAdapterTestUtils.java
new file mode 100644
index 000000000000..2727b40b1393
--- /dev/null
+++
b/hudi-flink-datasource/hudi-flink2.1.x/src/test/java/org/apache/hudi/adapter/DataTypeAdapterTestUtils.java
@@ -0,0 +1,32 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hudi.adapter;
+
+import org.apache.flink.types.variant.BinaryVariant;
+
+import static org.junit.jupiter.api.Assertions.assertInstanceOf;
+
+/**
+ * Adapter utils.
+ */
+public class DataTypeAdapterTestUtils {
+ public static void assertAsBinaryVariant(Object variantObject) {
+ assertInstanceOf(BinaryVariant.class, variantObject, "Variant column
should be a BinaryVariant");
+ }
+}
diff --git a/pom.xml b/pom.xml
index 15d770ede08c..909c7771e6e8 100644
--- a/pom.xml
+++ b/pom.xml
@@ -153,14 +153,14 @@
<flink1.19.version>1.19.2</flink1.19.version>
<flink1.18.version>1.18.1</flink1.18.version>
<flink1.17.version>1.17.1</flink1.17.version>
- <flink.version>${flink1.20.version}</flink.version>
- <hudi.flink.module>hudi-flink1.20.x</hudi.flink.module>
- <flink.bundle.version>1.20</flink.bundle.version>
+ <flink.version>${flink2.1.version}</flink.version>
+ <hudi.flink.module>hudi-flink2.1.x</hudi.flink.module>
+ <flink.bundle.version>2.1</flink.bundle.version>
<!-- This is fixed to match with version from flink-avro -->
<flink.avro.version>1.11.4</flink.avro.version>
- <flink.format.parquet.version>1.13.1</flink.format.parquet.version>
+ <flink.format.parquet.version>1.15.2</flink.format.parquet.version>
<!-- check kafka version -->
- <flink.connector.kafka.version>3.3.0-1.20</flink.connector.kafka.version>
+ <flink.connector.kafka.version>4.0.1-2.0</flink.connector.kafka.version>
<flink.runtime.artifactId>flink-runtime</flink.runtime.artifactId>
<flink.table.runtime.artifactId>flink-table-runtime</flink.table.runtime.artifactId>
<flink.table.planner.artifactId>flink-table-planner_2.12</flink.table.planner.artifactId>
@@ -170,7 +170,7 @@
<flink.streaming.java.artifactId>flink-streaming-java</flink.streaming.java.artifactId>
<flink.clients.artifactId>flink-clients</flink.clients.artifactId>
<flink.connector.kafka.artifactId>flink-connector-kafka</flink.connector.kafka.artifactId>
-
<flink.hadoop.compatibility.artifactId>flink-hadoop-compatibility_2.12</flink.hadoop.compatibility.artifactId>
+
<flink.hadoop.compatibility.artifactId>flink-hadoop-compatibility</flink.hadoop.compatibility.artifactId>
<rocksdbjni.version>7.5.3</rocksdbjni.version>
<spark33.version>3.3.4</spark33.version>
<spark34.version>3.4.3</spark34.version>
@@ -2900,6 +2900,7 @@
<module>hudi-flink-datasource/hudi-flink2.1.x</module>
</modules>
<activation>
+ <activeByDefault>true</activeByDefault>
<property>
<name>flink2.1</name>
</property>
@@ -2943,7 +2944,6 @@
<module>hudi-flink-datasource/hudi-flink1.20.x</module>
</modules>
<activation>
- <activeByDefault>true</activeByDefault>
<property>
<name>flink1.20</name>
</property>