hudi-agent commented on code in PR #18894:
URL: https://github.com/apache/hudi/pull/18894#discussion_r3343938821


##########
hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/sink/partitioner/index/GlobalRecordLevelIndexBackend.java:
##########
@@ -162,21 +174,27 @@ public void onCheckpointComplete(Correspondent correspondent, long completedChec
   }
 
   private void reloadMetadataTable() {
-    this.metadataTable = metaClient.getTableFormat().getMetadataFactory().create(
-        HoodieFlinkEngineContext.DEFAULT,
-        metaClient.getStorage(),
-        StreamerUtil.metadataConfig(conf),
-        conf.get(FlinkOptions.PATH));
+    this.tableMetadata =
+        new HoodieBackedTableMetadata(
+            HoodieFlinkEngineContext.DEFAULT,
+            metaClient.getStorage(),
+            StreamerUtil.metadataConfig(conf),
+            conf.get(FlinkOptions.PATH));
+    if (!tableMetadata.enabled()) {
+      if (metaClient.getTableConfig().isMetadataTableAvailable()) {

Review Comment:
   🤖 Throwing a plain `RuntimeException` here loses the Hudi-specific exception
type; could you use `HoodieException` (or `HoodieMetadataException`) to be
consistent with the rest of the file (e.g. the `close()` method below)? Also,
since `reloadMetadataTable()` overwrites `this.tableMetadata` without closing
the previous instance, the prior MDT reader will leak if this method is called
more than once (e.g. from `onCheckpointComplete`). It would be worth either
closing the old instance before reassigning or documenting that this method is
only called once.
   
   <sub><i>- AI-generated; verify before applying. React 👍/👎 to flag 
quality.</i></sub>
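A minimal sketch of the close-before-reassign suggestion, in plain Java so it compiles on its own. `MetadataReader`, `MetadataUnavailableException` and `MetadataBackendSketch` are hypothetical stand-ins for `HoodieTableMetadata`, `HoodieException` and the backend class; the real constructors and the `isMetadataTableAvailable()` branch are not reproduced.

```java
import java.util.function.Supplier;

/** Hypothetical stand-in for HoodieTableMetadata: a reader that must be closed. */
interface MetadataReader extends AutoCloseable {
  boolean enabled();

  /** Narrowed to throw no checked exception, to keep the sketch short. */
  @Override
  void close();
}

/** Hypothetical stand-in for HoodieException (an unchecked, project-specific type). */
class MetadataUnavailableException extends RuntimeException {
  MetadataUnavailableException(String message) {
    super(message);
  }
}

/** Hypothetical backend showing close-before-reassign in reloadMetadataTable(). */
class MetadataBackendSketch {
  private final Supplier<MetadataReader> readerFactory;
  private MetadataReader tableMetadata;

  MetadataBackendSketch(Supplier<MetadataReader> readerFactory) {
    this.readerFactory = readerFactory;
  }

  void reloadMetadataTable() {
    // Release the previous reader first so that repeated reloads
    // (for example one per completed checkpoint) do not leak it.
    if (tableMetadata != null) {
      tableMetadata.close();
      tableMetadata = null;
    }
    MetadataReader reader = readerFactory.get();
    if (!reader.enabled()) {
      reader.close();
      // Project-specific exception type instead of a bare RuntimeException.
      throw new MetadataUnavailableException(
          "Metadata table is not enabled; cannot serve record level index lookups");
    }
    this.tableMetadata = reader;
  }

  MetadataReader current() {
    return tableMetadata;
  }
}
```

Closing before creating the replacement keeps at most one reader open at a time; the trade-off is that a failing factory call leaves the backend without a reader until the next reload.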



##########
hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/sink/partitioner/index/GlobalRecordLevelIndexBackend.java:
##########
@@ -162,21 +174,27 @@ public void onCheckpointComplete(Correspondent correspondent, long completedChec
   }
 
   private void reloadMetadataTable() {
-    this.metadataTable = metaClient.getTableFormat().getMetadataFactory().create(
-        HoodieFlinkEngineContext.DEFAULT,
-        metaClient.getStorage(),
-        StreamerUtil.metadataConfig(conf),
-        conf.get(FlinkOptions.PATH));
+    this.tableMetadata =

Review Comment:
   🤖 This change drops the use of 
`metaClient.getTableFormat().getMetadataFactory().create(...)` and instantiates 
`HoodieBackedTableMetadata` directly. Was that intentional? It bypasses the 
table-format abstraction, so any table format that swaps in a different 
metadata implementation would be broken here. If you only need access to 
`enabled()`, would it be cleaner to keep using the factory and cast/check at 
the interface level, or expose `enabled()` on `HoodieTableMetadata`?
   
   <sub><i>- AI-generated; verify before applying. React 👍/👎 to flag 
quality.</i></sub>
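For the second option, a sketch of what an interface-level check could look like: the caller only sees the factory and the interface, so a table format that swaps in another implementation still works. `TableMetadataView`, `MetadataFactory`, `BackedMetadata`, `NoOpMetadata` and `MetadataLoader` are hypothetical names; whether `HoodieTableMetadata` should gain an `enabled()` method is exactly the open question above.

```java
/** Hypothetical stand-in for HoodieTableMetadata with enabled() exposed on the interface. */
interface TableMetadataView {
  /** Defaults to true; implementations that may be unusable override it. */
  default boolean enabled() {
    return true;
  }
}

/** Hypothetical stand-in for the per-table-format metadata factory. */
interface MetadataFactory {
  TableMetadataView create(String basePath);
}

/** Implementation backed by a metadata table that may or may not be enabled. */
class BackedMetadata implements TableMetadataView {
  private final boolean mdtEnabled;

  BackedMetadata(boolean mdtEnabled) {
    this.mdtEnabled = mdtEnabled;
  }

  @Override
  public boolean enabled() {
    return mdtEnabled;
  }
}

/** Another table format's implementation, plugged in through the factory. */
class NoOpMetadata implements TableMetadataView {
  @Override
  public boolean enabled() {
    return false;
  }
}

class MetadataLoader {
  /** Depends only on the factory and the interface; no concrete class is named. */
  static TableMetadataView load(MetadataFactory factory, String basePath) {
    TableMetadataView metadata = factory.create(basePath);
    if (!metadata.enabled()) {
      throw new IllegalStateException("Metadata table is not enabled for " + basePath);
    }
    return metadata;
  }
}
```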



##########
hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/sink/partitioner/MinibatchBucketAssignFunction.java:
##########
@@ -160,11 +158,11 @@ private Processor initRecordProcessor() {
         public void process(List<HoodieFlinkInternalRow> records, Collector<HoodieFlinkInternalRow> out) throws Exception {
           List<String> recordKeys = records.stream().map(HoodieFlinkInternalRow::getRecordKey).collect(Collectors.toList());
           MinibatchIndexBackend minibatchIndexBackend = (MinibatchIndexBackend) delegateFunction.getIndexBackend();
-          // get record locations by minibatch
-          Map<String, HoodieRecordGlobalLocation> recordLocations = minibatchIndexBackend.get(recordKeys);
+          // warm up the in-memory cache for record level index
+          minibatchIndexBackend.get(recordKeys);

Review Comment:
   🤖 nit: `get(recordKeys)` is called here purely for its cache-warming side
effect, but the name `get` signals a pure read, so a future reader will wonder
whether the return value was accidentally dropped. It might be worth adding a
dedicated `warmUp` or `prefetch` method on `MinibatchIndexBackend` to make the
intent explicit.
   
   <sub><i>- AI-generated; verify before applying. React 👍/👎 to flag 
quality.</i></sub>
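A sketch of the suggested split, assuming the batch lookup can be modeled as a function from keys to found locations. `PrefetchingIndexBackend` and `prefetch` are hypothetical names, and locations are plain `String` file group ids instead of `HoodieRecordGlobalLocation`.

```java
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.function.Function;

/** Hypothetical minibatch backend with an explicit cache-warming entry point. */
class PrefetchingIndexBackend {
  // Batch lookup against the index; returns only the keys that have a location.
  private final Function<List<String>, Map<String, String>> batchLookup;
  private final Map<String, String> cache = new HashMap<>();

  PrefetchingIndexBackend(Function<List<String>, Map<String, String>> batchLookup) {
    this.batchLookup = batchLookup;
  }

  /** Loads locations for the whole minibatch into the cache; returns nothing on purpose. */
  void prefetch(List<String> recordKeys) {
    cache.putAll(batchLookup.apply(recordKeys));
  }

  /** Pure read: only consults the cache populated by prefetch(). */
  String get(String recordKey) {
    return cache.get(recordKey);
  }
}
```

With a `void` return type, a call like `backend.prefetch(recordKeys)` can no longer be mistaken for a dropped result.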



##########
hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/sink/utils/Pipelines.java:
##########
@@ -291,8 +291,8 @@ private static DataStream<HoodieFlinkInternalRow> streamBootstrap(
       boolean bounded) {
     DataStream<HoodieFlinkInternalRow> dataStream1 = rowDataToHoodieRecord(conf, rowType, dataStream);
 
-    if (conf.get(FlinkOptions.INDEX_BOOTSTRAP_ENABLED) || bounded) {
-      boolean isRliBootstrap = OptionsResolver.isGlobalRecordLevelIndex(conf);
+    boolean isRliBootstrap = OptionsResolver.isGlobalRecordLevelIndex(conf);

Review Comment:
   🤖 nit: `isRliBootstrap` reads as "is bootstrap enabled for RLI", but the 
value is just a check on the index *type* — could you rename it to `isRliIndex` 
(or `isGlobalRecordLevelIndex`) to avoid confusing it with the 
`INDEX_BOOTSTRAP_ENABLED` flag on the line directly below?
   
   <sub><i>- AI-generated; verify before applying. React 👍/👎 to flag 
quality.</i></sub>



##########
hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/sink/partitioner/GlobalRecordIndexPartitioner.java:
##########
@@ -93,6 +95,14 @@ public static int computeNumShardsAssigned(int taskIndex, int numPartitions, int
   static int getNumFileGroupsForRecordIndexPartition(Configuration conf) {
     String tablePath = conf.get(FlinkOptions.PATH);
     HoodieTableMetaClient metaClient = StreamerUtil.createMetaClient(conf);
+    // For flink adaptive batch execution, writer coordinator is not started yet, so metadata table
+    // is not initialized for a new table.
+    if (!metaClient.getTableConfig().isMetadataPartitionAvailable(MetadataPartitionType.RECORD_INDEX)) {
+      // Get the minimum file group count used to initialize global record level index from
+      // the write config, since the config value is customized for the flink writer.
+      HoodieWriteConfig writeConfig = FlinkWriteClients.getHoodieClientConfig(conf);
+      return writeConfig.getGlobalRecordLevelIndexMinFileGroupCount();

Review Comment:
   🤖 @danny0405 Could you confirm that the value returned from 
`writeConfig.getGlobalRecordLevelIndexMinFileGroupCount()` here will exactly 
match the file group count that the MDT writer will use to initialize the RLI 
partition? If the writer ends up initializing the RLI partition with a 
different number of file groups (e.g. due to auto-tuning or a different 
max-count config taking effect), the partitioner's sharding decisions made 
before MDT initialization could disagree with the post-initialization layout 
and route the same key to the wrong shard.
   
   <sub><i>- AI-generated; verify before applying. React 👍/👎 to flag 
quality.</i></sub>
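To make the risk concrete, a sketch with an illustrative `String.hashCode()`-modulo mapping. This is an assumption: Hudi's record level index uses its own key-to-file-group hashing, which is not reproduced here. Any modulo-style mapping shows the same effect, though: when the count assumed by the partitioner differs from the count the MDT writer actually uses, some keys land on a different file group.

```java
import java.util.ArrayList;
import java.util.List;

class ShardMismatchDemo {
  /** Illustrative key-to-file-group mapping; NOT Hudi's actual RLI hash function. */
  static int fileGroupIndex(String recordKey, int numFileGroups) {
    return Math.floorMod(recordKey.hashCode(), numFileGroups);
  }

  /** Keys whose file group differs between the assumed and the actual file group count. */
  static List<String> misroutedKeys(List<String> keys, int assumedCount, int actualCount) {
    List<String> misrouted = new ArrayList<>();
    for (String key : keys) {
      if (fileGroupIndex(key, assumedCount) != fileGroupIndex(key, actualCount)) {
        misrouted.add(key);
      }
    }
    return misrouted;
  }
}
```

If the counts agree the list is always empty, which is why the partitioner's pre-initialization value has to match what the writer finally uses.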



##########
hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/sink/partitioner/index/GlobalRecordLevelIndexBackend.java:
##########
@@ -81,7 +83,9 @@ public void registerMetrics(MetricGroup metricGroup) {
 
   @Override
   public HoodieRecordGlobalLocation get(String recordKey) throws IOException {
-    throw new UnsupportedOperationException(this.getClass().getSimpleName() + " doesn't support lookup with a single key.");
+    // note: always fetch record location from the cache, since this backend is only used for minibatch mode,
+    // and the cache has been warmed up by calling `get(List<String> recordKeys)` previously.
+    return recordIndexCache.get(recordKey);

Review Comment:
   🤖 The contract for single-key `get()` changed from "throw 
UnsupportedOperationException" to "return whatever is in the cache". That works 
for the minibatch flow where `get(List)` warms up the cache first, but it now 
silently returns `null` (treated as INSERT in `processChangingRecord`) if 
anyone calls this without warmup. Is there any non-minibatch path that could 
hit this and incorrectly classify an UPDATE as an INSERT? If not, would a 
defensive assertion-style check (or having it delegate to
`get(List.of(key))`) make the contract harder to misuse?
   
   <sub><i>- AI-generated; verify before applying. React 👍/👎 to flag 
quality.</i></sub>
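One way to read the "defensive check" option, sketched with hypothetical names (`WarmedLocationCache`, `warmUp`) and `String` locations: the cache records every key that was looked up, including those with no location, so a single-key `get` can tell "looked up, not found" (an INSERT) apart from "never warmed up" (a caller bug) and fail fast on the latter.

```java
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.function.Function;

/** Hypothetical cache that remembers which keys were looked up, not just which were found. */
class WarmedLocationCache {
  private final Function<List<String>, Map<String, String>> batchLookup;
  // A key mapped to null means "looked up, no existing location" (treated as INSERT).
  private final Map<String, String> cache = new HashMap<>();

  WarmedLocationCache(Function<List<String>, Map<String, String>> batchLookup) {
    this.batchLookup = batchLookup;
  }

  void warmUp(List<String> recordKeys) {
    Map<String, String> found = batchLookup.apply(recordKeys);
    for (String key : recordKeys) {
      cache.put(key, found.get(key)); // null when the key has no location yet
    }
  }

  /** Fails fast instead of silently returning null for keys that were never warmed up. */
  String get(String recordKey) {
    if (!cache.containsKey(recordKey)) {
      throw new IllegalStateException(
          "Record key " + recordKey + " was not warmed up; call warmUp(List) first");
    }
    return cache.get(recordKey);
  }
}
```

A `HashMap` is used because it accepts `null` values; a cache that disallows nulls would need an explicit sentinel for "no location".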



##########
hudi-flink-datasource/hudi-flink/src/test/java/org/apache/hudi/table/ITTestHoodieDataSource.java:
##########
@@ -3738,6 +3740,18 @@ private static Stream<Arguments> indexAndPartitioningParams() {
     return Stream.of(data).map(Arguments::of);
   }
 
+  /**
+   * Return test params => (index type, boolean).
+   */

Review Comment:
   🤖 nit: `indexAndBooleanParams` doesn't convey what the boolean represents;
`indexAndBootstrapParams` would tie it back to `bootstrapEnabled` without
needing to open the method body.
   
   <sub><i>- AI-generated; verify before applying. React 👍/👎 to flag 
quality.</i></sub>



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
