hudi-agent commented on code in PR #18894:
URL: https://github.com/apache/hudi/pull/18894#discussion_r3348136088


##########
hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/sink/partitioner/RecordIndexPartitioner.java:
##########
@@ -115,15 +115,6 @@ private int getFileGroupCountForPartitionedRLI(String partitionPath) {
     // HoodieBackedTableMetadataWriter initializes record-index file groups for a newly seen
     // data partition with RECORD_LEVEL_INDEX_MIN_FILE_GROUP_COUNT_PROP, so the writer-side
     // partitioner should use the same count before that partition appears in the MDT view.
-    return fileGroupCount > 0 ? fileGroupCount : getMinFileGroupCountForPartitionedRLI();
-  }
-
-  /**
-   * Get the minimum file group count used to initialize newly seen partitioned record index partitions.
-   */
-  private int getMinFileGroupCountForPartitionedRLI() {
-    return Integer.parseInt(conf.getString(
-        HoodieMetadataConfig.RECORD_LEVEL_INDEX_MIN_FILE_GROUP_COUNT_PROP.key(),
-        HoodieMetadataConfig.RECORD_LEVEL_INDEX_MIN_FILE_GROUP_COUNT_PROP.defaultValue().toString()));
+    return fileGroupCount > 0 ? fileGroupCount : OptionsResolver.estimateFileGroupCountForRLI(conf);

Review Comment:
   🤖 `partition()` is called per record, and for any data partition not yet in 
`partitionedRLIFileGroupCounts`, this falls through to 
`OptionsResolver.estimateFileGroupCountForRLI(conf)`, which builds a full 
`HoodieWriteConfig` via `FlinkWriteClients.getHoodieClientConfig(conf)` every 
time. The previous `getMinFileGroupCountForPartitionedRLI()` was just a 
`conf.getString`/parse. For a job that ingests many records into new data 
partitions, this could noticeably regress throughput — could you cache the 
estimated value at the partitioner instance level (it doesn't depend on 
`partitionPath`)?
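
   A minimal sketch of the caching idea, assuming the estimate is non-negative and stable for the lifetime of the partitioner instance. `CachingPartitionerSketch`, `register`, and `fileGroupCountFor` are hypothetical names, and the `IntSupplier` only stands in for the call to `OptionsResolver.estimateFileGroupCountForRLI(conf)`:

```java
import java.util.HashMap;
import java.util.Map;
import java.util.function.IntSupplier;

// Hypothetical stand-in for the partitioner; not the actual Hudi class.
class CachingPartitionerSketch {
  // Plays the role of partitionedRLIFileGroupCounts: counts for partitions already known.
  private final Map<String, Integer> knownCounts = new HashMap<>();
  // Assumed to wrap the expensive estimate call; supplied by the caller.
  private final IntSupplier estimator;
  // -1 means "not computed yet". The estimate does not depend on partitionPath.
  private int cachedEstimate = -1;

  CachingPartitionerSketch(IntSupplier estimator) {
    this.estimator = estimator;
  }

  void register(String partitionPath, int fileGroupCount) {
    knownCounts.put(partitionPath, fileGroupCount);
  }

  int fileGroupCountFor(String partitionPath) {
    int known = knownCounts.getOrDefault(partitionPath, 0);
    if (known > 0) {
      return known;
    }
    // Compute the estimate at most once per instance, then reuse it for every record.
    if (cachedEstimate < 0) {
      cachedEstimate = estimator.getAsInt();
    }
    return cachedEstimate;
  }
}
```

   With this shape, the per-record path for an unseen partition costs one map lookup plus a field read after the first call. In a real Flink partitioner the cached field would likely need to be `transient`, since the instance is serialized; that detail is left out here.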
   
   <sub><i>- AI-generated; verify before applying. React 👍/👎 to flag 
quality.</i></sub>



##########
hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/sink/partitioner/GlobalRecordIndexPartitioner.java:
##########
@@ -93,6 +94,12 @@ public static int computeNumShardsAssigned(int taskIndex, int numPartitions, int
   static int getNumFileGroupsForRecordIndexPartition(Configuration conf) {
     String tablePath = conf.get(FlinkOptions.PATH);
     HoodieTableMetaClient metaClient = StreamerUtil.createMetaClient(conf);
+    // For flink adaptive batch execution, writer coordinator is not started yet, so metadata table
+    // is not initialized for a new table.
+    if (!metaClient.getTableConfig().isMetadataPartitionAvailable(MetadataPartitionType.RECORD_INDEX)) {
+      // Get the minimum file group count used to initialize global record level index

Review Comment:
   🤖 nit: the comment says "Get the minimum file group count" but 
`estimateFileGroupCountForRLI` actually computes an estimate based on min, max, 
growth factor, and max file-group size — it won't necessarily equal the 
minimum. Something like "Estimate the initial file group count since the RLI 
partition hasn't been initialized yet" would be more accurate.
   
   <sub><i>- AI-generated; verify before applying. React 👍/👎 to flag 
quality.</i></sub>



##########
hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/sink/partitioner/index/GlobalRecordLevelIndexBackend.java:
##########
@@ -124,6 +127,15 @@ public Map<String, HoodieRecordGlobalLocation> get(List<String> recordKeys) thro
     return keysAndLocations;
   }
 
+  private HoodiePairData<String, HoodieRecordGlobalLocation> lookupLocationsForMissedKeys(List<String> missedKeys) {
+    // For flink adaptive batch execution, writer coordinator is not started yet, so metadata table
+    // is not initialized for a new table.
+    if (!tableMetadata.enabled()) {

Review Comment:
   🤖 The new check uses `tableMetadata.enabled()`, which returns 
`isMetadataTableInitialized` derived from `isMetadataTableAvailable()` — and 
that one only checks the FILES partition. If MDT exists (FILES partition is 
available) but the RECORD_INDEX partition isn't initialized yet, this falls 
through to `readRecordIndexLocationsWithKeys`, which has a 
`ValidationUtils.checkState(...isMetadataPartitionAvailable(RECORD_INDEX), 
...)`. `GlobalRecordIndexPartitioner` uses a stricter check 
(`isMetadataPartitionAvailable(MetadataPartitionType.RECORD_INDEX)`). Could 
there be a window (e.g. existing table with MDT but RLI initializing) where the 
partitioner returns the estimate but the backend then trips that checkState? If 
RECORD_INDEX is guaranteed to be initialized together with FILES for global-RLI 
tables, a comment confirming that would help.
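
   To make the suspected window concrete, here is a toy model, not Hudi code. `MdtPartitionSketch`, `MetadataStateSketch`, and `canLookupRecordIndex` are hypothetical names; the only assumption carried over from the comment above is that `enabled()` reflects the FILES partition alone:

```java
import java.util.EnumSet;
import java.util.Set;

// Hypothetical model of the two metadata partitions discussed above.
enum MdtPartitionSketch { FILES, RECORD_INDEX }

// Hypothetical stand-in for the table metadata state.
class MetadataStateSketch {
  private final Set<MdtPartitionSketch> available;

  MetadataStateSketch(Set<MdtPartitionSketch> available) {
    this.available = EnumSet.copyOf(available);
  }

  // Mirrors the description in the comment: only the FILES partition is consulted.
  boolean enabled() {
    return available.contains(MdtPartitionSketch.FILES);
  }

  boolean isPartitionAvailable(MdtPartitionSketch partition) {
    return available.contains(partition);
  }

  // Stricter guard: the lookup is safe only when RECORD_INDEX itself is available.
  boolean canLookupRecordIndex() {
    return enabled() && isPartitionAvailable(MdtPartitionSketch.RECORD_INDEX);
  }
}
```

   In this model a state containing only FILES reports `enabled() == true` while `canLookupRecordIndex()` is false, which is the case where the looser check would fall through to the lookup.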
   
   <sub><i>- AI-generated; verify before applying. React 👍/👎 to flag 
quality.</i></sub>



##########
hudi-flink-datasource/hudi-flink/src/test/java/org/apache/hudi/table/ITTestHoodieDataSource.java:
##########
@@ -3738,6 +3740,18 @@ private static Stream<Arguments> indexAndPartitioningParams() {
     return Stream.of(data).map(Arguments::of);
   }
 
+  /**
+   * Return test params => (index type, boolean).
+   */
+  private static Stream<Arguments> indexAndBooleanParams() {

Review Comment:
   🤖 nit: `indexAndBooleanParams` doesn't say what the boolean represents. 
Could you rename this to `indexAndBootstrapEnabledParams` (and update the 
Javadoc) so the test parameterization is self-explanatory?
   
   <sub><i>- AI-generated; verify before applying. React 👍/👎 to flag 
quality.</i></sub>



##########
hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/sink/partitioner/MinibatchBucketAssignFunction.java:
##########
@@ -160,11 +158,11 @@ private Processor initRecordProcessor() {
         public void process(List<HoodieFlinkInternalRow> records, Collector<HoodieFlinkInternalRow> out) throws Exception {
           List<String> recordKeys = records.stream().map(HoodieFlinkInternalRow::getRecordKey).collect(Collectors.toList());
           MinibatchIndexBackend minibatchIndexBackend = (MinibatchIndexBackend) delegateFunction.getIndexBackend();
-          // get record locations by minibatch
-          Map<String, HoodieRecordGlobalLocation> recordLocations = minibatchIndexBackend.get(recordKeys);
+          // warm up the in-memory cache for record level index
+          minibatchIndexBackend.get(recordKeys);

Review Comment:
   🤖 nit: calling `get()` and discarding the return value to warm up a cache is 
a surprising pattern — `get` signals "give me the value", not "preload a 
side-channel cache". Have you considered adding a dedicated 
`prefetch(List<String>)` or `warmUp(List<String>)` method to 
`MinibatchIndexBackend` so the intent is explicit at the call site?
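
   One possible shape, as a sketch only: `PrefetchingBackendSketch` and `InMemoryBackendSketch` are hypothetical and do not reflect the real `MinibatchIndexBackend` signature. A default method keeps existing implementations compiling while giving the call site an explicit name:

```java
import java.util.HashMap;
import java.util.List;
import java.util.Map;

// Hypothetical interface; the real backend interface may differ.
interface PrefetchingBackendSketch<V> {
  Map<String, V> get(List<String> recordKeys);

  // Default implementation delegates to get() and drops the result,
  // so implementations that cache inside get() need no changes.
  default void prefetch(List<String> recordKeys) {
    get(recordKeys);
  }
}

// Toy implementation that caches a made-up location string on first lookup.
class InMemoryBackendSketch implements PrefetchingBackendSketch<String> {
  final Map<String, String> cache = new HashMap<>();

  @Override
  public Map<String, String> get(List<String> recordKeys) {
    Map<String, String> result = new HashMap<>();
    for (String key : recordKeys) {
      result.put(key, cache.computeIfAbsent(key, k -> "loc-" + k));
    }
    return result;
  }
}
```

   The call site would then read `minibatchIndexBackend.prefetch(recordKeys);`, and a backend with a cheaper warm-up path could override `prefetch` without building the result map.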
   
   <sub><i>- AI-generated; verify before applying. React 👍/👎 to flag 
quality.</i></sub>


