hudi-agent commented on code in PR #18685:
URL: https://github.com/apache/hudi/pull/18685#discussion_r3198089240


##########
hudi-flink-datasource/hudi-flink/src/test/java/org/apache/hudi/metrics/TestFlinkRLIBootstrapMetrics.java:
##########
@@ -0,0 +1,150 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hudi.metrics;
+
+import org.apache.flink.metrics.Gauge;
+import org.apache.flink.metrics.groups.UnregisteredMetricsGroup;
+import org.junit.jupiter.api.BeforeEach;
+import org.junit.jupiter.api.Test;
+
+import java.util.HashMap;
+import java.util.Map;
+
+import static 
org.apache.hudi.metrics.FlinkRLIBootstrapMetrics.BOOTSTRAP_COST_MS;
+import static 
org.apache.hudi.metrics.FlinkRLIBootstrapMetrics.BOOTSTRAP_RECORD_PER_MS;
+import static 
org.apache.hudi.metrics.FlinkRLIBootstrapMetrics.NUM_FILE_SLICES_PROCESSED;
+import static 
org.apache.hudi.metrics.FlinkRLIBootstrapMetrics.NUM_INDEX_RECORDS_EMITTED;
+import static org.junit.jupiter.api.Assertions.assertEquals;
+import static org.junit.jupiter.api.Assertions.assertDoesNotThrow;
+
+/**
+ * Test cases for {@link FlinkRLIBootstrapMetrics}.
+ */
+class TestFlinkRLIBootstrapMetrics {
+
+  /** Subclass that captures registered gauges so tests can read their values. 
*/
+  private static class CapturingMetricGroup extends UnregisteredMetricsGroup {
+    final Map<String, Gauge<?>> gauges = new HashMap<>();
+
+    @Override
+    public <T, G extends Gauge<T>> G gauge(String name, G gauge) {
+      gauges.put(name, gauge);
+      return gauge;
+    }
+  }
+
+  private CapturingMetricGroup metricGroup;
+  private FlinkRLIBootstrapMetrics metrics;
+
+  @BeforeEach
+  void setUp() {
+    metricGroup = new CapturingMetricGroup();
+    metrics = new FlinkRLIBootstrapMetrics(metricGroup);
+    metrics.registerMetrics();
+  }
+
+  // -------------------------------------------------------------------------
+  //  Metric name constants
+  // -------------------------------------------------------------------------
+
+  @Test
+  void testMetricNameConstants() {
+    assertEquals("rliBootstrap.numFileSlicesProcessed", 
NUM_FILE_SLICES_PROCESSED);
+    assertEquals("rliBootstrap.numIndexRecordsEmitted", 
NUM_INDEX_RECORDS_EMITTED);
+    assertEquals("rliBootstrap.bootstrapCostMs", BOOTSTRAP_COST_MS);
+    assertEquals("rliBootstrap.bootstrapRecordPerMs", BOOTSTRAP_RECORD_PER_MS);
+  }
+
+  // -------------------------------------------------------------------------
+  //  Gauge registration
+  // -------------------------------------------------------------------------
+
+  @Test
+  void testAllMetricsAreRegistered() {
+    assertEquals(4, metricGroup.gauges.size());
+    assertEquals(true, 
metricGroup.gauges.containsKey(NUM_FILE_SLICES_PROCESSED));
+    assertEquals(true, 
metricGroup.gauges.containsKey(NUM_INDEX_RECORDS_EMITTED));
+    assertEquals(true, metricGroup.gauges.containsKey(BOOTSTRAP_COST_MS));
+    assertEquals(true, 
metricGroup.gauges.containsKey(BOOTSTRAP_RECORD_PER_MS));
+  }
+
+  @Test
+  void testRegisterMetricsWithUnregisteredGroupDoesNotThrow() {
+    assertDoesNotThrow(() ->
+        new FlinkRLIBootstrapMetrics(new 
UnregisteredMetricsGroup()).registerMetrics());
+  }
+
+  // -------------------------------------------------------------------------
+  //  Initial values (before any update)
+  // -------------------------------------------------------------------------
+
+  @Test
+  void testInitialValuesAreZero() {
+    assertEquals(0L, (Long) gaugeValue(NUM_FILE_SLICES_PROCESSED));
+    assertEquals(0L, (Long) gaugeValue(NUM_INDEX_RECORDS_EMITTED));
+    assertEquals(0L, (Long) gaugeValue(BOOTSTRAP_COST_MS));
+    assertEquals(0.0, gaugeValue(BOOTSTRAP_RECORD_PER_MS));
+  }
+
+  // -------------------------------------------------------------------------
+  //  After updateLoadResult
+  // -------------------------------------------------------------------------
+
+  @Test
+  void testUpdateLoadResultReflectsInGauges() {
+    metrics.updateLoadResult(8, 1000, 500);
+
+    assertEquals(8L, (Long) gaugeValue(NUM_FILE_SLICES_PROCESSED));
+    assertEquals(1000L, (Long) gaugeValue(NUM_INDEX_RECORDS_EMITTED));
+    assertEquals(500L, (Long) gaugeValue(BOOTSTRAP_COST_MS));
+  }
+
+  @Test
+  void testThroughputIsRecordsPerSecond() {
+    // 2000 records in 500 ms → 4000 records/sec

Review Comment:
   🤖 nit: the test name says `RecordsPerSecond` but the gauge is 
`bootstrapRecordPerMs` (records per millisecond) — could you rename the test 
to `testThroughputIsRecordsPerMs` and update the inline comment in its body so 
the rate is stated in the gauge's unit (2000 records in 500 ms is 
`4.0 records/ms`, rather than `4000 records/sec`)?
   
   <sub><i>- AI-generated; verify before applying. React 👍/👎 to flag 
quality.</i></sub>



##########
hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/sink/bootstrap/RLIBootstrapOperator.java:
##########
@@ -130,6 +143,11 @@ private void emitIndexRecord(String recordKey, 
HoodieRecordGlobalLocation locati
             location.getFileId(),
             String.valueOf(location.getInstantTime()))));
     loadedCnt += 1;
