This is an automated email from the ASF dual-hosted git repository.
danny0405 pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/hudi.git
The following commit(s) were added to refs/heads/master by this push:
new 6bd2ace52ad0 fix(flink): Support read non-VECTOR columns from table
containing VEC… (#18712)
6bd2ace52ad0 is described below
commit 6bd2ace52ad0caea656831c589e9ca2b3d8e7eb4
Author: Shuo Cheng <[email protected]>
AuthorDate: Mon May 11 13:54:16 2026 +0800
fix(flink): Support read non-VECTOR columns from table containing VEC…
(#18712)
* fix(flink): Support read non-VECTOR columns from table containing VECTOR
columns
---
.../apache/hudi/util/HoodieSchemaConverter.java | 23 +++++
.../hudi/util/TestHoodieSchemaConverter.java | 54 +++++++++++
.../vector_cross_engine_validation/README.md | 66 ++++++++++++++
.../vector_cross_engine_validation/vector_cow.zip | Bin 0 -> 80614 bytes
.../vector_cross_engine_validation/vector_mor.zip | Bin 0 -> 77871 bytes
.../ITTestVectorCrossEngineCompatibility.java | 101 +++++++++++++++++++++
6 files changed, 244 insertions(+)
diff --git
a/hudi-client/hudi-flink-client/src/main/java/org/apache/hudi/util/HoodieSchemaConverter.java
b/hudi-client/hudi-flink-client/src/main/java/org/apache/hudi/util/HoodieSchemaConverter.java
index a79f19e0b782..c49142800b45 100644
---
a/hudi-client/hudi-flink-client/src/main/java/org/apache/hudi/util/HoodieSchemaConverter.java
+++
b/hudi-client/hudi-flink-client/src/main/java/org/apache/hudi/util/HoodieSchemaConverter.java
@@ -418,6 +418,8 @@ public class HoodieSchemaConverter {
return convertUnion(hoodieSchema);
case VARIANT:
return convertVariant(hoodieSchema);
+ case VECTOR:
+ return convertVector(hoodieSchema);
default:
throw new IllegalArgumentException("Unsupported HoodieSchemaType: " +
type);
}
@@ -474,6 +476,27 @@ public class HoodieSchemaConverter {
return DataTypes.TIME(flinkPrecision).notNull();
}
+ /**
+ * Converts a VECTOR schema into a non-nullable Flink ARRAY type.
+ *
+ * <p>Only the vector's element type is read here; it is mapped to the Flink
+ * element type by {@code convertVectorElementType}.
+ *
+ * @param schema schema to convert; must be a {@link HoodieSchema.Vector}
+ * @return an {@code ARRAY} of the converted element type, marked NOT NULL
+ * @throws IllegalStateException if {@code schema} is not a {@link HoodieSchema.Vector}
+ */
+ private static DataType convertVector(HoodieSchema schema) {
+ if (!(schema instanceof HoodieSchema.Vector)) {
+ throw new IllegalStateException("Expected HoodieSchema.Vector but got: "
+ schema.getClass());
+ }
+ HoodieSchema.Vector vectorSchema = (HoodieSchema.Vector) schema;
+ return
DataTypes.ARRAY(convertVectorElementType(vectorSchema.getVectorElementType())).notNull();
+ }
+
+ /**
+ * Maps a VECTOR element type to the Flink type used for the array elements.
+ *
+ * <p>FLOAT maps to {@code FLOAT}, DOUBLE to {@code DOUBLE} and INT8 to
+ * {@code TINYINT}; every returned type is marked NOT NULL.
+ *
+ * @param elementType element type declared by the vector schema
+ * @return the non-nullable Flink element type
+ * @throws IllegalArgumentException if {@code elementType} is not one of the cases above
+ */
+ private static DataType
convertVectorElementType(HoodieSchema.Vector.VectorElementType elementType) {
+ switch (elementType) {
+ case FLOAT:
+ return DataTypes.FLOAT().notNull();
+ case DOUBLE:
+ return DataTypes.DOUBLE().notNull();
+ case INT8:
+ return DataTypes.TINYINT().notNull();
+ default:
+ throw new IllegalArgumentException("Unsupported VECTOR element type: "
+ elementType);
+ }
+ }
+
private static DataType createBlob() {
// Create nested reference ROW type
DataType referenceType = DataTypes.ROW(
diff --git
a/hudi-client/hudi-flink-client/src/test/java/org/apache/hudi/util/TestHoodieSchemaConverter.java
b/hudi-client/hudi-flink-client/src/test/java/org/apache/hudi/util/TestHoodieSchemaConverter.java
index feddf10a258b..51b8a15f4c7b 100644
---
a/hudi-client/hudi-flink-client/src/test/java/org/apache/hudi/util/TestHoodieSchemaConverter.java
+++
b/hudi-client/hudi-flink-client/src/test/java/org/apache/hudi/util/TestHoodieSchemaConverter.java
@@ -30,6 +30,7 @@ import org.apache.flink.table.types.DataType;
import org.apache.flink.table.types.logical.ArrayType;
import org.apache.flink.table.types.logical.DecimalType;
import org.apache.flink.table.types.logical.LogicalType;
+import org.apache.flink.table.types.logical.LogicalTypeRoot;
import org.apache.flink.table.types.logical.MapType;
import org.apache.flink.table.types.logical.RowType;
import org.apache.flink.table.types.logical.TimestampType;
@@ -41,6 +42,7 @@ import org.junit.jupiter.api.Test;
import java.util.Arrays;
import static org.junit.jupiter.api.Assertions.assertEquals;
+import static org.junit.jupiter.api.Assertions.assertFalse;
import static org.junit.jupiter.api.Assertions.assertInstanceOf;
import static org.junit.jupiter.api.Assertions.assertNotNull;
import static org.junit.jupiter.api.Assertions.assertThrows;
@@ -476,6 +478,58 @@ public class TestHoodieSchemaConverter {
assertTrue(dataType.getLogicalType() instanceof VarBinaryType);
}
+ /**
+ * Converts three 128-dimension vector schemas (the single-argument form, DOUBLE
+ * and INT8) and checks that each becomes a non-nullable ARRAY whose elements are
+ * non-nullable FLOAT, DOUBLE and TINYINT respectively.
+ */
+ @Test
+ public void testVectorConversion() {
+ HoodieSchema floatVectorSchema = HoodieSchema.createVector(128);
+ HoodieSchema doubleVectorSchema = HoodieSchema.createVector(128,
HoodieSchema.Vector.VectorElementType.DOUBLE);
+ HoodieSchema int8VectorSchema = HoodieSchema.createVector(128,
HoodieSchema.Vector.VectorElementType.INT8);
+
+ DataType floatDataType =
HoodieSchemaConverter.convertToDataType(floatVectorSchema);
+ DataType doubleDataType =
HoodieSchemaConverter.convertToDataType(doubleVectorSchema);
+ DataType int8DataType =
HoodieSchemaConverter.convertToDataType(int8VectorSchema);
+
+ // The last argument is the expected nullability of the ARRAY type itself.
+ assertVectorArray(floatDataType, LogicalTypeRoot.FLOAT, false);
+ assertVectorArray(doubleDataType, LogicalTypeRoot.DOUBLE, false);
+ assertVectorArray(int8DataType, LogicalTypeRoot.TINYINT, false);
+ }
+
+ /**
+ * Checks that a vector schema wrapped with {@code HoodieSchema.createNullable}
+ * converts to a nullable ARRAY whose FLOAT elements are still non-nullable.
+ */
+ @Test
+ public void testNullableVectorConversion() {
+ HoodieSchema vectorSchema =
HoodieSchema.createNullable(HoodieSchema.createVector(128));
+
+ DataType dataType = HoodieSchemaConverter.convertToDataType(vectorSchema);
+
+ assertVectorArray(dataType, LogicalTypeRoot.FLOAT, true);
+ }
+
+ /**
+ * Checks that a vector field nested in a record survives
+ * {@code convertToRowType}: the row keeps both fields, and the second one
+ * ("embedding") is a non-nullable ARRAY with FLOAT elements.
+ */
+ @Test
+ public void testVectorInRecordConversion() {
+ HoodieSchema schema = HoodieSchema.createRecord(
+ "test_record",
+ null,
+ null,
+ Arrays.asList(
+ HoodieSchemaField.of("id",
HoodieSchema.create(HoodieSchemaType.INT)),
+ HoodieSchemaField.of("embedding", HoodieSchema.createVector(128))
+ )
+ );
+
+ RowType rowType = HoodieSchemaConverter.convertToRowType(schema);
+
+ assertEquals(2, rowType.getFieldCount());
+ // Index 1 is the "embedding" field declared second in the record above.
+ assertEquals("embedding", rowType.getFieldNames().get(1));
+ ArrayType vectorArrayType = assertInstanceOf(ArrayType.class,
rowType.getTypeAt(1));
+ assertEquals(LogicalTypeRoot.FLOAT,
vectorArrayType.getElementType().getTypeRoot());
+ assertFalse(rowType.getTypeAt(1).isNullable());
+ }
+
+ /**
+ * Asserts that {@code dataType} is an ARRAY type produced for a vector.
+ *
+ * @param dataType converted type under test
+ * @param elementTypeRoot expected logical type root of the array elements
+ * @param nullable expected nullability of the ARRAY type itself; the
+ * elements are always expected to be non-nullable
+ */
+ private void assertVectorArray(DataType dataType, LogicalTypeRoot
elementTypeRoot, boolean nullable) {
+ ArrayType arrayType = assertInstanceOf(ArrayType.class,
dataType.getLogicalType());
+ assertEquals(elementTypeRoot, arrayType.getElementType().getTypeRoot());
+ assertFalse(arrayType.getElementType().isNullable());
+ assertEquals(nullable, dataType.getLogicalType().isNullable());
+ }
+
@Test
void testUnionSchemaWithMultipleRecordTypes() {
HoodieSchema schema =
HoodieSchema.fromAvroSchema(HoodieMetadataRecord.SCHEMA$);
diff --git
a/hudi-common/src/test/resources/vector_cross_engine_validation/README.md
b/hudi-common/src/test/resources/vector_cross_engine_validation/README.md
new file mode 100644
index 000000000000..1d6472fa0bf1
--- /dev/null
+++ b/hudi-common/src/test/resources/vector_cross_engine_validation/README.md
@@ -0,0 +1,66 @@
+
+<!--
+ Licensed to the Apache Software Foundation (ASF) under one or more
+ contributor license agreements. See the NOTICE file distributed with
+ this work for additional information regarding copyright ownership.
+ The ASF licenses this file to You under the Apache License, Version 2.0
+ (the "License"); you may not use this file except in compliance with
+ the License. You may obtain a copy of the License at
+
+ http://www.apache.org/licenses/LICENSE-2.0
+
+ Unless required by applicable law or agreed to in writing, software
+ distributed under the License is distributed on an "AS IS" BASIS,
+ WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ See the License for the specific language governing permissions and
+ limitations under the License.
+-->
+
+# Generation of test files
+
+Spark 3.5 was used to generate these test files.
+
+Table schema:
+* MOR table
+```sql
+CREATE TABLE vector_table_mor (
+ id BIGINT,
+ name STRING,
+ embedding1 VECTOR(128) COMMENT 'document float embedding',
+ embedding2 VECTOR(128, DOUBLE) COMMENT 'document double embedding',
+ embedding3 VECTOR(128, INT8) COMMENT 'document INT8 embedding',
+ ts BIGINT
+) USING hudi
+LOCATION '/tmp/hudi_vector_table_mor'
+TBLPROPERTIES (
+ primaryKey = 'id',
+ preCombineField = 'ts',
+ type = 'mor',
+ hoodie.index.type = 'INMEMORY'
+);
+```
+
+* COW table
+```sql
+CREATE TABLE vector_table_cow (
+ id BIGINT,
+ name STRING,
+ embedding1 VECTOR(128) COMMENT 'document float embedding',
+ embedding2 VECTOR(128, DOUBLE) COMMENT 'document double embedding',
+ embedding3 VECTOR(128, INT8) COMMENT 'document INT8 embedding',
+ ts BIGINT
+) USING hudi
+LOCATION '/tmp/hudi_vector_table_cow'
+TBLPROPERTIES (
+ primaryKey = 'id',
+ preCombineField = 'ts',
+ type = 'cow'
+);
+```
+
+The shell commands used to package each generated table directory into a zip file are:
+
+```shell
+cd /path/to/test/files/
+zip -r $TABLE_DIR_NAME.zip $TABLE_DIR_NAME
+```
diff --git
a/hudi-common/src/test/resources/vector_cross_engine_validation/vector_cow.zip
b/hudi-common/src/test/resources/vector_cross_engine_validation/vector_cow.zip
new file mode 100644
index 000000000000..a1afed0ea6e1
Binary files /dev/null and
b/hudi-common/src/test/resources/vector_cross_engine_validation/vector_cow.zip
differ
diff --git
a/hudi-common/src/test/resources/vector_cross_engine_validation/vector_mor.zip
b/hudi-common/src/test/resources/vector_cross_engine_validation/vector_mor.zip
new file mode 100644
index 000000000000..b89c2474a5e2
Binary files /dev/null and
b/hudi-common/src/test/resources/vector_cross_engine_validation/vector_mor.zip
differ
diff --git
a/hudi-flink-datasource/hudi-flink/src/test/java/org/apache/hudi/table/ITTestVectorCrossEngineCompatibility.java
b/hudi-flink-datasource/hudi-flink/src/test/java/org/apache/hudi/table/ITTestVectorCrossEngineCompatibility.java
new file mode 100644
index 000000000000..d45b44918014
--- /dev/null
+++
b/hudi-flink-datasource/hudi-flink/src/test/java/org/apache/hudi/table/ITTestVectorCrossEngineCompatibility.java
@@ -0,0 +1,101 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hudi.table;
+
+import org.apache.hudi.common.model.HoodieTableType;
+import org.apache.hudi.common.testutils.HoodieTestUtils;
+import org.apache.hudi.utils.FlinkMiniCluster;
+import org.apache.hudi.utils.TestTableEnvs;
+
+import org.apache.flink.table.api.TableEnvironment;
+import org.apache.flink.types.Row;
+import org.apache.flink.util.CollectionUtil;
+import org.junit.jupiter.api.extension.ExtendWith;
+import org.junit.jupiter.api.io.TempDir;
+import org.junit.jupiter.params.ParameterizedTest;
+import org.junit.jupiter.params.provider.EnumSource;
+
+import java.nio.file.Path;
+import java.util.List;
+
+import static org.apache.hudi.utils.TestData.assertRowsEquals;
+import static org.junit.jupiter.api.Assertions.assertThrows;
+
+/**
+ * Integration test for cross-engine compatibility - verifying that Flink can
+ * read tables with VECTOR columns written by Spark.
+ *
+ * <p>Each test unpacks a zipped table from the
+ * {@code vector_cross_engine_validation} test resources into {@link #tempDir},
+ * registers it as {@code vector_table} and queries it through the environment
+ * returned by {@code TestTableEnvs.getBatchTableEnv()}.
+ */
+@ExtendWith(FlinkMiniCluster.class)
+public class ITTestVectorCrossEngineCompatibility {
+ // JUnit-managed temporary directory that the zipped table is extracted into.
+ @TempDir
+ Path tempDir;
+
+ /**
+ * Registers {@code vector_table} in the given environment on top of an
+ * existing table directory.
+ *
+ * @param tableEnv environment in which the DDL is executed
+ * @param tablePath value substituted into the {@code 'path'} option
+ * @param tableType value substituted into the {@code 'table.type'} option
+ */
+ private void createTable(TableEnvironment tableEnv, String tablePath, String
tableType) throws Exception {
+ // Create a Hudi table pointing to the Spark-written data
+ // In Flink, VECTOR is represented as ARRAY<FLOAT/DOUBLE/TINYINT>
+ String createTableDdl = String.format(
+ "CREATE TABLE vector_table (\n"
+ + " id BIGINT,\n"
+ + " name STRING,\n"
+ + " embedding1 ARRAY<FLOAT>,\n"
+ + " embedding2 ARRAY<DOUBLE>,\n"
+ + " embedding3 ARRAY<TINYINT>,\n"
+ + " ts BIGINT,\n"
+ + " PRIMARY KEY (id) NOT ENFORCED\n"
+ + ") WITH (\n"
+ + " 'connector' = 'hudi',\n"
+ + " 'path' = '%s',\n"
+ + " 'table.type' = '%s',\n"
+ + " 'ordering.fields' = 'ts'\n"
+ + ");",
+ tablePath, tableType);
+ tableEnv.executeSql(createTableDdl);
+ }
+
+ /**
+ * Checks that selecting every column, including the VECTOR-backed ones, fails
+ * with a {@link RuntimeException}.
+ *
+ * <p>NOTE(review): despite the "COW" in the method and local variable names,
+ * the test is parameterized over {@link HoodieTableType} and picks the
+ * {@code vector_mor} archive for MERGE_ON_READ.
+ */
+ @ParameterizedTest
+ @EnumSource(value = HoodieTableType.class)
+ public void testValidationVectorColumnsFromCOWTable(HoodieTableType
tableType) throws Exception {
+ // Validate that an exception is thrown when the VECTOR columns themselves
+ // are read from a table containing VECTOR columns
+ Path cowTargetDir = tempDir.resolve("table");
+ String resourceName = tableType == HoodieTableType.MERGE_ON_READ ?
"vector_mor" : "vector_cow";
+
HoodieTestUtils.extractZipToDirectory(String.format("vector_cross_engine_validation/%s.zip",
resourceName), cowTargetDir, getClass());
+ String cowPath = cowTargetDir.resolve(resourceName).toString();
+ TableEnvironment tableEnv = TestTableEnvs.getBatchTableEnv();
+ createTable(tableEnv, cowPath, tableType.name());
+
+ // A RuntimeException is expected to be thrown.
+ // NOTE(review): the third argument of assertThrows is the message reported
+ // when the assertion fails; it is not compared with the thrown exception's
+ // message, so the text below is not verified by this test.
+ assertThrows(RuntimeException.class,
+ () -> CollectionUtil.iteratorToList(tableEnv.executeSql("select * from
vector_table").collect()),
+ "Unexpected type exception. Primitive type: FIXED_LEN_BYTE_ARRAY.
Field type: FLOAT. Field name: embedding1");
+ }
+
+ /**
+ * Checks that a projection of only the non-VECTOR columns ({@code id},
+ * {@code name}, {@code ts}) returns the two expected rows.
+ *
+ * <p>NOTE(review): despite the "COW" in the method and local variable names,
+ * the test is parameterized over {@link HoodieTableType} and picks the
+ * {@code vector_mor} archive for MERGE_ON_READ.
+ */
+ @ParameterizedTest
+ @EnumSource(value = HoodieTableType.class)
+ public void testReadNonVectorColumnsFromCOWTable(HoodieTableType tableType)
throws Exception {
+ // Test that Flink can read non-VECTOR columns from a table with VECTOR
+ // columns
+ Path cowTargetDir = tempDir.resolve("table");
+ String resourceName = tableType == HoodieTableType.MERGE_ON_READ ?
"vector_mor" : "vector_cow";
+
HoodieTestUtils.extractZipToDirectory(String.format("vector_cross_engine_validation/%s.zip",
resourceName), cowTargetDir, getClass());
+ String cowPath = cowTargetDir.resolve(resourceName).toString();
+ TableEnvironment tableEnv = TestTableEnvs.getBatchTableEnv();
+ createTable(tableEnv, cowPath, tableType.name());
+
+ List<Row> rows = CollectionUtil.iteratorToList(tableEnv.executeSql("select
id, name, ts from vector_table").collect());
+ assertRowsEquals(rows, "[+I[1, doc-1, 1000], +I[2, doc-2, 2000]]");
+ }
+}