hudi-agent commented on code in PR #18790:
URL: https://github.com/apache/hudi/pull/18790#discussion_r3277564312


##########
hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/sink/partitioner/BucketAssignFunction.java:
##########
@@ -145,6 +145,26 @@ public void open(Configuration parameters) throws 
Exception {
     this.recordProcessor = initRecordProcessor();
     this.metrics = new 
FlinkBucketAssignMetrics(getRuntimeContext().getMetricGroup());
     this.metrics.registerMetrics();
+    initRliShardAssignMetric();
+  }
+
+  /**
+   * Computes and registers the number of RLI shards assigned to this task 
when global RLI is active.

Review Comment:
   🤖 nit: "registers" is a bit misleading here — the gauge is already 
registered in `registerMetrics()` which runs just before this call. Could you 
change it to something like "Computes and sets the value of the 
numShardsAssigned gauge" so a reader doesn't go hunting for a second 
registration path?
   
   <sub><i>- AI-generated; verify before applying. React 👍/👎 to flag 
quality.</i></sub>



##########
hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/metrics/FlinkBucketAssignMetrics.java:
##########
@@ -47,6 +56,11 @@ public FlinkBucketAssignMetrics(MetricGroup metricGroup) {
   @Override
   public void registerMetrics() {
     metricGroup.histogram("recordBufferingTime", recordBufferingTime);
+    metricGroup.gauge("numShardsAssigned", numShardsAssigned::get);
+  }
+
+  public void setNumShardsAssigned(int count) {

Review Comment:
   🤖 nit: the parameter `count` is a bit generic given the method is 
`setNumShardsAssigned` and the field is `numShardsAssigned` — could you rename 
it to `numShards` to keep the naming consistent throughout?
   
   <sub><i>- AI-generated; verify before applying. React 👍/👎 to flag 
quality.</i></sub>



##########
hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/sink/partitioner/BucketAssignFunction.java:
##########
@@ -145,6 +145,26 @@ public void open(Configuration parameters) throws 
Exception {
     this.recordProcessor = initRecordProcessor();
     this.metrics = new 
FlinkBucketAssignMetrics(getRuntimeContext().getMetricGroup());
     this.metrics.registerMetrics();
+    initRliShardAssignMetric();
+  }
+
+  /**
+   * Computes and registers the number of RLI shards assigned to this task 
when global RLI is active.
+   * Each task owns the file groups whose index satisfies {@code fgIndex % 
numPartitions == taskIndex}.
+   */
+  private void initRliShardAssignMetric() {
+    if (!OptionsResolver.isGlobalRecordLevelIndex(conf)) {

Review Comment:
   🤖 In `Pipelines.createBucketAssignStream` (around line 464), 
`GlobalRecordIndexPartitioner` is only wired upstream when 
`isGlobalRecordLevelIndex(conf) && !INDEX_BOOTSTRAP_ENABLED`; the 
bootstrap-with-global-RLI case falls through to the `else` branch and uses 
plain `BucketAssignFunction` with `keyBy(recordKey)` instead. Should this guard 
also exclude `OptionsResolver.isRLIWithBootstrap(conf)` so the gauge only 
reports a value when the partitioner is actually routing records?
   
   <sub><i>- AI-generated; verify before applying. React 👍/👎 to flag 
quality.</i></sub>



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]

Reply via email to