This is an automated email from the ASF dual-hosted git repository.

danny0405 pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/hudi.git


The following commit(s) were added to refs/heads/master by this push:
     new d6f0c11dbbb5 feat(flink): Support reading and writing VECTOR fields in 
clustering (#18913)
d6f0c11dbbb5 is described below

commit d6f0c11dbbb582225cd77329666ce3b68fc002be
Author: Shuo Cheng <[email protected]>
AuthorDate: Mon Jun 22 12:58:24 2026 +0800

    feat(flink): Support reading and writing VECTOR fields in clustering 
(#18913)
---
 .../row/parquet/ParquetSchemaConverter.java        |  14 +-
 .../row/parquet/TestParquetSchemaConverter.java    |  14 ++
 .../hudi/sink/clustering/ClusteringOperator.java   |   5 +-
 .../apache/hudi/table/ITTestVectorDataSource.java  | 203 +++++++++++++++++++--
 4 files changed, 221 insertions(+), 15 deletions(-)

diff --git 
a/hudi-client/hudi-flink-client/src/main/java/org/apache/hudi/io/storage/row/parquet/ParquetSchemaConverter.java
 
b/hudi-client/hudi-flink-client/src/main/java/org/apache/hudi/io/storage/row/parquet/ParquetSchemaConverter.java
index 5bad03d9253e..5efe4aeee62c 100644
--- 
a/hudi-client/hudi-flink-client/src/main/java/org/apache/hudi/io/storage/row/parquet/ParquetSchemaConverter.java
+++ 
b/hudi-client/hudi-flink-client/src/main/java/org/apache/hudi/io/storage/row/parquet/ParquetSchemaConverter.java
@@ -148,9 +148,17 @@ public class ParquetSchemaConverter {
           dataType = DataTypes.of(new TimestampType(9));
           break;
         case FIXED_LEN_BYTE_ARRAY:
-          LogicalTypeAnnotation.DecimalLogicalTypeAnnotation decimalType =
-              (LogicalTypeAnnotation.DecimalLogicalTypeAnnotation) logicalType;
-          dataType = DataTypes.of(new DecimalType(decimalType.getPrecision(), 
decimalType.getScale()));
+          if (logicalType instanceof 
LogicalTypeAnnotation.DecimalLogicalTypeAnnotation) {
+            LogicalTypeAnnotation.DecimalLogicalTypeAnnotation decimalType =
+                (LogicalTypeAnnotation.DecimalLogicalTypeAnnotation) 
logicalType;
+            dataType = DataTypes.of(new 
DecimalType(decimalType.getPrecision(), decimalType.getScale()));
+          } else {
+            // VECTOR columns are stored as bare FIXED_LEN_BYTE_ARRAY without 
a Parquet logical type annotation;
+            // see HoodieParquetFileFormatHelper#buildImplicitSchemaChangeInfo.
+            // Treat the physical type as bytes here; vector semantics are 
restored
+            // from the schema/footer metadata by the vector-aware read path.
+            dataType = DataTypes.BYTES();
+          }
           break;
         default:
           throw new UnsupportedOperationException("Unsupported type: " + 
parquetType);
diff --git 
a/hudi-client/hudi-flink-client/src/test/java/org/apache/hudi/io/storage/row/parquet/TestParquetSchemaConverter.java
 
b/hudi-client/hudi-flink-client/src/test/java/org/apache/hudi/io/storage/row/parquet/TestParquetSchemaConverter.java
index 66c6f9e86263..23c2d6dac56e 100644
--- 
a/hudi-client/hudi-flink-client/src/test/java/org/apache/hudi/io/storage/row/parquet/TestParquetSchemaConverter.java
+++ 
b/hudi-client/hudi-flink-client/src/test/java/org/apache/hudi/io/storage/row/parquet/TestParquetSchemaConverter.java
@@ -241,6 +241,20 @@ public class TestParquetSchemaConverter {
     assertEquals(16, featuresType.getTypeLength());
   }
 
+  @Test
+  void testUnannotatedFixedLenByteArrayConvertsToBytes() {
+    MessageType messageType = new MessageType(
+        "test",
+        Types.primitive(PrimitiveType.PrimitiveTypeName.INT32, 
Type.Repetition.REQUIRED).named("id"),
+        Types.primitive(PrimitiveType.PrimitiveTypeName.FIXED_LEN_BYTE_ARRAY, 
Type.Repetition.OPTIONAL)
+            .length(8)
+            .named("embedding"));
+
+    RowType rowType = ParquetSchemaConverter.convertToRowType(messageType);
+
+    assertEquals(DataTypes.BYTES().getLogicalType(), rowType.getTypeAt(1));
+  }
+
   @Test
   void testVectorFooterMetadataComesFromHoodieSchema() {
     HoodieSchema hoodieSchema = HoodieSchema.createRecord(
diff --git 
a/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/sink/clustering/ClusteringOperator.java
 
b/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/sink/clustering/ClusteringOperator.java
index f67be66b3c3e..36349acbe031 100644
--- 
a/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/sink/clustering/ClusteringOperator.java
+++ 
b/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/sink/clustering/ClusteringOperator.java
@@ -168,7 +168,10 @@ public class ClusteringOperator extends 
TableStreamOperator<ClusteringCommitEven
     this.writeClient = FlinkWriteClients.createWriteClient(conf, 
getRuntimeContext());
     this.table = writeClient.getHoodieTable();
 
-    this.schema = HoodieSchemaConverter.convertToSchema(rowType);
+    this.schema = HoodieSchemaConverter.convertToSchema(
+        rowType,
+        
HoodieSchemaUtils.getRecordQualifiedName(conf.get(FlinkOptions.TABLE_NAME)),
+        conf.get(FlinkOptions.VECTOR_COLUMNS));
     // Since there exists discrepancies between flink and spark dealing with 
nullability of primary key field,
     // and there may be some files written by spark, force update schema as 
nullable to make sure clustering
     // scan successfully without schema validating exception.
diff --git 
a/hudi-flink-datasource/hudi-flink/src/test/java/org/apache/hudi/table/ITTestVectorDataSource.java
 
b/hudi-flink-datasource/hudi-flink/src/test/java/org/apache/hudi/table/ITTestVectorDataSource.java
index 1b6cda8c1013..8f02f49ba16b 100644
--- 
a/hudi-flink-datasource/hudi-flink/src/test/java/org/apache/hudi/table/ITTestVectorDataSource.java
+++ 
b/hudi-flink-datasource/hudi-flink/src/test/java/org/apache/hudi/table/ITTestVectorDataSource.java
@@ -18,20 +18,41 @@
 
 package org.apache.hudi.table;
 
+import org.apache.hudi.avro.model.HoodieClusteringPlan;
+import org.apache.hudi.client.HoodieFlinkWriteClient;
 import org.apache.hudi.common.model.HoodieTableType;
 import org.apache.hudi.common.schema.HoodieSchema;
 import org.apache.hudi.common.schema.HoodieSchemaField;
 import org.apache.hudi.common.schema.HoodieSchemaType;
+import org.apache.hudi.common.table.HoodieTableMetaClient;
+import org.apache.hudi.common.table.timeline.HoodieInstant;
+import org.apache.hudi.common.table.timeline.HoodieTimeline;
+import org.apache.hudi.common.util.ClusteringUtils;
 import org.apache.hudi.common.util.Option;
+import org.apache.hudi.common.util.collection.Pair;
 import org.apache.hudi.configuration.FlinkOptions;
 import org.apache.hudi.configuration.HadoopConfigurations;
+import org.apache.hudi.sink.clustering.ClusteringCommitEvent;
+import org.apache.hudi.sink.clustering.ClusteringCommitSink;
+import org.apache.hudi.sink.clustering.ClusteringOperator;
+import org.apache.hudi.sink.clustering.ClusteringPlanSourceFunction;
+import org.apache.hudi.sink.clustering.FlinkClusteringConfig;
+import org.apache.hudi.util.CompactionUtil;
+import org.apache.hudi.util.FlinkWriteClients;
+import org.apache.hudi.util.HoodieSchemaConverter;
 import org.apache.hudi.util.StreamerUtil;
 import org.apache.hudi.utils.FlinkMiniCluster;
 import org.apache.hudi.utils.TestTableEnvs;
 
+import org.apache.flink.api.common.typeinfo.TypeInformation;
 import org.apache.flink.configuration.Configuration;
+import org.apache.flink.streaming.api.datastream.DataStream;
+import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
 import org.apache.flink.table.api.TableEnvironment;
 import org.apache.flink.table.api.TableResult;
+import org.apache.flink.table.planner.plan.nodes.exec.utils.ExecNodeUtil;
+import org.apache.flink.table.types.DataType;
+import org.apache.flink.table.types.logical.RowType;
 import org.apache.flink.types.Row;
 import org.apache.flink.util.CollectionUtil;
 import org.junit.jupiter.api.Test;
@@ -42,9 +63,13 @@ import org.junit.jupiter.params.provider.EnumSource;
 
 import java.lang.reflect.Array;
 import java.nio.file.Path;
+import java.util.Collections;
+import java.util.LinkedHashMap;
 import java.util.List;
+import java.util.Map;
 import java.util.concurrent.ExecutionException;
 
+import static 
org.apache.hudi.common.testutils.HoodieTestUtils.INSTANT_GENERATOR;
 import static org.junit.jupiter.api.Assertions.assertEquals;
 import static org.junit.jupiter.api.Assertions.assertNotNull;
 import static org.junit.jupiter.api.Assertions.assertNull;
@@ -200,6 +225,30 @@ public class ITTestVectorDataSource {
     assertEquals("new3", rows.get(2).getField(2));
   }
 
+  @Test
+  public void testVectorCopyOnWriteClustering() throws Exception {
+    TableEnvironment tableEnv = TestTableEnvs.getBatchTableEnv();
+    String tablePath = tempDir.resolve("clustering").toUri().toString();
+    String vectorColsConf = "embedding:2,features:2,codes:2";
+    createVectorTable(tableEnv, "vector_table", tablePath, 
HoodieTableType.COPY_ON_WRITE, vectorColsConf, "insert");
+
+    execInsertSql(tableEnv,
+        "INSERT INTO vector_table(id, embedding, features, codes, label, ts) 
VALUES "
+            + "('id1', ARRAY[CAST(1.0 AS FLOAT), CAST(1.1 AS FLOAT)], 
ARRAY[CAST(10.0 AS DOUBLE), CAST(10.1 AS DOUBLE)], "
+            + " ARRAY[CAST(1 AS TINYINT), CAST(2 AS TINYINT)], 'cluster1', 1), 
"
+            + "('id2', ARRAY[CAST(2.0 AS FLOAT), CAST(2.2 AS FLOAT)], 
ARRAY[CAST(20.0 AS DOUBLE), CAST(20.2 AS DOUBLE)], "
+            + " ARRAY[CAST(3 AS TINYINT), CAST(4 AS TINYINT)], 'cluster2', 2), 
"
+            + "('id3', ARRAY[CAST(3.0 AS FLOAT), CAST(3.3 AS FLOAT)], 
ARRAY[CAST(30.0 AS DOUBLE), CAST(30.3 AS DOUBLE)], "
+            + " ARRAY[CAST(5 AS TINYINT), CAST(6 AS TINYINT)], 'cluster3', 
3)");
+
+    runClustering(tablePath, vectorColsConf);
+
+    assertStoredVectorSchema(tablePath, "embedding", 2, 
HoodieSchema.Vector.VectorElementType.FLOAT);
+    assertStoredVectorSchema(tablePath, "features", 2, 
HoodieSchema.Vector.VectorElementType.DOUBLE);
+    assertStoredVectorSchema(tablePath, "codes", 2, 
HoodieSchema.Vector.VectorElementType.INT8);
+    assertClusteredVectorRows(tableEnv);
+  }
+
   @Test
   public void testVectorDimensionMismatchFails() {
     TableEnvironment tableEnv = TestTableEnvs.getBatchTableEnv();
@@ -229,6 +278,32 @@ public class ITTestVectorDataSource {
       HoodieTableType tableType,
       String vectorColumns,
       String writeOperation) {
+    createVectorTable(tableEnv, tableName, tablePath, tableType, 
vectorColumns, writeOperation, Collections.emptyMap());
+  }
+
+  private static void createVectorTable(
+      TableEnvironment tableEnv,
+      String tableName,
+      String tablePath,
+      HoodieTableType tableType,
+      String vectorColumns,
+      String writeOperation,
+      Map<String, String> extraOptions) {
+    Map<String, String> options = new LinkedHashMap<>();
+    options.put("connector", "hudi");
+    options.put("path", tablePath);
+    options.put(FlinkOptions.TABLE_TYPE.key(), tableType.name());
+    options.put(FlinkOptions.ORDERING_FIELDS.key(), "ts");
+    options.put(FlinkOptions.VECTOR_COLUMNS.key(), vectorColumns);
+    options.put(FlinkOptions.COMPACTION_SCHEDULE_ENABLED.key(), "false");
+    options.put(FlinkOptions.COMPACTION_ASYNC_ENABLED.key(), "false");
+    if (writeOperation != null) {
+      options.put(FlinkOptions.OPERATION.key(), writeOperation);
+    }
+    options.put(FlinkOptions.WRITE_TASKS.key(), "1");
+    options.put(FlinkOptions.READ_TASKS.key(), "1");
+    options.putAll(extraOptions);
+
     tableEnv.executeSql(
         "CREATE TABLE " + tableName + " ("
             + " id STRING,"
@@ -241,20 +316,126 @@ public class ITTestVectorDataSource {
             + " ts BIGINT,"
             + " PRIMARY KEY (id) NOT ENFORCED"
             + ") WITH ("
-            + " 'connector' = 'hudi',"
-            + " 'path' = '" + tablePath + "',"
-            + " '" + FlinkOptions.TABLE_TYPE.key() + "' = '" + 
tableType.name() + "',"
-            + " '" + FlinkOptions.ORDERING_FIELDS.key() + "' = 'ts',"
-            + " '" + FlinkOptions.VECTOR_COLUMNS.key() + "' = '" + 
vectorColumns + "',"
-            + " '" + FlinkOptions.METADATA_ENABLED.key() + "' = 'false',"
-            + " '" + FlinkOptions.COMPACTION_SCHEDULE_ENABLED.key() + "' = 
'false',"
-            + " '" + FlinkOptions.COMPACTION_ASYNC_ENABLED.key() + "' = 
'false',"
-            + (writeOperation == null ? "" : " '" + 
FlinkOptions.OPERATION.key() + "' = '" + writeOperation + "',")
-            + " '" + FlinkOptions.WRITE_TASKS.key() + "' = '1',"
-            + " '" + FlinkOptions.READ_TASKS.key() + "' = '1'"
+            + formatOptions(options)
             + ")");
   }
 
+  private static void runClustering(String tablePath, String vectorColsConf) 
throws Exception {
+    FlinkClusteringConfig cfg = new FlinkClusteringConfig();
+    cfg.path = tablePath;
+    cfg.targetPartitions = 1;
+    cfg.sortMemory = 128;
+    Configuration conf = FlinkClusteringConfig.toFlinkConfig(cfg);
+
+    HoodieTableMetaClient metaClient = StreamerUtil.createMetaClient(conf);
+    conf.set(FlinkOptions.TABLE_NAME, 
metaClient.getTableConfig().getTableName());
+    conf.set(FlinkOptions.RECORD_KEY_FIELD, 
metaClient.getTableConfig().getRecordKeyFieldProp());
+    conf.set(FlinkOptions.VECTOR_COLUMNS, vectorColsConf);
+    CompactionUtil.setPartitionField(conf, metaClient);
+    CompactionUtil.setAvroSchema(conf, metaClient);
+
+    try (HoodieFlinkWriteClient<?> writeClient = 
FlinkWriteClients.createWriteClient(conf)) {
+      HoodieFlinkTable<?> table = writeClient.getHoodieTable();
+      Option<String> clusteringInstantTime = 
writeClient.scheduleClustering(Option.empty());
+      assertTrue(clusteringInstantTime.isPresent(), "The clustering plan 
should be scheduled");
+
+      table.getMetaClient().reloadActiveTimeline();
+      HoodieTimeline timeline = 
table.getActiveTimeline().filterPendingClusteringTimeline()
+          .filter(instant -> instant.getState() == 
HoodieInstant.State.REQUESTED);
+      Option<Pair<HoodieInstant, HoodieClusteringPlan>> clusteringPlanOption =
+          ClusteringUtils.getClusteringPlan(table.getMetaClient(), 
timeline.lastInstant().get());
+      HoodieClusteringPlan clusteringPlan = 
clusteringPlanOption.get().getRight();
+      assertTrue(clusteringPlan.getInputGroups().size() > 0, "The clustering 
plan should contain input groups");
+
+      HoodieInstant instant = 
INSTANT_GENERATOR.getClusteringCommitRequestedInstant(clusteringInstantTime.get());
+      table.getActiveTimeline().transitionClusterRequestedToInflight(instant, 
Option.empty());
+
+      HoodieSchema tableSchema = 
StreamerUtil.getTableSchema(table.getMetaClient(), false);
+      DataType rowDataType = 
HoodieSchemaConverter.convertToDataType(tableSchema);
+      RowType rowType = (RowType) rowDataType.getLogicalType();
+
+      StreamExecutionEnvironment env = 
StreamExecutionEnvironment.getExecutionEnvironment();
+      DataStream<ClusteringCommitEvent> dataStream = env.addSource(new 
ClusteringPlanSourceFunction(clusteringInstantTime.get(), clusteringPlan, conf))
+          .name("clustering_source")
+          .uid("uid_vector_clustering_source")
+          .rebalance()
+          .transform("clustering_task",
+              TypeInformation.of(ClusteringCommitEvent.class),
+              new ClusteringOperator(conf, rowType))
+          .setParallelism(clusteringPlan.getInputGroups().size());
+
+      ExecNodeUtil.setManagedMemoryWeight(dataStream.getTransformation(),
+          conf.get(FlinkOptions.WRITE_SORT_MEMORY) * 1024L * 1024L);
+
+      dataStream
+          .addSink(new ClusteringCommitSink(conf))
+          .name("clustering_commit")
+          .uid("uid_vector_clustering_commit")
+          .setParallelism(1);
+
+      env.execute("flink_hudi_vector_clustering");
+      
assertTrue(table.getMetaClient().reloadActiveTimeline().filterCompletedInstants().containsInstant(instant.requestedTime()));
+    }
+  }
+
+  private static String formatOptions(Map<String, String> options) {
+    StringBuilder builder = new StringBuilder();
+    for (Map.Entry<String, String> entry : options.entrySet()) {
+      if (builder.length() > 0) {
+        builder.append(",");
+      }
+      builder.append(" '").append(entry.getKey()).append("' = 
'").append(entry.getValue()).append("'");
+    }
+    return builder.toString();
+  }
+
+  private static void assertCompactedVectorRows(TableEnvironment tableEnv) {
+    List<Row> rows = collect(tableEnv, "SELECT id, embedding, features, codes, 
label, ts FROM vector_table ORDER BY id");
+    assertEquals(3, rows.size());
+
+    assertEquals("id1", rows.get(0).getField(0));
+    assertFloatArray(rows.get(0).getField(1), new float[] {9.0f, 9.9f});
+    assertDoubleArray(rows.get(0).getField(2), new double[] {90.0d, 90.9d});
+    assertByteArray(rows.get(0).getField(3), new byte[] {9, 8});
+    assertEquals("new1", rows.get(0).getField(4));
+    assertEquals(10L, rows.get(0).getField(5));
+
+    assertEquals("id2", rows.get(1).getField(0));
+    assertFloatArray(rows.get(1).getField(1), new float[] {2.0f, 2.2f});
+    assertDoubleArray(rows.get(1).getField(2), new double[] {20.0d, 20.2d});
+    assertByteArray(rows.get(1).getField(3), new byte[] {3, 4});
+    assertEquals("old2", rows.get(1).getField(4));
+
+    assertEquals("id3", rows.get(2).getField(0));
+    assertFloatArray(rows.get(2).getField(1), new float[] {3.0f, 3.3f});
+    assertDoubleArray(rows.get(2).getField(2), new double[] {30.0d, 30.3d});
+    assertByteArray(rows.get(2).getField(3), new byte[] {5, 6});
+    assertEquals("new3", rows.get(2).getField(4));
+  }
+
+  private static void assertClusteredVectorRows(TableEnvironment tableEnv) {
+    List<Row> rows = collect(tableEnv, "SELECT id, embedding, features, codes, 
label, ts FROM vector_table ORDER BY id");
+    assertEquals(3, rows.size());
+
+    assertEquals("id1", rows.get(0).getField(0));
+    assertFloatArray(rows.get(0).getField(1), new float[] {1.0f, 1.1f});
+    assertDoubleArray(rows.get(0).getField(2), new double[] {10.0d, 10.1d});
+    assertByteArray(rows.get(0).getField(3), new byte[] {1, 2});
+    assertEquals("cluster1", rows.get(0).getField(4));
+
+    assertEquals("id2", rows.get(1).getField(0));
+    assertFloatArray(rows.get(1).getField(1), new float[] {2.0f, 2.2f});
+    assertDoubleArray(rows.get(1).getField(2), new double[] {20.0d, 20.2d});
+    assertByteArray(rows.get(1).getField(3), new byte[] {3, 4});
+    assertEquals("cluster2", rows.get(1).getField(4));
+
+    assertEquals("id3", rows.get(2).getField(0));
+    assertFloatArray(rows.get(2).getField(1), new float[] {3.0f, 3.3f});
+    assertDoubleArray(rows.get(2).getField(2), new double[] {30.0d, 30.3d});
+    assertByteArray(rows.get(2).getField(3), new byte[] {5, 6});
+    assertEquals("cluster3", rows.get(2).getField(4));
+  }
+
   private static void execInsertSql(TableEnvironment tableEnv, String insert) 
throws ExecutionException, InterruptedException {
     TableResult tableResult = tableEnv.executeSql(insert);
     tableResult.await();

Reply via email to