+
+    // update the metrics every 1000 records
+    if (loadedCnt % 1000 == 0) {

Review Comment:
   🤖 nit: the magic number `1000` could be a named constant (e.g., 
`METRICS_UPDATE_INTERVAL`) so the sampling frequency is visible at the 
declaration site rather than buried in `emitIndexRecord`.
   
   <sub><i>- AI-generated; verify before applying. React 👍/👎 to flag 
quality.</i></sub>



##########
hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/metrics/FlinkRLIBootstrapMetrics.java:
##########
@@ -0,0 +1,61 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hudi.metrics;
+
+import org.apache.flink.metrics.Gauge;
+import org.apache.flink.metrics.MetricGroup;
+
+/**
+ * Metrics for {@link org.apache.hudi.sink.bootstrap.RLIBootstrapOperator}.
+ *
+ * <p>Exposes gauges that are updated once after the bootstrap loading 
completes.
+ */
+public class FlinkRLIBootstrapMetrics extends HoodieFlinkMetrics {
+
+  public static final String NUM_FILE_SLICES_PROCESSED = 
"rliBootstrap.numFileSlicesProcessed";
+  public static final String NUM_INDEX_RECORDS_EMITTED = 
"rliBootstrap.numIndexRecordsEmitted";
+  public static final String BOOTSTRAP_COST_MS = 
"rliBootstrap.bootstrapCostMs";
+  public static final String BOOTSTRAP_RECORD_PER_MS = 
"rliBootstrap.bootstrapRecordPerMs";
+
+  private long numFileSlicesProcessed;
+  private long numIndexRecordsEmitted;
+  private long bootstrapCostMs;
+
+  public FlinkRLIBootstrapMetrics(MetricGroup metricGroup) {
+    super(metricGroup);
+  }
+
+  @Override
+  public void registerMetrics() {
+    metricGroup.gauge(NUM_FILE_SLICES_PROCESSED, (Gauge<Long>) () -> 
numFileSlicesProcessed);
+    metricGroup.gauge(NUM_INDEX_RECORDS_EMITTED, (Gauge<Long>) () -> 
numIndexRecordsEmitted);
+    metricGroup.gauge(BOOTSTRAP_COST_MS, (Gauge<Long>) () -> bootstrapCostMs);
+    metricGroup.gauge(BOOTSTRAP_RECORD_PER_MS, (Gauge<Double>) 
this::getThroughput);
+  }
+
+  public void updateLoadResult(long numFileSlicesProcessed, long 
numIndexRecordsEmitted, long bootstrapCostMs) {
+    this.numFileSlicesProcessed = numFileSlicesProcessed;
+    this.numIndexRecordsEmitted = numIndexRecordsEmitted;
+    this.bootstrapCostMs = bootstrapCostMs;
+  }
+
+  private double getThroughput() {
+    return bootstrapCostMs > 0 ? (double) numIndexRecordsEmitted / 
bootstrapCostMs : 0;

Review Comment:
   🤖 nit: `getThroughput()` doesn't reveal the unit — could you rename it to 
`getRecordsPerMs()` to stay consistent with the gauge key 
`bootstrapRecordPerMs` right above it?
   
   <sub><i>- AI-generated; verify before applying. React 👍/👎 to flag 
quality.</i></sub>



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]

Reply via email to