hudi-agent commented on code in PR #19044:
URL: https://github.com/apache/hudi/pull/19044#discussion_r3459436053


##########
hudi-flink-datasource/hudi-flink/src/test/java/org/apache/hudi/table/ITTestVectorDataSource.java:
##########
@@ -262,6 +385,57 @@ public void testVectorDimensionMismatchFails() {
         "Expected dimension mismatch in exception chain, got: " + 
exception.getMessage());
   }
 
+  @Test
+  public void testParquetFooterContainsVectorMetadata() throws Exception {
+    TableEnvironment tableEnv = TestTableEnvs.getBatchTableEnv();
+    Path tableDir = tempDir.resolve("vector_footer");
+    String tablePath = tableDir.toUri().toString();
+    createVectorTable(tableEnv, "vector_table", tablePath, 
HoodieTableType.COPY_ON_WRITE, "embedding:2,features:2,codes:2");
+
+    execInsertSql(tableEnv,
+        "INSERT INTO vector_table(id, embedding, features, codes, label, ts) 
VALUES "
+            + "('id1', ARRAY[CAST(1.0 AS FLOAT), CAST(1.5 AS FLOAT)], 
ARRAY[CAST(10.0 AS DOUBLE), CAST(10.5 AS DOUBLE)], "
+            + " ARRAY[CAST(1 AS TINYINT), CAST(2 AS TINYINT)], 'footer', 1)");
+
+    Path parquetFile = findFirstParquetFile(tableDir);
+    try (ParquetFileReader reader = ParquetFileReader.open(
+        new org.apache.hadoop.conf.Configuration(), new 
org.apache.hadoop.fs.Path(parquetFile.toUri()))) {
+      ParquetMetadata metadata = reader.getFooter();
+      assertEquals("embedding:VECTOR(2),features:VECTOR(2, 
DOUBLE),codes:VECTOR(2, INT8)",
+          
metadata.getFileMetaData().getKeyValueMetaData().get(HoodieSchema.VECTOR_COLUMNS_METADATA_KEY));
+
+      PrimitiveType embeddingType = 
metadata.getFileMetaData().getSchema().getType("embedding").asPrimitiveType();
+      assertEquals(PrimitiveType.PrimitiveTypeName.FIXED_LEN_BYTE_ARRAY, 
embeddingType.getPrimitiveTypeName());
+      assertEquals(8, embeddingType.getTypeLength());
+      PrimitiveType featuresType = 
metadata.getFileMetaData().getSchema().getType("features").asPrimitiveType();
+      assertEquals(PrimitiveType.PrimitiveTypeName.FIXED_LEN_BYTE_ARRAY, 
featuresType.getPrimitiveTypeName());
+      assertEquals(16, featuresType.getTypeLength());
+      PrimitiveType codesType = 
metadata.getFileMetaData().getSchema().getType("codes").asPrimitiveType();
+      assertEquals(PrimitiveType.PrimitiveTypeName.FIXED_LEN_BYTE_ARRAY, 
codesType.getPrimitiveTypeName());
+      assertEquals(2, codesType.getTypeLength());
+    }
+  }
+
+  @ParameterizedTest
+  @MethodSource("invalidVectorColumnConfigs")
+  public void testInvalidVectorColumns(String vectorColumns, String 
expectedMessage) {
+    TableEnvironment tableEnv = TestTableEnvs.getBatchTableEnv();
+    String tableName = "invalid_vector_" + 
Integer.toHexString(vectorColumns.hashCode()).replace('-', '_');
+    String tablePath = tempDir.resolve(tableName).toUri().toString();
+
+    Exception exception = assertThrows(Exception.class, () -> {

Review Comment:
   🤖 nit: `Integer.toHexString()` formats the int as an unsigned value, so 
even a negative hash code yields only hex digits (0–9, a–f) and never a 
leading `-`. That makes `.replace('-', '_')` a no-op — could you remove it to 
avoid implying that `-` can appear here?
   
   <sub><i>⚠️ AI-generated; verify before applying. React 👍/👎 to flag 
quality.</i></sub>



##########
hudi-flink-datasource/hudi-flink/src/test/java/org/apache/hudi/table/ITTestVectorDataSource.java:
##########
@@ -378,6 +575,60 @@ private static void runClustering(String tablePath, String 
vectorColsConf) throw
     }
   }
 
+  private static void runCompaction(String tablePath, String vectorColsConf) 
throws Exception {
+    FlinkCompactionConfig cfg = new FlinkCompactionConfig();
+    cfg.path = tablePath;
+    Configuration conf = FlinkCompactionConfig.toFlinkConfig(cfg);
+
+    HoodieTableMetaClient metaClient = StreamerUtil.createMetaClient(conf);
+    conf.set(FlinkOptions.TABLE_NAME, 
metaClient.getTableConfig().getTableName());
+    conf.set(FlinkOptions.RECORD_KEY_FIELD, 
metaClient.getTableConfig().getRecordKeyFieldProp());
+    conf.set(FlinkOptions.TABLE_TYPE, HoodieTableType.MERGE_ON_READ.name());
+    conf.set(FlinkOptions.VECTOR_COLUMNS, vectorColsConf);
+    CompactionUtil.setPartitionField(conf, metaClient);
+    CompactionUtil.setAvroSchema(conf, metaClient);
+    CompactionUtil.inferChangelogMode(conf, metaClient);
+
+    try (HoodieFlinkWriteClient<?> writeClient = 
FlinkWriteClients.createWriteClient(conf)) {
+      HoodieFlinkTable<?> table = writeClient.getHoodieTable();
+      Option<String> compactionInstantTime = 
writeClient.scheduleCompaction(Option.empty());
+      assertTrue(compactionInstantTime.isPresent(), "The compaction plan 
should be scheduled");
+
+      HoodieCompactionPlan compactionPlan = CompactionUtils.getCompactionPlan(
+          table.getMetaClient(), compactionInstantTime.get());
+      assertTrue(compactionPlan.getOperations().size() > 0, "The compaction 
plan should contain operations");
+
+      HoodieInstant instant = 
INSTANT_GENERATOR.getCompactionRequestedInstant(compactionInstantTime.get());
+      
table.getActiveTimeline().transitionCompactionRequestedToInflight(instant);
+
+      StreamExecutionEnvironment env = 
StreamExecutionEnvironment.getExecutionEnvironment();
+      env.addSource(new 
CompactionPlanSourceFunction(Collections.singletonList(Pair.of(compactionInstantTime.get(),
 compactionPlan)), conf))
+          .name("compaction_source")
+          .uid("uid_vector_compaction_source")
+          .rebalance()
+          .transform("compact_task",
+              TypeInformation.of(CompactionCommitEvent.class),
+              new CompactOperator(conf))
+          .setParallelism(compactionPlan.getOperations().size())
+          .addSink(new CompactionCommitSink(conf))
+          .name("compaction_commit")
+          .uid("uid_vector_compaction_commit")
+          .setParallelism(1);
+
+      env.execute("flink_hudi_vector_compaction");
+      
assertTrue(table.getMetaClient().reloadActiveTimeline().filterCompletedInstants().containsInstant(instant.requestedTime()));
+    }

Review Comment:
   🤖 nit: the idiomatic JUnit 5 convention for `@MethodSource` providers is 
`Stream<Arguments>` built with `Arguments.of(...)` rather than raw `Object[]` 
arrays — would you consider switching the provider method near this hunk 
(presumably `invalidVectorColumnConfigs`) to that form?
   
   <sub><i>⚠️ AI-generated; verify before applying. React 👍/👎 to flag 
quality.</i></sub>



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]

Reply via email to