hudi-agent commented on code in PR #18685:
URL: https://github.com/apache/hudi/pull/18685#discussion_r3196558038
##########
hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/sink/bootstrap/RLIBootstrapOperator.java:
##########
@@ -54,11 +55,22 @@ public class RLIBootstrapOperator
private transient HoodieBackedTableMetadata metadataTable;
private transient long loadedCnt;
+ private transient long numFileSlicesProcessed;
+ private transient long bootstrapCostMs;
+ private transient FlinkRLIBootstrapMetrics metrics;
public RLIBootstrapOperator(Configuration conf) {
super(conf);
}
+ @Override
+ public void open() throws Exception {
+ super.open();
+ this.metrics = new FlinkRLIBootstrapMetrics(getRuntimeContext().getMetricGroup());
+ this.metrics.registerMetrics();
Review Comment:
🤖 Worth a re-check: the diff also calls
`this.metrics.updateLoadResult(numFileSlicesProcessed, loadedCnt,
bootstrapCostMs)` at the end of `initializeState()` (right after
`preLoadRLIRecords()` returns), plus periodic updates every 1000 records inside
`emitIndexRecord`. So the gauges aren't stuck at zero. The bigger concern,
flagged elsewhere, is the lifecycle ordering: `metrics` is created in `open()`,
but `initializeState()` uses it, and Flink runs `initializeState()` first.
##########
hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/metrics/FlinkRLIBootstrapMetrics.java:
##########
@@ -0,0 +1,61 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hudi.metrics;
+
+import org.apache.flink.metrics.Gauge;
+import org.apache.flink.metrics.MetricGroup;
+
+/**
+ * Metrics for {@link org.apache.hudi.sink.bootstrap.RLIBootstrapOperator}.
+ *
+ * <p>Exposes gauges that are updated once after the bootstrap loading completes.
+ */
+public class FlinkRLIBootstrapMetrics extends HoodieFlinkMetrics {
+
+ public static final String NUM_FILE_SLICES_PROCESSED = "rliBootstrap.numFileSlicesProcessed";
+ public static final String NUM_INDEX_RECORDS_EMITTED = "rliBootstrap.numIndexRecordsEmitted";
+ public static final String BOOTSTRAP_COST_MS = "rliBootstrap.bootstrapCostMs";
+ public static final String BOOTSTRAP_THROUGHPUT = "rliBootstrap.bootstrapThroughput";
+
+ private long numFileSlicesProcessed;
Review Comment:
🤖 FYI, in the actual diff `getThroughput()` returns `numIndexRecordsEmitted
/ bootstrapCostMs` with no `* 1000`, so the `RECORD_PER_MS` constant name
is consistent with the math. The mismatch is in the test:
`testThroughputIsRecordsPerSecond` asserts `4.0` for 2000 records over 500 ms,
which is records/ms (records/sec would be 4000). The test name and comment are
what's misleading, not the production code.
##########
hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/sink/bootstrap/RLIBootstrapOperator.java:
##########
@@ -54,11 +55,22 @@ public class RLIBootstrapOperator
private transient HoodieBackedTableMetadata metadataTable;
private transient long loadedCnt;
+ private transient long numFileSlicesProcessed;
+ private transient long bootstrapCostMs;
+ private transient FlinkRLIBootstrapMetrics metrics;
public RLIBootstrapOperator(Configuration conf) {
super(conf);
}
+ @Override
+ public void open() throws Exception {
+ super.open();
+ this.metrics = new FlinkRLIBootstrapMetrics(getRuntimeContext().getMetricGroup());
+ this.metrics.registerMetrics();
+ this.metrics.updateLoadResult(numFileSlicesProcessed, loadedCnt, bootstrapCostMs);
Review Comment:
🤖 The PR does add `TestRLIBootstrapOperator.java` with operator-level tests
(lifecycle via `operator.open()`, metrics field initialization, the 1000-record
update cadence, etc.). One gap that's still worth pointing out though: the test
`setUp` calls only `open()`, not `initializeState()`, so it wouldn't catch the
lifecycle ordering issue (Flink calls `initializeState()` before `open()`, but
`metrics` is created in `open()`).
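To make the test gap concrete, here is a minimal sketch in plain Java with no Flink dependency. `SketchMetrics`, `SketchOperator`, and `LifecycleOrderCheck` are hypothetical stand-ins invented for illustration, not classes from the PR or from Flink. The point is only that a test driving `open()` alone passes, while one that drives `initializeState()` first (the order described above) hits the uninitialized field.

```java
// Hypothetical stand-ins for illustration only; not Flink or Hudi classes.
class SketchMetrics {
  long loadedCnt;

  void updateLoadResult(long loadedCnt) {
    this.loadedCnt = loadedCnt;
  }
}

class SketchOperator {
  // Created in open(), mirroring the arrangement the review describes.
  private SketchMetrics metrics;

  void initializeState() {
    // Mirrors the updateLoadResult(...) call placed after the pre-load step.
    metrics.updateLoadResult(42L);
  }

  void open() {
    metrics = new SketchMetrics();
  }
}

class LifecycleOrderCheck {
  /** Drives initializeState() before open(), the order the review describes. */
  static boolean failsInFlinkOrder() {
    SketchOperator op = new SketchOperator();
    try {
      op.initializeState();
      op.open();
      return false;
    } catch (NullPointerException e) {
      return true;
    }
  }

  /** Drives only open(), like the setUp described in the comment above. */
  static boolean failsWhenOnlyOpenIsCalled() {
    SketchOperator op = new SketchOperator();
    try {
      op.open();
      return false;
    } catch (NullPointerException e) {
      return true;
    }
  }

  public static void main(String[] args) {
    System.out.println("initializeState() then open() fails: " + failsInFlinkOrder());
    System.out.println("open() only fails: " + failsWhenOnlyOpenIsCalled());
  }
}
```

A real operator test would need to invoke both callbacks in that order (for example through an operator test harness, if one is available in the module) to exercise the same path.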
##########
hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/metrics/FlinkRLIBootstrapMetrics.java:
##########
@@ -0,0 +1,61 @@
+package org.apache.hudi.metrics;
+
+import org.apache.flink.metrics.Gauge;
+import org.apache.flink.metrics.MetricGroup;
+
+/**
+ * Metrics for {@link org.apache.hudi.sink.bootstrap.RLIBootstrapOperator}.
+ *
+ * <p>Exposes gauges that are updated once after the bootstrap loading completes.
+ */
+public class FlinkRLIBootstrapMetrics extends HoodieFlinkMetrics {
+
+ public static final String NUM_FILE_SLICES_PROCESSED = "rliBootstrap.numFileSlicesProcessed";
+ public static final String NUM_INDEX_RECORDS_EMITTED = "rliBootstrap.numIndexRecordsEmitted";
+ public static final String BOOTSTRAP_COST_MS = "rliBootstrap.bootstrapCostMs";
+ public static final String BOOTSTRAP_RECORD_PER_MS = "rliBootstrap.bootstrapRecordPerMs";
Review Comment:
🤖 Looking at the actual diff, `getThroughput()` returns
`numIndexRecordsEmitted / bootstrapCostMs` with no `* 1000`, so the exported
value is records/ms and matches the `RECORD_PER_MS` name. The contradiction is
in the test (method name says "RecordsPerSecond" but asserts a records/ms
value), not the production metric name.
##########
hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/sink/bootstrap/RLIBootstrapOperator.java:
##########
@@ -70,6 +82,7 @@ public void initializeState(StateInitializationContext context) throws Exception
conf.get(FlinkOptions.PATH));
// Load RLI records
preLoadRLIRecords();
+ this.metrics.updateLoadResult(numFileSlicesProcessed, loadedCnt, bootstrapCostMs);
Review Comment:
🤖 Agreed. `StreamOperator#initializeState()` runs before `open()` in
Flink's lifecycle, so the call at line 85 will throw a `NullPointerException`
on `this.metrics`. The current `TestRLIBootstrapOperator` only calls
`operator.open()` in `setUp`, which is why the bug isn't caught there.
Initializing `metrics` in `initializeState()` (or guarding the call) seems like
the right fix.
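One possible shape for that fix, sketched in plain Java under stated assumptions: `BootstrapMetricsSketch` and `GuardedOperatorSketch` are hypothetical names, and the sketch assumes the metric group is already obtainable when `initializeState()` runs, which should be checked against the Flink version in use. The metrics object is created lazily through a single accessor, so whichever callback runs first initializes it.

```java
// Hypothetical stand-ins for illustration only; not the PR's classes.
class BootstrapMetricsSketch {
  private long loadedCnt;
  private int registrations;

  void registerMetrics() {
    registrations++;
  }

  void updateLoadResult(long loadedCnt) {
    this.loadedCnt = loadedCnt;
  }

  long getLoadedCnt() {
    return loadedCnt;
  }

  int getRegistrations() {
    return registrations;
  }
}

class GuardedOperatorSketch {
  private BootstrapMetricsSketch metrics;

  /** Creates and registers the metrics on first use, from either callback. */
  private BootstrapMetricsSketch metrics() {
    if (metrics == null) {
      metrics = new BootstrapMetricsSketch();
      metrics.registerMetrics();
    }
    return metrics;
  }

  void initializeState() {
    long loadedCnt = 1000L; // stand-in for the result of the pre-load step
    metrics().updateLoadResult(loadedCnt);
  }

  void open() {
    metrics(); // no-op if initializeState() already created the metrics
  }

  long reportedLoadedCnt() {
    return metrics().getLoadedCnt();
  }

  int registrationCount() {
    return metrics().getRegistrations();
  }
}

class GuardedOperatorDemo {
  public static void main(String[] args) {
    GuardedOperatorSketch op = new GuardedOperatorSketch();
    op.initializeState(); // runs first, as in the lifecycle described above
    op.open();
    System.out.println("loadedCnt=" + op.reportedLoadedCnt()
        + ", registrations=" + op.registrationCount());
  }
}
```

The accessor also keeps registration to a single call, which matters if the metric group rejects or warns on duplicate gauge names.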
##########
hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/metrics/FlinkRLIBootstrapMetrics.java:
##########
@@ -0,0 +1,61 @@
+package org.apache.hudi.metrics;
+
+import org.apache.flink.metrics.Gauge;
+import org.apache.flink.metrics.MetricGroup;
+
+/**
+ * Metrics for {@link org.apache.hudi.sink.bootstrap.RLIBootstrapOperator}.
+ *
+ * <p>Exposes gauges that are updated once after the bootstrap loading completes.
+ */
+public class FlinkRLIBootstrapMetrics extends HoodieFlinkMetrics {
+
+ public static final String NUM_FILE_SLICES_PROCESSED = "rliBootstrap.numFileSlicesProcessed";
+ public static final String NUM_INDEX_RECORDS_EMITTED = "rliBootstrap.numIndexRecordsEmitted";
+ public static final String BOOTSTRAP_COST_MS = "rliBootstrap.bootstrapCostMs";
+ public static final String BOOTSTRAP_RECORD_PER_MS = "rliBootstrap.bootstrapRecordPerMs";
Review Comment:
🤖 Just to clarify: the diff's `getThroughput()` is `(double)
numIndexRecordsEmitted / bootstrapCostMs` — no `* 1000`. So the constant name
`RECORD_PER_MS` and the computation are actually consistent (records/ms). The
test method name `testThroughputIsRecordsPerSecond` is what disagrees with the
math.
##########
hudi-flink-datasource/hudi-flink/src/test/java/org/apache/hudi/metrics/TestFlinkRLIBootstrapMetrics.java:
##########
@@ -0,0 +1,150 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hudi.metrics;
+
+import org.apache.flink.metrics.Gauge;
+import org.apache.flink.metrics.groups.UnregisteredMetricsGroup;
+import org.junit.jupiter.api.BeforeEach;
+import org.junit.jupiter.api.Test;
+
+import java.util.HashMap;
+import java.util.Map;
+
+import static org.apache.hudi.metrics.FlinkRLIBootstrapMetrics.BOOTSTRAP_COST_MS;
+import static org.apache.hudi.metrics.FlinkRLIBootstrapMetrics.BOOTSTRAP_RECORD_PER_MS;
+import static org.apache.hudi.metrics.FlinkRLIBootstrapMetrics.NUM_FILE_SLICES_PROCESSED;
+import static org.apache.hudi.metrics.FlinkRLIBootstrapMetrics.NUM_INDEX_RECORDS_EMITTED;
+import static org.junit.jupiter.api.Assertions.assertEquals;
+import static org.junit.jupiter.api.Assertions.assertDoesNotThrow;
+
+/**
+ * Test cases for {@link FlinkRLIBootstrapMetrics}.
+ */
+class TestFlinkRLIBootstrapMetrics {
+
+ /** Subclass that captures registered gauges so tests can read their values. */
+ private static class CapturingMetricGroup extends UnregisteredMetricsGroup {
+ final Map<String, Gauge<?>> gauges = new HashMap<>();
+
+ @Override
+ public <T, G extends Gauge<T>> G gauge(String name, G gauge) {
+ gauges.put(name, gauge);
+ return gauge;
+ }
+ }
+
+ private CapturingMetricGroup metricGroup;
+ private FlinkRLIBootstrapMetrics metrics;
+
+ @BeforeEach
+ void setUp() {
+ metricGroup = new CapturingMetricGroup();
+ metrics = new FlinkRLIBootstrapMetrics(metricGroup);
+ metrics.registerMetrics();
+ }
+
+ // -------------------------------------------------------------------------
+ // Metric name constants
+ // -------------------------------------------------------------------------
+
+ @Test
+ void testMetricNameConstants() {
+ assertEquals("rliBootstrap.numFileSlicesProcessed", NUM_FILE_SLICES_PROCESSED);
+ assertEquals("rliBootstrap.numIndexRecordsEmitted", NUM_INDEX_RECORDS_EMITTED);
+ assertEquals("rliBootstrap.bootstrapCostMs", BOOTSTRAP_COST_MS);
+ assertEquals("rliBootstrap.bootstrapRecordPerMs", BOOTSTRAP_RECORD_PER_MS);
+ }
+
+ // -------------------------------------------------------------------------
+ // Gauge registration
+ // -------------------------------------------------------------------------
+
+ @Test
+ void testAllMetricsAreRegistered() {
+ assertEquals(4, metricGroup.gauges.size());
+ assertEquals(true, metricGroup.gauges.containsKey(NUM_FILE_SLICES_PROCESSED));
+ assertEquals(true, metricGroup.gauges.containsKey(NUM_INDEX_RECORDS_EMITTED));
+ assertEquals(true, metricGroup.gauges.containsKey(BOOTSTRAP_COST_MS));
+ assertEquals(true, metricGroup.gauges.containsKey(BOOTSTRAP_RECORD_PER_MS));
+ }
+
+ @Test
+ void testRegisterMetricsWithUnregisteredGroupDoesNotThrow() {
+ assertDoesNotThrow(() ->
+ new FlinkRLIBootstrapMetrics(new UnregisteredMetricsGroup()).registerMetrics());
+ }
+
+ // -------------------------------------------------------------------------
+ // Initial values (before any update)
+ // -------------------------------------------------------------------------
+
+ @Test
+ void testInitialValuesAreZero() {
+ assertEquals(0L, (Long) gaugeValue(NUM_FILE_SLICES_PROCESSED));
+ assertEquals(0L, (Long) gaugeValue(NUM_INDEX_RECORDS_EMITTED));
+ assertEquals(0L, (Long) gaugeValue(BOOTSTRAP_COST_MS));
+ assertEquals(0.0, gaugeValue(BOOTSTRAP_RECORD_PER_MS));
+ }
+
+ // -------------------------------------------------------------------------
+ // After updateLoadResult
+ // -------------------------------------------------------------------------
+
+ @Test
+ void testUpdateLoadResultReflectsInGauges() {
+ metrics.updateLoadResult(8, 1000, 500);
+
+ assertEquals(8L, (Long) gaugeValue(NUM_FILE_SLICES_PROCESSED));
+ assertEquals(1000L, (Long) gaugeValue(NUM_INDEX_RECORDS_EMITTED));
+ assertEquals(500L, (Long) gaugeValue(BOOTSTRAP_COST_MS));
+ }
+
+ @Test
+ void testThroughputIsRecordsPerSecond() {
Review Comment:
🤖 Confirming this read: `getThroughput()` is `numIndexRecordsEmitted /
bootstrapCostMs` (no `* 1000`), so 2000/500 = 4 is records-per-ms, matching the
`BOOTSTRAP_RECORD_PER_MS` constant. Renaming the test to
`testThroughputIsRecordsPerMs` (or aligning the production name + math to
records/sec, if that's the intended unit) would resolve the ambiguity.
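For reference, the two candidate units differ by a factor of 1000 on the numbers used in the test. The sketch below is illustrative only: `ThroughputUnits` and its methods are hypothetical names, and the zero-cost guard is an assumption inferred from the initial-value test expecting `0.0`, not a copy of the production `getThroughput()`.

```java
// Hypothetical helper for illustration; not the PR's getThroughput().
class ThroughputUnits {
  /** Records per millisecond; returns 0.0 when no time has elapsed (assumed guard). */
  static double recordsPerMs(long records, long costMs) {
    return costMs == 0 ? 0.0 : (double) records / costMs;
  }

  /** Records per second, i.e. the per-millisecond rate scaled by 1000. */
  static double recordsPerSecond(long records, long costMs) {
    return recordsPerMs(records, costMs) * 1000.0;
  }

  public static void main(String[] args) {
    System.out.println(recordsPerMs(2000, 500));     // prints 4.0
    System.out.println(recordsPerSecond(2000, 500)); // prints 4000.0
  }
}
```

So an assertion of `4.0` for 2000 records in 500 ms only fits a test named for records per millisecond; a records-per-second test would expect `4000.0`.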
##########
hudi-flink-datasource/hudi-flink/src/test/java/org/apache/hudi/metrics/TestFlinkRLIBootstrapMetrics.java:
##########
@@ -0,0 +1,153 @@
+package org.apache.hudi.metrics;
+
+import org.apache.flink.metrics.Gauge;
+import org.apache.flink.metrics.groups.UnregisteredMetricsGroup;
+import org.apache.hudi.configuration.OptionsInference;
+import org.junit.jupiter.api.BeforeEach;
+import org.junit.jupiter.api.Test;
+
+import java.util.HashMap;
Review Comment:
🤖 I think this is a misread of the diff — the test imports
`FlinkRLIBootstrapMetrics.BOOTSTRAP_RECORD_PER_MS` (matching the source), not
`BOOTSTRAP_THROUGHPUT`, and `getThroughput()` does not multiply by 1000. The
`4.0` assertion for 2000/500ms is consistent with records-per-ms. The actual
inconsistency is the test method name (`testThroughputIsRecordsPerSecond`) vs.
the records-per-ms semantics.