This is an automated email from the ASF dual-hosted git repository.

danny0405 pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/hudi.git


The following commit(s) were added to refs/heads/master by this push:
     new 478833af968 [HUDI-7266] Add clustering metric for flink (#10420)
478833af968 is described below

commit 478833af96895f8765dcb639c0fdd971779b89b9
Author: leixin <[email protected]>
AuthorDate: Sun Jan 7 16:58:28 2024 +0800

    [HUDI-7266] Add clustering metric for flink (#10420)
---
 .../hudi/metrics/FlinkClusteringMetrics.java       | 105 +++++++++++++++++++++
 .../hudi/sink/clustering/ClusteringCommitSink.java |  12 +++
 .../hudi/sink/clustering/ClusteringOperator.java   |  14 +++
 .../sink/clustering/ClusteringPlanOperator.java    |  22 ++++-
 .../hudi/sink/utils/ClusteringFunctionWrapper.java |   6 ++
 5 files changed, 158 insertions(+), 1 deletion(-)

diff --git a/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/metrics/FlinkClusteringMetrics.java b/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/metrics/FlinkClusteringMetrics.java
new file mode 100644
index 00000000000..081c8f79a73
--- /dev/null
+++ b/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/metrics/FlinkClusteringMetrics.java
@@ -0,0 +1,105 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hudi.metrics;
+
+import org.apache.hudi.common.table.timeline.HoodieInstant;
+import org.apache.hudi.common.table.timeline.HoodieInstantTimeGenerator;
+import org.apache.hudi.common.util.Option;
+import org.apache.hudi.sink.clustering.ClusteringOperator;
+import org.apache.hudi.sink.clustering.ClusteringPlanOperator;
+
+import org.apache.flink.metrics.MetricGroup;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.text.ParseException;
+import java.time.Duration;
+import java.time.Instant;
+
+/**
+ * Metrics for flink clustering.
+ */
+public class FlinkClusteringMetrics extends FlinkWriteMetrics {
+
+  private static final Logger LOG = LoggerFactory.getLogger(FlinkClusteringMetrics.class);
+
+  /**
+   * Key for clustering timer.
+   */
+  private static final String CLUSTERING_KEY = "clustering";
+
+  /**
+   * Number of pending clustering instants.
+   *
+   * @see ClusteringPlanOperator
+   */
+  private long pendingClusteringCount;
+
+  /**
+   * Duration between the earliest pending clustering instant time and now in seconds.
+   *
+   *  @see ClusteringPlanOperator
+   */
+  private long clusteringDelay;
+
+  /**
+   * Cost for consuming a clustering operation in milliseconds.
+   *
+   * @see ClusteringOperator
+   */
+  private long clusteringCost;
+
+  public FlinkClusteringMetrics(MetricGroup metricGroup) {
+    super(metricGroup, CLUSTERING_KEY);
+  }
+
+  @Override
+  public void registerMetrics() {
+    super.registerMetrics();
+    metricGroup.gauge(getMetricsName(actionType, "pendingClusteringCount"), () -> pendingClusteringCount);
+    metricGroup.gauge(getMetricsName(actionType, "clusteringDelay"), () -> clusteringDelay);
+    metricGroup.gauge(getMetricsName(actionType, "clusteringCost"), () -> clusteringCost);
+  }
+
+  public void setPendingClusteringCount(long pendingClusteringCount) {
+    this.pendingClusteringCount = pendingClusteringCount;
+  }
+
+  public void setFirstPendingClusteringInstant(Option<HoodieInstant> firstPendingClusteringInstant) {
+    try {
+      if (!firstPendingClusteringInstant.isPresent()) {
+        this.clusteringDelay = 0L;
+      } else {
+        Instant start = HoodieInstantTimeGenerator.parseDateFromInstantTime((firstPendingClusteringInstant.get()).getTimestamp()).toInstant();
+        this.clusteringDelay = Duration.between(start, Instant.now()).getSeconds();
+      }
+    } catch (ParseException e) {
+      LOG.warn("Invalid input clustering instant" + firstPendingClusteringInstant);
+    }
+  }
+
+  public void startClustering() {
+    startTimer(CLUSTERING_KEY);
+  }
+
+  public void endClustering() {
+    this.clusteringCost = stopTimer(CLUSTERING_KEY);
+  }
+
+}
diff --git a/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/sink/clustering/ClusteringCommitSink.java b/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/sink/clustering/ClusteringCommitSink.java
index 93b6d4fbf95..75f025687e4 100644
--- a/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/sink/clustering/ClusteringCommitSink.java
+++ b/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/sink/clustering/ClusteringCommitSink.java
@@ -35,6 +35,7 @@ import org.apache.hudi.common.util.collection.Pair;
 import org.apache.hudi.configuration.FlinkOptions;
 import org.apache.hudi.exception.HoodieClusteringException;
 import org.apache.hudi.exception.HoodieException;
+import org.apache.hudi.metrics.FlinkClusteringMetrics;
 import org.apache.hudi.sink.CleanFunction;
 import org.apache.hudi.table.HoodieFlinkTable;
 import org.apache.hudi.table.action.HoodieWriteMetadata;
@@ -42,6 +43,7 @@ import org.apache.hudi.util.ClusteringUtil;
 import org.apache.hudi.util.FlinkWriteClients;
 
 import org.apache.flink.configuration.Configuration;
+import org.apache.flink.metrics.MetricGroup;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
@@ -88,6 +90,8 @@ public class ClusteringCommitSink extends CleanFunction<ClusteringCommitEvent> {
    */
   private transient Map<String, HoodieClusteringPlan> clusteringPlanCache;
 
+  private transient FlinkClusteringMetrics clusteringMetrics;
+
   public ClusteringCommitSink(Configuration conf) {
     super(conf);
     this.conf = conf;
@@ -102,6 +106,7 @@ public class ClusteringCommitSink extends CleanFunction<ClusteringCommitEvent> {
     this.commitBuffer = new HashMap<>();
     this.clusteringPlanCache = new HashMap<>();
     this.table = writeClient.getHoodieTable();
+    registerMetrics();
   }
 
   @Override
@@ -194,6 +199,7 @@ public class ClusteringCommitSink extends CleanFunction<ClusteringCommitEvent> {
     this.writeClient.completeTableService(
         TableServiceType.CLUSTER, writeMetadata.getCommitMetadata().get(), table, instant, Option.of(HoodieListData.lazy(writeMetadata.getWriteStatuses())));
 
+    clusteringMetrics.updateCommitMetrics(instant, writeMetadata.getCommitMetadata().get());
     // whether to clean up the input base parquet files used for clustering
     if (!conf.getBoolean(FlinkOptions.CLEAN_ASYNC_ENABLED) && !isCleaning) {
       LOG.info("Running inline clean");
@@ -229,4 +235,10 @@ public class ClusteringCommitSink extends CleanFunction<ClusteringCommitEvent> {
         .filter(fg -> !newFilesWritten.contains(fg))
         .collect(Collectors.groupingBy(HoodieFileGroupId::getPartitionPath, Collectors.mapping(HoodieFileGroupId::getFileId, Collectors.toList())));
   }
+
+  private void registerMetrics() {
+    MetricGroup metrics = getRuntimeContext().getMetricGroup();
+    clusteringMetrics = new FlinkClusteringMetrics(metrics);
+    clusteringMetrics.registerMetrics();
+  }
 }
diff --git a/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/sink/clustering/ClusteringOperator.java b/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/sink/clustering/ClusteringOperator.java
index 415b1024cfd..6aa5dd9acba 100644
--- a/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/sink/clustering/ClusteringOperator.java
+++ b/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/sink/clustering/ClusteringOperator.java
@@ -44,6 +44,7 @@ import org.apache.hudi.io.IOUtils;
 import org.apache.hudi.io.storage.HoodieAvroFileReader;
 import org.apache.hudi.io.storage.HoodieFileReader;
 import org.apache.hudi.io.storage.HoodieFileReaderFactory;
+import org.apache.hudi.metrics.FlinkClusteringMetrics;
 import org.apache.hudi.sink.bulk.BulkInsertWriterHelper;
 import org.apache.hudi.sink.bulk.sort.SortOperatorGen;
 import org.apache.hudi.sink.utils.NonThrownExecutor;
@@ -58,6 +59,7 @@ import org.apache.avro.generic.IndexedRecord;
 import org.apache.flink.annotation.VisibleForTesting;
 import org.apache.flink.configuration.Configuration;
 import org.apache.flink.metrics.Gauge;
+import org.apache.flink.metrics.MetricGroup;
 import org.apache.flink.runtime.memory.MemoryManager;
 import org.apache.flink.streaming.api.graph.StreamConfig;
 import org.apache.flink.streaming.api.operators.BoundedOneInput;
@@ -127,6 +129,8 @@ public class ClusteringOperator extends TableStreamOperator<ClusteringCommitEven
    */
   private transient NonThrownExecutor executor;
 
+  private transient FlinkClusteringMetrics clusteringMetrics;
+
   public ClusteringOperator(Configuration conf, RowType rowType) {
     // copy a conf let following modification not to impact the global conf
     this.conf = new Configuration(conf);
@@ -170,6 +174,8 @@ public class ClusteringOperator extends TableStreamOperator<ClusteringCommitEven
     }
 
     this.collector = new StreamRecordCollector<>(output);
+
+    registerMetrics();
   }
 
   @Override
@@ -213,6 +219,7 @@ public class ClusteringOperator extends TableStreamOperator<ClusteringCommitEven
   // -------------------------------------------------------------------------
 
   private void doClustering(String instantTime, List<ClusteringOperation> clusteringOperations) throws Exception {
+    clusteringMetrics.startClustering();
     BulkInsertWriterHelper writerHelper = new BulkInsertWriterHelper(this.conf, this.table, this.writeConfig,
         instantTime, this.taskID, getRuntimeContext().getNumberOfParallelSubtasks(), getRuntimeContext().getAttemptNumber(),
         this.rowType, true);
@@ -247,6 +254,7 @@ public class ClusteringOperator extends TableStreamOperator<ClusteringCommitEven
     }
 
     List<WriteStatus> writeStatuses = writerHelper.getWriteStatuses(this.taskID);
+    clusteringMetrics.endClustering();
     collector.collect(new ClusteringCommitEvent(instantTime, getFileIds(clusteringOperations), writeStatuses, this.taskID));
     writerHelper.close();
   }
@@ -388,4 +396,10 @@ public class ClusteringOperator extends TableStreamOperator<ClusteringCommitEven
   public void setOutput(Output<StreamRecord<ClusteringCommitEvent>> output) {
     this.output = output;
   }
+
+  private void registerMetrics() {
+    MetricGroup metrics = getRuntimeContext().getMetricGroup();
+    clusteringMetrics = new FlinkClusteringMetrics(metrics);
+    clusteringMetrics.registerMetrics();
+  }
 }
diff --git a/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/sink/clustering/ClusteringPlanOperator.java b/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/sink/clustering/ClusteringPlanOperator.java
index 48b2a9becd4..c16f8ed7080 100644
--- a/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/sink/clustering/ClusteringPlanOperator.java
+++ b/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/sink/clustering/ClusteringPlanOperator.java
@@ -26,6 +26,7 @@ import org.apache.hudi.common.table.timeline.HoodieTimeline;
 import org.apache.hudi.common.util.ClusteringUtils;
 import org.apache.hudi.common.util.Option;
 import org.apache.hudi.common.util.collection.Pair;
+import org.apache.hudi.metrics.FlinkClusteringMetrics;
 import org.apache.hudi.table.HoodieFlinkTable;
 import org.apache.hudi.util.ClusteringUtil;
 import org.apache.hudi.util.FlinkTables;
@@ -33,11 +34,14 @@ import org.apache.hudi.util.FlinkWriteClients;
 
 import org.apache.flink.annotation.VisibleForTesting;
 import org.apache.flink.configuration.Configuration;
+import org.apache.flink.metrics.MetricGroup;
 import org.apache.flink.streaming.api.operators.AbstractStreamOperator;
 import org.apache.flink.streaming.api.operators.OneInputStreamOperator;
 import org.apache.flink.streaming.api.operators.Output;
 import org.apache.flink.streaming.runtime.streamrecord.StreamRecord;
 
+import java.util.List;
+
 /**
  * Operator that generates the clustering plan with pluggable strategies on finished checkpoints.
  *
@@ -57,6 +61,8 @@ public class ClusteringPlanOperator extends AbstractStreamOperator<ClusteringPla
   @SuppressWarnings("rawtypes")
   private transient HoodieFlinkTable table;
 
+  private transient FlinkClusteringMetrics clusteringMetrics;
+
   public ClusteringPlanOperator(Configuration conf) {
     this.conf = conf;
   }
@@ -65,6 +71,7 @@ public class ClusteringPlanOperator extends AbstractStreamOperator<ClusteringPla
   public void open() throws Exception {
     super.open();
     this.table = FlinkTables.createTable(conf, getRuntimeContext());
+    registerMetrics();
     // when starting up, rolls back all the inflight clustering instants if there exists,
     // these instants are in priority for scheduling task because the clustering instants are
     // scheduled from earliest(FIFO sequence).
@@ -88,10 +95,17 @@ public class ClusteringPlanOperator extends AbstractStreamOperator<ClusteringPla
   }
 
   private void scheduleClustering(HoodieFlinkTable<?> table, long checkpointId) {
+    List<HoodieInstant> pendingClusteringInstantTimes =
+        ClusteringUtils.getPendingClusteringInstantTimes(table.getMetaClient());
     // the first instant takes the highest priority.
     Option<HoodieInstant> firstRequested = Option.fromJavaOptional(
-        ClusteringUtils.getPendingClusteringInstantTimes(table.getMetaClient()).stream()
+        pendingClusteringInstantTimes.stream()
             .filter(instant -> instant.getState() == HoodieInstant.State.REQUESTED).findFirst());
+
+    // record metrics
+    clusteringMetrics.setFirstPendingClusteringInstant(firstRequested);
+    clusteringMetrics.setPendingClusteringCount(pendingClusteringInstantTimes.size());
+
     if (!firstRequested.isPresent()) {
       // do nothing.
       LOG.info("No clustering plan for checkpoint " + checkpointId);
@@ -136,4 +150,10 @@ public class ClusteringPlanOperator extends AbstractStreamOperator<ClusteringPla
   public void setOutput(Output<StreamRecord<ClusteringPlanEvent>> output) {
     this.output = output;
   }
+
+  private void registerMetrics() {
+    MetricGroup metrics = getRuntimeContext().getMetricGroup();
+    clusteringMetrics = new FlinkClusteringMetrics(metrics);
+    clusteringMetrics.registerMetrics();
+  }
 }
diff --git a/hudi-flink-datasource/hudi-flink/src/test/java/org/apache/hudi/sink/utils/ClusteringFunctionWrapper.java b/hudi-flink-datasource/hudi-flink/src/test/java/org/apache/hudi/sink/utils/ClusteringFunctionWrapper.java
index e3b75cbf637..252a4835069 100644
--- a/hudi-flink-datasource/hudi-flink/src/test/java/org/apache/hudi/sink/utils/ClusteringFunctionWrapper.java
+++ b/hudi-flink-datasource/hudi-flink/src/test/java/org/apache/hudi/sink/utils/ClusteringFunctionWrapper.java
@@ -55,6 +55,10 @@ public class ClusteringFunctionWrapper {
    * Function that generates the {@code HoodieClusteringPlan}.
    */
   private ClusteringPlanOperator clusteringPlanOperator;
+  /**
+   * Output to collect the clustering plan events.
+   */
+  private CollectorOutput<ClusteringPlanEvent> planEventOutput;
   /**
    * Output to collect the clustering commit events.
    */
@@ -83,6 +87,8 @@ public class ClusteringFunctionWrapper {
 
   public void openFunction() throws Exception {
     clusteringPlanOperator = new ClusteringPlanOperator(conf);
+    planEventOutput =  new CollectorOutput<>();
+    clusteringPlanOperator.setup(streamTask, streamConfig, planEventOutput);
     clusteringPlanOperator.open();
 
     clusteringOperator = new ClusteringOperator(conf, TestConfigurations.ROW_TYPE);

Reply via email to