This is an automated email from the ASF dual-hosted git repository. yihua pushed a commit to branch release-1.2.0 in repository https://gitbox.apache.org/repos/asf/hudi.git
commit 55566a88515f7e91423f556e9f26e3ca9c9f5315 Author: Shuo Cheng <[email protected]> AuthorDate: Mon May 11 13:54:16 2026 +0800 fix(flink): Support read non-VECTOR columns from table containing VEC… (#18712) * fix(flink): Support read non-VECTOR columns from table containing VECTOR columns --- .../apache/hudi/util/HoodieSchemaConverter.java | 23 +++++ .../hudi/util/TestHoodieSchemaConverter.java | 54 +++++++++++ .../vector_cross_engine_validation/README.md | 66 ++++++++++++++ .../vector_cross_engine_validation/vector_cow.zip | Bin 0 -> 80614 bytes .../vector_cross_engine_validation/vector_mor.zip | Bin 0 -> 77871 bytes .../ITTestVectorCrossEngineCompatibility.java | 101 +++++++++++++++++++++ 6 files changed, 244 insertions(+) diff --git a/hudi-client/hudi-flink-client/src/main/java/org/apache/hudi/util/HoodieSchemaConverter.java b/hudi-client/hudi-flink-client/src/main/java/org/apache/hudi/util/HoodieSchemaConverter.java index 0bfed0e43cec..0b3a1af7982b 100644 --- a/hudi-client/hudi-flink-client/src/main/java/org/apache/hudi/util/HoodieSchemaConverter.java +++ b/hudi-client/hudi-flink-client/src/main/java/org/apache/hudi/util/HoodieSchemaConverter.java @@ -413,6 +413,8 @@ public class HoodieSchemaConverter { return convertUnion(hoodieSchema); case VARIANT: return convertVariant(hoodieSchema); + case VECTOR: + return convertVector(hoodieSchema); default: throw new IllegalArgumentException("Unsupported HoodieSchemaType: " + type); } @@ -469,6 +471,27 @@ public class HoodieSchemaConverter { return DataTypes.TIME(flinkPrecision).notNull(); } + private static DataType convertVector(HoodieSchema schema) { + if (!(schema instanceof HoodieSchema.Vector)) { + throw new IllegalStateException("Expected HoodieSchema.Vector but got: " + schema.getClass()); + } + HoodieSchema.Vector vectorSchema = (HoodieSchema.Vector) schema; + return DataTypes.ARRAY(convertVectorElementType(vectorSchema.getVectorElementType())).notNull(); + } + + private static DataType 
convertVectorElementType(HoodieSchema.Vector.VectorElementType elementType) { + switch (elementType) { + case FLOAT: + return DataTypes.FLOAT().notNull(); + case DOUBLE: + return DataTypes.DOUBLE().notNull(); + case INT8: + return DataTypes.TINYINT().notNull(); + default: + throw new IllegalArgumentException("Unsupported VECTOR element type: " + elementType); + } + } + private static DataType createBlob() { // Create nested reference ROW type DataType referenceType = DataTypes.ROW( diff --git a/hudi-client/hudi-flink-client/src/test/java/org/apache/hudi/util/TestHoodieSchemaConverter.java b/hudi-client/hudi-flink-client/src/test/java/org/apache/hudi/util/TestHoodieSchemaConverter.java index fe60bd4a7788..e2fd16d7112b 100644 --- a/hudi-client/hudi-flink-client/src/test/java/org/apache/hudi/util/TestHoodieSchemaConverter.java +++ b/hudi-client/hudi-flink-client/src/test/java/org/apache/hudi/util/TestHoodieSchemaConverter.java @@ -30,6 +30,7 @@ import org.apache.flink.table.types.DataType; import org.apache.flink.table.types.logical.ArrayType; import org.apache.flink.table.types.logical.DecimalType; import org.apache.flink.table.types.logical.LogicalType; +import org.apache.flink.table.types.logical.LogicalTypeRoot; import org.apache.flink.table.types.logical.MapType; import org.apache.flink.table.types.logical.RowType; import org.apache.flink.table.types.logical.TimestampType; @@ -40,6 +41,7 @@ import org.junit.jupiter.api.Test; import java.util.Arrays; import static org.junit.jupiter.api.Assertions.assertEquals; +import static org.junit.jupiter.api.Assertions.assertFalse; import static org.junit.jupiter.api.Assertions.assertInstanceOf; import static org.junit.jupiter.api.Assertions.assertNotNull; import static org.junit.jupiter.api.Assertions.assertThrows; @@ -475,6 +477,58 @@ public class TestHoodieSchemaConverter { assertTrue(dataType.getLogicalType() instanceof VarBinaryType); } + @Test + public void testVectorConversion() { + HoodieSchema floatVectorSchema = 
HoodieSchema.createVector(128); + HoodieSchema doubleVectorSchema = HoodieSchema.createVector(128, HoodieSchema.Vector.VectorElementType.DOUBLE); + HoodieSchema int8VectorSchema = HoodieSchema.createVector(128, HoodieSchema.Vector.VectorElementType.INT8); + + DataType floatDataType = HoodieSchemaConverter.convertToDataType(floatVectorSchema); + DataType doubleDataType = HoodieSchemaConverter.convertToDataType(doubleVectorSchema); + DataType int8DataType = HoodieSchemaConverter.convertToDataType(int8VectorSchema); + + assertVectorArray(floatDataType, LogicalTypeRoot.FLOAT, false); + assertVectorArray(doubleDataType, LogicalTypeRoot.DOUBLE, false); + assertVectorArray(int8DataType, LogicalTypeRoot.TINYINT, false); + } + + @Test + public void testNullableVectorConversion() { + HoodieSchema vectorSchema = HoodieSchema.createNullable(HoodieSchema.createVector(128)); + + DataType dataType = HoodieSchemaConverter.convertToDataType(vectorSchema); + + assertVectorArray(dataType, LogicalTypeRoot.FLOAT, true); + } + + @Test + public void testVectorInRecordConversion() { + HoodieSchema schema = HoodieSchema.createRecord( + "test_record", + null, + null, + Arrays.asList( + HoodieSchemaField.of("id", HoodieSchema.create(HoodieSchemaType.INT)), + HoodieSchemaField.of("embedding", HoodieSchema.createVector(128)) + ) + ); + + RowType rowType = HoodieSchemaConverter.convertToRowType(schema); + + assertEquals(2, rowType.getFieldCount()); + assertEquals("embedding", rowType.getFieldNames().get(1)); + ArrayType vectorArrayType = assertInstanceOf(ArrayType.class, rowType.getTypeAt(1)); + assertEquals(LogicalTypeRoot.FLOAT, vectorArrayType.getElementType().getTypeRoot()); + assertFalse(rowType.getTypeAt(1).isNullable()); + } + + private void assertVectorArray(DataType dataType, LogicalTypeRoot elementTypeRoot, boolean nullable) { + ArrayType arrayType = assertInstanceOf(ArrayType.class, dataType.getLogicalType()); + assertEquals(elementTypeRoot, arrayType.getElementType().getTypeRoot()); 
+ assertFalse(arrayType.getElementType().isNullable()); + assertEquals(nullable, dataType.getLogicalType().isNullable()); + } + @Test void testUnionSchemaWithMultipleRecordTypes() { HoodieSchema schema = HoodieSchema.fromAvroSchema(HoodieMetadataRecord.SCHEMA$); diff --git a/hudi-common/src/test/resources/vector_cross_engine_validation/README.md b/hudi-common/src/test/resources/vector_cross_engine_validation/README.md new file mode 100644 index 000000000000..1d6472fa0bf1 --- /dev/null +++ b/hudi-common/src/test/resources/vector_cross_engine_validation/README.md @@ -0,0 +1,66 @@ + +<!-- + Licensed to the Apache Software Foundation (ASF) under one or more + contributor license agreements. See the NOTICE file distributed with + this work for additional information regarding copyright ownership. + The ASF licenses this file to You under the Apache License, Version 2.0 + (the "License"); you may not use this file except in compliance with + the License. You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + + Unless required by applicable law or agreed to in writing, software + distributed under the License is distributed on an "AS IS" BASIS, + WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + See the License for the specific language governing permissions and + limitations under the License. +--> + +# Generation of test files + +Spark3.5 is used to generate these test files. 
+ +Table schema: +* MOR table +```sql +CREATE TABLE vector_table_mor ( + id BIGINT, + name STRING, + embedding1 VECTOR(128) COMMENT 'document float embedding', + embedding2 VECTOR(128, DOUBLE) COMMENT 'document double embedding', + embedding3 VECTOR(128, INT8) COMMENT 'document INT8 embedding', + ts BIGINT +) USING hudi +LOCATION '/tmp/hudi_vector_table_mor' +TBLPROPERTIES ( + primaryKey = 'id', + preCombineField = 'ts', + type = 'mor', + hoodie.index.type = 'INMEMORY' +); +``` + +* COW table +```sql +CREATE TABLE vector_table_cow ( + id BIGINT, + name STRING, + embedding1 VECTOR(128) COMMENT 'document float embedding', + embedding2 VECTOR(128, DOUBLE) COMMENT 'document double embedding', + embedding3 VECTOR(128, INT8) COMMENT 'document INT8 embedding', + ts BIGINT +) USING hudi +LOCATION '/tmp/hudi_vector_table_cow' +TBLPROPERTIES ( + primaryKey = 'id', + preCombineField = 'ts', + type = 'cow' +); +``` + +The shell commands used to package each table directory into a zip file are: + +```shell +cd /path/to/test/files/ +zip -r $TABLE_DIR_NAME.zip $TABLE_DIR_NAME +``` diff --git a/hudi-common/src/test/resources/vector_cross_engine_validation/vector_cow.zip b/hudi-common/src/test/resources/vector_cross_engine_validation/vector_cow.zip new file mode 100644 index 000000000000..a1afed0ea6e1 Binary files /dev/null and b/hudi-common/src/test/resources/vector_cross_engine_validation/vector_cow.zip differ diff --git a/hudi-common/src/test/resources/vector_cross_engine_validation/vector_mor.zip b/hudi-common/src/test/resources/vector_cross_engine_validation/vector_mor.zip new file mode 100644 index 000000000000..b89c2474a5e2 Binary files /dev/null and b/hudi-common/src/test/resources/vector_cross_engine_validation/vector_mor.zip differ diff --git a/hudi-flink-datasource/hudi-flink/src/test/java/org/apache/hudi/table/ITTestVectorCrossEngineCompatibility.java b/hudi-flink-datasource/hudi-flink/src/test/java/org/apache/hudi/table/ITTestVectorCrossEngineCompatibility.java new file mode 100644 index 
000000000000..d45b44918014 --- /dev/null +++ b/hudi-flink-datasource/hudi-flink/src/test/java/org/apache/hudi/table/ITTestVectorCrossEngineCompatibility.java @@ -0,0 +1,101 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.hudi.table; + +import org.apache.hudi.common.model.HoodieTableType; +import org.apache.hudi.common.testutils.HoodieTestUtils; +import org.apache.hudi.utils.FlinkMiniCluster; +import org.apache.hudi.utils.TestTableEnvs; + +import org.apache.flink.table.api.TableEnvironment; +import org.apache.flink.types.Row; +import org.apache.flink.util.CollectionUtil; +import org.junit.jupiter.api.extension.ExtendWith; +import org.junit.jupiter.api.io.TempDir; +import org.junit.jupiter.params.ParameterizedTest; +import org.junit.jupiter.params.provider.EnumSource; + +import java.nio.file.Path; +import java.util.List; + +import static org.apache.hudi.utils.TestData.assertRowsEquals; +import static org.junit.jupiter.api.Assertions.assertThrows; + +/** + * Integration test for cross-engine compatibility - verifying that Flink can read tables with VECTOR columns written by Spark. 
+ */ +@ExtendWith(FlinkMiniCluster.class) +public class ITTestVectorCrossEngineCompatibility { + @TempDir + Path tempDir; + + private void createTable(TableEnvironment tableEnv, String tablePath, String tableType) throws Exception { + // Create a Hudi table pointing to the Spark-written data + // In Flink, VECTOR is represented as ARRAY<FLOAT/DOUBLE/TINYINT> + String createTableDdl = String.format( + "CREATE TABLE vector_table (\n" + + " id BIGINT,\n" + + " name STRING,\n" + + " embedding1 ARRAY<FLOAT>,\n" + + " embedding2 ARRAY<DOUBLE>,\n" + + " embedding3 ARRAY<TINYINT>,\n" + + " ts BIGINT,\n" + + " PRIMARY KEY (id) NOT ENFORCED\n" + + ") WITH (\n" + + " 'connector' = 'hudi',\n" + + " 'path' = '%s',\n" + + " 'table.type' = '%s',\n" + + " 'ordering.fields' = 'ts'\n" + + ");", + tablePath, tableType); + tableEnv.executeSql(createTableDdl); + } + + @ParameterizedTest + @EnumSource(value = HoodieTableType.class) + public void testValidationVectorColumnsFromCOWTable(HoodieTableType tableType) throws Exception { + // Validate exception will be thrown when reading VECTOR columns from a table with VECTOR columns + Path cowTargetDir = tempDir.resolve("table"); + String resourceName = tableType == HoodieTableType.MERGE_ON_READ ? "vector_mor" : "vector_cow"; + HoodieTestUtils.extractZipToDirectory(String.format("vector_cross_engine_validation/%s.zip", resourceName), cowTargetDir, getClass()); + String cowPath = cowTargetDir.resolve(resourceName).toString(); + TableEnvironment tableEnv = TestTableEnvs.getBatchTableEnv(); + createTable(tableEnv, cowPath, tableType.name()); + + // ValidationException expects to be thrown + assertThrows(RuntimeException.class, + () -> CollectionUtil.iteratorToList(tableEnv.executeSql("select * from vector_table").collect()), + "Unexpected type exception. Primitive type: FIXED_LEN_BYTE_ARRAY. Field type: FLOAT. 
Field name: embedding1"); + } + + @ParameterizedTest + @EnumSource(value = HoodieTableType.class) + public void testReadNonVectorColumnsFromCOWTable(HoodieTableType tableType) throws Exception { + // Test that Flink can read non-VECTOR columns from a table with VECTOR columns + Path cowTargetDir = tempDir.resolve("table"); + String resourceName = tableType == HoodieTableType.MERGE_ON_READ ? "vector_mor" : "vector_cow"; + HoodieTestUtils.extractZipToDirectory(String.format("vector_cross_engine_validation/%s.zip", resourceName), cowTargetDir, getClass()); + String cowPath = cowTargetDir.resolve(resourceName).toString(); + TableEnvironment tableEnv = TestTableEnvs.getBatchTableEnv(); + createTable(tableEnv, cowPath, tableType.name()); + + List<Row> rows = CollectionUtil.iteratorToList(tableEnv.executeSql("select id, name, ts from vector_table").collect()); + assertRowsEquals(rows, "[+I[1, doc-1, 1000], +I[2, doc-2, 2000]]"); + } +}
