hudi-agent commented on code in PR #19044:
URL: https://github.com/apache/hudi/pull/19044#discussion_r3459436053
##########
hudi-flink-datasource/hudi-flink/src/test/java/org/apache/hudi/table/ITTestVectorDataSource.java:
##########
@@ -262,6 +385,57 @@ public void testVectorDimensionMismatchFails() {
"Expected dimension mismatch in exception chain, got: " +
exception.getMessage());
}
+ @Test
+ public void testParquetFooterContainsVectorMetadata() throws Exception {
+ TableEnvironment tableEnv = TestTableEnvs.getBatchTableEnv();
+ Path tableDir = tempDir.resolve("vector_footer");
+ String tablePath = tableDir.toUri().toString();
+ createVectorTable(tableEnv, "vector_table", tablePath,
HoodieTableType.COPY_ON_WRITE, "embedding:2,features:2,codes:2");
+
+ execInsertSql(tableEnv,
+ "INSERT INTO vector_table(id, embedding, features, codes, label, ts)
VALUES "
+ + "('id1', ARRAY[CAST(1.0 AS FLOAT), CAST(1.5 AS FLOAT)],
ARRAY[CAST(10.0 AS DOUBLE), CAST(10.5 AS DOUBLE)], "
+ + " ARRAY[CAST(1 AS TINYINT), CAST(2 AS TINYINT)], 'footer', 1)");
+
+ Path parquetFile = findFirstParquetFile(tableDir);
+ try (ParquetFileReader reader = ParquetFileReader.open(
+ new org.apache.hadoop.conf.Configuration(), new
org.apache.hadoop.fs.Path(parquetFile.toUri()))) {
+ ParquetMetadata metadata = reader.getFooter();
+ assertEquals("embedding:VECTOR(2),features:VECTOR(2,
DOUBLE),codes:VECTOR(2, INT8)",
+
metadata.getFileMetaData().getKeyValueMetaData().get(HoodieSchema.VECTOR_COLUMNS_METADATA_KEY));
+
+ PrimitiveType embeddingType =
metadata.getFileMetaData().getSchema().getType("embedding").asPrimitiveType();
+ assertEquals(PrimitiveType.PrimitiveTypeName.FIXED_LEN_BYTE_ARRAY,
embeddingType.getPrimitiveTypeName());
+ assertEquals(8, embeddingType.getTypeLength());
+ PrimitiveType featuresType =
metadata.getFileMetaData().getSchema().getType("features").asPrimitiveType();
+ assertEquals(PrimitiveType.PrimitiveTypeName.FIXED_LEN_BYTE_ARRAY,
featuresType.getPrimitiveTypeName());
+ assertEquals(16, featuresType.getTypeLength());
+ PrimitiveType codesType =
metadata.getFileMetaData().getSchema().getType("codes").asPrimitiveType();
+ assertEquals(PrimitiveType.PrimitiveTypeName.FIXED_LEN_BYTE_ARRAY,
codesType.getPrimitiveTypeName());
+ assertEquals(2, codesType.getTypeLength());
+ }
+ }
+
+ @ParameterizedTest
+ @MethodSource("invalidVectorColumnConfigs")
+ public void testInvalidVectorColumns(String vectorColumns, String
expectedMessage) {
+ TableEnvironment tableEnv = TestTableEnvs.getBatchTableEnv();
+ String tableName = "invalid_vector_" +
Integer.toHexString(vectorColumns.hashCode()).replace('-', '_');
+ String tablePath = tempDir.resolve(tableName).toUri().toString();
+
+ Exception exception = assertThrows(Exception.class, () -> {
Review Comment:
🤖 nit: `Integer.toHexString()` formats the int as an unsigned value and only
ever emits hex digits (0–9, a–f), so the `.replace('-', '_')` call is a no-op.
Could you remove it so it doesn't imply that `-` can appear here?
<sub><i>⚠️ AI-generated; verify before applying. React 👍/👎 to flag
quality.</i></sub>
##########
hudi-flink-datasource/hudi-flink/src/test/java/org/apache/hudi/table/ITTestVectorDataSource.java:
##########
@@ -378,6 +575,60 @@ private static void runClustering(String tablePath, String
vectorColsConf) throw
}
}
+ private static void runCompaction(String tablePath, String vectorColsConf)
throws Exception {
+ FlinkCompactionConfig cfg = new FlinkCompactionConfig();
+ cfg.path = tablePath;
+ Configuration conf = FlinkCompactionConfig.toFlinkConfig(cfg);
+
+ HoodieTableMetaClient metaClient = StreamerUtil.createMetaClient(conf);
+ conf.set(FlinkOptions.TABLE_NAME,
metaClient.getTableConfig().getTableName());
+ conf.set(FlinkOptions.RECORD_KEY_FIELD,
metaClient.getTableConfig().getRecordKeyFieldProp());
+ conf.set(FlinkOptions.TABLE_TYPE, HoodieTableType.MERGE_ON_READ.name());
+ conf.set(FlinkOptions.VECTOR_COLUMNS, vectorColsConf);
+ CompactionUtil.setPartitionField(conf, metaClient);
+ CompactionUtil.setAvroSchema(conf, metaClient);
+ CompactionUtil.inferChangelogMode(conf, metaClient);
+
+ try (HoodieFlinkWriteClient<?> writeClient =
FlinkWriteClients.createWriteClient(conf)) {
+ HoodieFlinkTable<?> table = writeClient.getHoodieTable();
+ Option<String> compactionInstantTime =
writeClient.scheduleCompaction(Option.empty());
+ assertTrue(compactionInstantTime.isPresent(), "The compaction plan
should be scheduled");
+
+ HoodieCompactionPlan compactionPlan = CompactionUtils.getCompactionPlan(
+ table.getMetaClient(), compactionInstantTime.get());
+ assertTrue(compactionPlan.getOperations().size() > 0, "The compaction
plan should contain operations");
+
+ HoodieInstant instant =
INSTANT_GENERATOR.getCompactionRequestedInstant(compactionInstantTime.get());
+
table.getActiveTimeline().transitionCompactionRequestedToInflight(instant);
+
+ StreamExecutionEnvironment env =
StreamExecutionEnvironment.getExecutionEnvironment();
+ env.addSource(new
CompactionPlanSourceFunction(Collections.singletonList(Pair.of(compactionInstantTime.get(),
compactionPlan)), conf))
+ .name("compaction_source")
+ .uid("uid_vector_compaction_source")
+ .rebalance()
+ .transform("compact_task",
+ TypeInformation.of(CompactionCommitEvent.class),
+ new CompactOperator(conf))
+ .setParallelism(compactionPlan.getOperations().size())
+ .addSink(new CompactionCommitSink(conf))
+ .name("compaction_commit")
+ .uid("uid_vector_compaction_commit")
+ .setParallelism(1);
+
+ env.execute("flink_hudi_vector_compaction");
+
assertTrue(table.getMetaClient().reloadActiveTimeline().filterCompletedInstants().containsInstant(instant.requestedTime()));
+ }
Review Comment:
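🤖 nit: the idiomatic JUnit 5 convention is for `@MethodSource` providers to
return `Stream<Arguments>` built with `Arguments.of(...)`, rather than raw
`Object[]` arrays. Would you consider switching the provider (presumably
`invalidVectorColumnConfigs`, referenced by `testInvalidVectorColumns`) to that style?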
<sub><i>⚠️ AI-generated; verify before applying. React 👍/👎 to flag
quality.</i></sub>
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]