hudi-agent commented on code in PR #18690:
URL: https://github.com/apache/hudi/pull/18690#discussion_r3189729475


##########
hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/metrics/FlinkMetricsUtils.java:
##########
@@ -0,0 +1,69 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hudi.metrics;
+
+import org.apache.flink.metrics.MetricGroup;
+import org.apache.hudi.metadata.HoodieBackedTableMetadata;
+
+/**
+ * Utils for Flink metrics registration.
+ */
+public class FlinkMetricsUtils {
+
+  public static void registerMetadataTableMetrics(HoodieBackedTableMetadata metadataTable, MetricGroup metricGroup) {
+    if (metadataTable == null) {
+      return;
+    }
+
+    // metrics is Option.empty() when metrics are disabled
+    metadataTable.getMetrics().ifPresent(m -> {
+      m.registry().getGauges().forEach((name, gauge) ->
+          metricGroup.gauge(name, gauge::getValue));
+      m.registry().getCounters().forEach((name, counter) ->
+          metricGroup.gauge(name, counter::getCount));
+      m.registry().getMeters().forEach((name, meter) -> {
+        metricGroup.gauge(name + ".count", meter::getCount);
+        metricGroup.gauge(name + ".meanRate", meter::getMeanRate);
+        metricGroup.gauge(name + ".1minRate", meter::getOneMinuteRate);
+        metricGroup.gauge(name + ".5minRate", meter::getFiveMinuteRate);
+        metricGroup.gauge(name + ".15minRate", meter::getFifteenMinuteRate);
+      });
+      m.registry().getHistograms().forEach((name, histogram) -> {
+        metricGroup.gauge(name + ".count", histogram::getCount);
+        metricGroup.gauge(name + ".mean", () -> histogram.getSnapshot().getMean());
+        metricGroup.gauge(name + ".min", () -> histogram.getSnapshot().getMin());
+        metricGroup.gauge(name + ".max", () -> histogram.getSnapshot().getMax());
+        metricGroup.gauge(name + ".p75", () -> histogram.getSnapshot().get75thPercentile());
+        metricGroup.gauge(name + ".p95", () -> histogram.getSnapshot().get95thPercentile());
+        metricGroup.gauge(name + ".p99", () -> histogram.getSnapshot().get99thPercentile());
+      });
+      m.registry().getTimers().forEach((name, timer) -> {

Review Comment:
   🤖 nit: the five rate-gauge lines here (count, meanRate, 1minRate, 5minRate,
15minRate) are identical to the meter block at line 40. Both `Meter` and
`Timer` implement Dropwizard's `Metered` interface, so could you extract a
small private helper like `registerRateGauges(String name, Metered metered,
MetricGroup group)` to avoid the duplication? That way a rate stat can be
added or removed in one place.
   
   <sub><i>- AI-generated; verify before applying. React 👍/👎 to flag 
quality.</i></sub>
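
A minimal sketch of the suggested helper, for illustration only. To keep it
self-contained, `Metered` and `GaugeSink` below are hypothetical stand-ins
for Dropwizard's `Metered` interface and for the `gauge(name, gauge)` call on
Flink's `MetricGroup`; the metric name `lookup` and the fixed values are also
made up. In the real class the helper would take the actual Dropwizard and
Flink types.

```java
import java.util.LinkedHashMap;
import java.util.Map;
import java.util.function.Supplier;

public class RateGaugeSketch {

  /** Hypothetical stand-in for com.codahale.metrics.Metered (same getter names). */
  interface Metered {
    long getCount();
    double getMeanRate();
    double getOneMinuteRate();
    double getFiveMinuteRate();
    double getFifteenMinuteRate();
  }

  /** Hypothetical stand-in for the gauge(name, gauge) call on Flink's MetricGroup. */
  interface GaugeSink {
    void gauge(String name, Supplier<Object> value);
  }

  /** Registers the five rate gauges that the meter and timer blocks share. */
  static void registerRateGauges(String name, Metered metered, GaugeSink group) {
    group.gauge(name + ".count", metered::getCount);
    group.gauge(name + ".meanRate", metered::getMeanRate);
    group.gauge(name + ".1minRate", metered::getOneMinuteRate);
    group.gauge(name + ".5minRate", metered::getFiveMinuteRate);
    group.gauge(name + ".15minRate", metered::getFifteenMinuteRate);
  }

  /** Builds a fixed-value Metered and returns the gauges registered for it. */
  static Map<String, Supplier<Object>> demo() {
    Metered fixed = new Metered() {
      public long getCount() { return 3L; }
      public double getMeanRate() { return 1.5; }
      public double getOneMinuteRate() { return 1.0; }
      public double getFiveMinuteRate() { return 0.5; }
      public double getFifteenMinuteRate() { return 0.25; }
    };
    Map<String, Supplier<Object>> registered = new LinkedHashMap<>();
    // The map's put method plays the role of the metric group here.
    registerRateGauges("lookup", fixed, registered::put);
    return registered;
  }

  public static void main(String[] args) {
    demo().forEach((name, value) -> System.out.println(name + " = " + value.get()));
  }
}
```

With a helper of this shape, the meter and timer loops would each reduce to a
single call, and any timer-specific snapshot gauges would stay in the timer
block.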



##########
hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/sink/bootstrap/RLIBootstrapOperator.java:
##########
@@ -62,12 +63,17 @@ public RLIBootstrapOperator(Configuration conf) {
   @Override
   public void initializeState(StateInitializationContext context) throws Exception {
     loadedCnt = 0;
-    HoodieTableMetaClient metaClient = StreamerUtil.createMetaClient(conf);
-    this.metadataTable = (HoodieBackedTableMetadata) metaClient.getTableFormat().getMetadataFactory().create(
-        HoodieFlinkEngineContext.DEFAULT,
-        metaClient.getStorage(),
-        StreamerUtil.metadataConfig(conf),
-        conf.get(FlinkOptions.PATH));
+    HoodieTableMetaClient metaClient = createMetaClient();
+    this.metadataTable = createMetadataTable(metaClient);
+
+    if (metadataTable.getMetadataConfig().isMetricsEnabled()) {
+      // Bridge metadata table Dropwizard metrics to Flink's metric system so they are
+      // visible in Flink dashboards and reporters.
+      MetricGroup metricGroup = getRuntimeContext().getMetricGroup();
+      metadataTable.getMetrics().ifPresent(m ->
+              m.registry().getGauges().forEach((name, gauge) ->
+                      metricGroup.gauge(name, gauge::getValue)));
+    }

Review Comment:
   🤖 Just to clarify on the current diff — 
`FlinkMetricsUtils.registerMetadataTableMetrics` already iterates all five 
Dropwizard metric types (`getGauges`, `getCounters`, `getMeters`, 
`getHistograms`, `getTimers`), so future metric types added to 
`HoodieMetadataMetrics` should be picked up automatically as long as they fall 
in those buckets. This concern may have been based on an earlier snapshot of 
the file.


