This is an automated email from the ASF dual-hosted git repository.
danny0405 pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/hudi.git
The following commit(s) were added to refs/heads/master by this push:
new 478833af968 [HUDI-7266] Add clustering metric for flink (#10420)
478833af968 is described below
commit 478833af96895f8765dcb639c0fdd971779b89b9
Author: leixin <[email protected]>
AuthorDate: Sun Jan 7 16:58:28 2024 +0800
[HUDI-7266] Add clustering metric for flink (#10420)
---
.../hudi/metrics/FlinkClusteringMetrics.java | 105 +++++++++++++++++++++
.../hudi/sink/clustering/ClusteringCommitSink.java | 12 +++
.../hudi/sink/clustering/ClusteringOperator.java | 14 +++
.../sink/clustering/ClusteringPlanOperator.java | 22 ++++-
.../hudi/sink/utils/ClusteringFunctionWrapper.java | 6 ++
5 files changed, 158 insertions(+), 1 deletion(-)
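The change wires three new gauges (pendingClusteringCount, clusteringDelay, clusteringCost) into Flink's metric system. For reference, a minimal sketch of the underlying Flink gauge API that the new FlinkClusteringMetrics class builds on (class and metric names below are illustrative, not part of the patch):

    import org.apache.flink.metrics.Gauge;
    import org.apache.flink.metrics.MetricGroup;

    class GaugeSketch {
      private volatile long pendingClusteringCount; // updated by the operator, read by reporters

      void register(MetricGroup group) {
        // Flink polls the lambda whenever a reporter fires, so the gauge always
        // reflects the current field value without any push logic.
        group.gauge("pendingClusteringCount", (Gauge<Long>) () -> pendingClusteringCount);
      }
    }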
diff --git a/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/metrics/FlinkClusteringMetrics.java b/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/metrics/FlinkClusteringMetrics.java
new file mode 100644
index 00000000000..081c8f79a73
--- /dev/null
+++ b/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/metrics/FlinkClusteringMetrics.java
@@ -0,0 +1,105 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hudi.metrics;
+
+import org.apache.hudi.common.table.timeline.HoodieInstant;
+import org.apache.hudi.common.table.timeline.HoodieInstantTimeGenerator;
+import org.apache.hudi.common.util.Option;
+import org.apache.hudi.sink.clustering.ClusteringOperator;
+import org.apache.hudi.sink.clustering.ClusteringPlanOperator;
+
+import org.apache.flink.metrics.MetricGroup;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.text.ParseException;
+import java.time.Duration;
+import java.time.Instant;
+
+/**
+ * Metrics for Flink clustering.
+ */
+public class FlinkClusteringMetrics extends FlinkWriteMetrics {
+
+ private static final Logger LOG = LoggerFactory.getLogger(FlinkClusteringMetrics.class);
+
+ /**
+ * Key for clustering timer.
+ */
+ private static final String CLUSTERING_KEY = "clustering";
+
+ /**
+ * Number of pending clustering instants.
+ *
+ * @see ClusteringPlanOperator
+ */
+ private long pendingClusteringCount;
+
+ /**
+ * Duration between the earliest pending clustering instant time and now, in seconds.
+ *
+ * @see ClusteringPlanOperator
+ */
+ private long clusteringDelay;
+
+ /**
+ * Time taken by one clustering operation, in milliseconds.
+ *
+ * @see ClusteringOperator
+ */
+ private long clusteringCost;
+
+ public FlinkClusteringMetrics(MetricGroup metricGroup) {
+ super(metricGroup, CLUSTERING_KEY);
+ }
+
+ @Override
+ public void registerMetrics() {
+ super.registerMetrics();
+ metricGroup.gauge(getMetricsName(actionType, "pendingClusteringCount"), () -> pendingClusteringCount);
+ metricGroup.gauge(getMetricsName(actionType, "clusteringDelay"), () -> clusteringDelay);
+ metricGroup.gauge(getMetricsName(actionType, "clusteringCost"), () -> clusteringCost);
+ }
+
+ public void setPendingClusteringCount(long pendingClusteringCount) {
+ this.pendingClusteringCount = pendingClusteringCount;
+ }
+
+ public void setFirstPendingClusteringInstant(Option<HoodieInstant> firstPendingClusteringInstant) {
+ try {
+ if (!firstPendingClusteringInstant.isPresent()) {
+ this.clusteringDelay = 0L;
+ } else {
+ Instant start = HoodieInstantTimeGenerator.parseDateFromInstantTime((firstPendingClusteringInstant.get()).getTimestamp()).toInstant();
+ this.clusteringDelay = Duration.between(start, Instant.now()).getSeconds();
+ }
+ } catch (ParseException e) {
+ LOG.warn("Invalid input clustering instant" +
firstPendingClusteringInstant);
+ }
+ }
+
+ public void startClustering() {
+ startTimer(CLUSTERING_KEY);
+ }
+
+ public void endClustering() {
+ this.clusteringCost = stopTimer(CLUSTERING_KEY);
+ }
+
+}
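The class above can be exercised in isolation. A minimal sketch, assuming flink-metrics-core is on the classpath and using Flink's no-op UnregisteredMetricsGroup:

    import org.apache.flink.metrics.groups.UnregisteredMetricsGroup;
    import org.apache.hudi.common.util.Option;
    import org.apache.hudi.metrics.FlinkClusteringMetrics;

    class MetricsSketch {
      public static void main(String[] args) {
        FlinkClusteringMetrics metrics = new FlinkClusteringMetrics(new UnregisteredMetricsGroup());
        metrics.registerMetrics();                                // registers the three gauges
        metrics.setPendingClusteringCount(2L);                    // normally fed by ClusteringPlanOperator
        metrics.setFirstPendingClusteringInstant(Option.empty()); // delay resets to 0 when nothing is pending
        metrics.startClustering();                                // brackets the clustering work ...
        metrics.endClustering();                                  // ... and records the elapsed time as clusteringCost
      }
    }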
diff --git a/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/sink/clustering/ClusteringCommitSink.java b/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/sink/clustering/ClusteringCommitSink.java
index 93b6d4fbf95..75f025687e4 100644
--- a/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/sink/clustering/ClusteringCommitSink.java
+++ b/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/sink/clustering/ClusteringCommitSink.java
@@ -35,6 +35,7 @@ import org.apache.hudi.common.util.collection.Pair;
import org.apache.hudi.configuration.FlinkOptions;
import org.apache.hudi.exception.HoodieClusteringException;
import org.apache.hudi.exception.HoodieException;
+import org.apache.hudi.metrics.FlinkClusteringMetrics;
import org.apache.hudi.sink.CleanFunction;
import org.apache.hudi.table.HoodieFlinkTable;
import org.apache.hudi.table.action.HoodieWriteMetadata;
@@ -42,6 +43,7 @@ import org.apache.hudi.util.ClusteringUtil;
import org.apache.hudi.util.FlinkWriteClients;
import org.apache.flink.configuration.Configuration;
+import org.apache.flink.metrics.MetricGroup;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@@ -88,6 +90,8 @@ public class ClusteringCommitSink extends CleanFunction<ClusteringCommitEvent> {
*/
private transient Map<String, HoodieClusteringPlan> clusteringPlanCache;
+ private transient FlinkClusteringMetrics clusteringMetrics;
+
public ClusteringCommitSink(Configuration conf) {
super(conf);
this.conf = conf;
@@ -102,6 +106,7 @@ public class ClusteringCommitSink extends CleanFunction<ClusteringCommitEvent> {
this.commitBuffer = new HashMap<>();
this.clusteringPlanCache = new HashMap<>();
this.table = writeClient.getHoodieTable();
+ registerMetrics();
}
@Override
@@ -194,6 +199,7 @@ public class ClusteringCommitSink extends CleanFunction<ClusteringCommitEvent> {
this.writeClient.completeTableService(
TableServiceType.CLUSTER, writeMetadata.getCommitMetadata().get(), table, instant,
Option.of(HoodieListData.lazy(writeMetadata.getWriteStatuses())));
+ clusteringMetrics.updateCommitMetrics(instant, writeMetadata.getCommitMetadata().get());
// whether to clean up the input base parquet files used for clustering
if (!conf.getBoolean(FlinkOptions.CLEAN_ASYNC_ENABLED) && !isCleaning) {
LOG.info("Running inline clean");
@@ -229,4 +235,10 @@ public class ClusteringCommitSink extends CleanFunction<ClusteringCommitEvent> {
.filter(fg -> !newFilesWritten.contains(fg))
.collect(Collectors.groupingBy(HoodieFileGroupId::getPartitionPath, Collectors.mapping(HoodieFileGroupId::getFileId, Collectors.toList())));
}
+
+ private void registerMetrics() {
+ MetricGroup metrics = getRuntimeContext().getMetricGroup();
+ clusteringMetrics = new FlinkClusteringMetrics(metrics);
+ clusteringMetrics.registerMetrics();
+ }
}
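The registerMetrics() helper added here (and mirrored in the two operators below) follows the usual Flink lifecycle rule: the runtime context, and with it the metric group, only exists once the function has been opened, so registration happens in open() rather than in the constructor. Schematically, as a sketch against a generic RichFunction:

    import org.apache.flink.api.common.functions.AbstractRichFunction;
    import org.apache.flink.configuration.Configuration;
    import org.apache.flink.metrics.Gauge;
    import org.apache.flink.metrics.MetricGroup;

    class MetricRegisteringFunction extends AbstractRichFunction {
      @Override
      public void open(Configuration parameters) throws Exception {
        // Calling getRuntimeContext() before the function is opened would fail,
        // which is why metric registration cannot live in the constructor.
        MetricGroup metrics = getRuntimeContext().getMetricGroup();
        metrics.gauge("ready", (Gauge<Long>) () -> 1L);
      }
    }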
diff --git a/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/sink/clustering/ClusteringOperator.java b/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/sink/clustering/ClusteringOperator.java
index 415b1024cfd..6aa5dd9acba 100644
--- a/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/sink/clustering/ClusteringOperator.java
+++ b/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/sink/clustering/ClusteringOperator.java
@@ -44,6 +44,7 @@ import org.apache.hudi.io.IOUtils;
import org.apache.hudi.io.storage.HoodieAvroFileReader;
import org.apache.hudi.io.storage.HoodieFileReader;
import org.apache.hudi.io.storage.HoodieFileReaderFactory;
+import org.apache.hudi.metrics.FlinkClusteringMetrics;
import org.apache.hudi.sink.bulk.BulkInsertWriterHelper;
import org.apache.hudi.sink.bulk.sort.SortOperatorGen;
import org.apache.hudi.sink.utils.NonThrownExecutor;
@@ -58,6 +59,7 @@ import org.apache.avro.generic.IndexedRecord;
import org.apache.flink.annotation.VisibleForTesting;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.metrics.Gauge;
+import org.apache.flink.metrics.MetricGroup;
import org.apache.flink.runtime.memory.MemoryManager;
import org.apache.flink.streaming.api.graph.StreamConfig;
import org.apache.flink.streaming.api.operators.BoundedOneInput;
@@ -127,6 +129,8 @@ public class ClusteringOperator extends TableStreamOperator<ClusteringCommitEven
*/
private transient NonThrownExecutor executor;
+ private transient FlinkClusteringMetrics clusteringMetrics;
+
public ClusteringOperator(Configuration conf, RowType rowType) {
// copy a conf let following modification not to impact the global conf
this.conf = new Configuration(conf);
@@ -170,6 +174,8 @@ public class ClusteringOperator extends TableStreamOperator<ClusteringCommitEven
}
this.collector = new StreamRecordCollector<>(output);
+
+ registerMetrics();
}
@Override
@@ -213,6 +219,7 @@ public class ClusteringOperator extends TableStreamOperator<ClusteringCommitEven
// -------------------------------------------------------------------------
private void doClustering(String instantTime, List<ClusteringOperation> clusteringOperations) throws Exception {
+ clusteringMetrics.startClustering();
BulkInsertWriterHelper writerHelper = new BulkInsertWriterHelper(this.conf, this.table, this.writeConfig, instantTime, this.taskID,
getRuntimeContext().getNumberOfParallelSubtasks(), getRuntimeContext().getAttemptNumber(), this.rowType, true);
@@ -247,6 +254,7 @@ public class ClusteringOperator extends TableStreamOperator<ClusteringCommitEven
}
List<WriteStatus> writeStatuses = writerHelper.getWriteStatuses(this.taskID);
+ clusteringMetrics.endClustering();
collector.collect(new ClusteringCommitEvent(instantTime, getFileIds(clusteringOperations), writeStatuses, this.taskID));
writerHelper.close();
}
@@ -388,4 +396,10 @@ public class ClusteringOperator extends TableStreamOperator<ClusteringCommitEven
public void setOutput(Output<StreamRecord<ClusteringCommitEvent>> output) {
this.output = output;
}
+
+ private void registerMetrics() {
+ MetricGroup metrics = getRuntimeContext().getMetricGroup();
+ clusteringMetrics = new FlinkClusteringMetrics(metrics);
+ clusteringMetrics.registerMetrics();
+ }
}
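startClustering() and endClustering() bracket one full doClustering() call, so the reported clusteringCost is the wall-clock time of a clustering operation, sort and write included. A simplified stand-in for the timer kept by the metrics base class (the helper name here is made up):

    class TimerSketch {
      // Times a unit of work the way the clustering timer does: wall-clock millis.
      static long timeMillis(Runnable work) {
        long start = System.currentTimeMillis();
        work.run();
        return System.currentTimeMillis() - start; // surfaced through the clusteringCost gauge
      }
    }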
diff --git a/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/sink/clustering/ClusteringPlanOperator.java b/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/sink/clustering/ClusteringPlanOperator.java
index 48b2a9becd4..c16f8ed7080 100644
--- a/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/sink/clustering/ClusteringPlanOperator.java
+++ b/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/sink/clustering/ClusteringPlanOperator.java
@@ -26,6 +26,7 @@ import org.apache.hudi.common.table.timeline.HoodieTimeline;
import org.apache.hudi.common.util.ClusteringUtils;
import org.apache.hudi.common.util.Option;
import org.apache.hudi.common.util.collection.Pair;
+import org.apache.hudi.metrics.FlinkClusteringMetrics;
import org.apache.hudi.table.HoodieFlinkTable;
import org.apache.hudi.util.ClusteringUtil;
import org.apache.hudi.util.FlinkTables;
@@ -33,11 +34,14 @@ import org.apache.hudi.util.FlinkWriteClients;
import org.apache.flink.annotation.VisibleForTesting;
import org.apache.flink.configuration.Configuration;
+import org.apache.flink.metrics.MetricGroup;
import org.apache.flink.streaming.api.operators.AbstractStreamOperator;
import org.apache.flink.streaming.api.operators.OneInputStreamOperator;
import org.apache.flink.streaming.api.operators.Output;
import org.apache.flink.streaming.runtime.streamrecord.StreamRecord;
+import java.util.List;
+
/**
* Operator that generates the clustering plan with pluggable strategies on finished checkpoints.
*
@@ -57,6 +61,8 @@ public class ClusteringPlanOperator extends AbstractStreamOperator<ClusteringPla
@SuppressWarnings("rawtypes")
private transient HoodieFlinkTable table;
+ private transient FlinkClusteringMetrics clusteringMetrics;
+
public ClusteringPlanOperator(Configuration conf) {
this.conf = conf;
}
@@ -65,6 +71,7 @@ public class ClusteringPlanOperator extends AbstractStreamOperator<ClusteringPla
public void open() throws Exception {
super.open();
this.table = FlinkTables.createTable(conf, getRuntimeContext());
+ registerMetrics();
// when starting up, rolls back all the inflight clustering instants if there exists,
// these instants are in priority for scheduling task because the clustering instants are
// scheduled from earliest(FIFO sequence).
@@ -88,10 +95,17 @@ public class ClusteringPlanOperator extends AbstractStreamOperator<ClusteringPla
}
private void scheduleClustering(HoodieFlinkTable<?> table, long checkpointId) {
+ List<HoodieInstant> pendingClusteringInstantTimes =
+ ClusteringUtils.getPendingClusteringInstantTimes(table.getMetaClient());
// the first instant takes the highest priority.
Option<HoodieInstant> firstRequested = Option.fromJavaOptional(
- ClusteringUtils.getPendingClusteringInstantTimes(table.getMetaClient()).stream()
+ pendingClusteringInstantTimes.stream()
.filter(instant -> instant.getState() == HoodieInstant.State.REQUESTED).findFirst());
+
+ // record metrics
+ clusteringMetrics.setFirstPendingClusteringInstant(firstRequested);
+ clusteringMetrics.setPendingClusteringCount(pendingClusteringInstantTimes.size());
+
if (!firstRequested.isPresent()) {
// do nothing.
LOG.info("No clustering plan for checkpoint " + checkpointId);
@@ -136,4 +150,10 @@ public class ClusteringPlanOperator extends AbstractStreamOperator<ClusteringPla
public void setOutput(Output<StreamRecord<ClusteringPlanEvent>> output) {
this.output = output;
}
+
+ private void registerMetrics() {
+ MetricGroup metrics = getRuntimeContext().getMetricGroup();
+ clusteringMetrics = new FlinkClusteringMetrics(metrics);
+ clusteringMetrics.registerMetrics();
+ }
}
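The clusteringDelay reported from this operator comes from the earliest requested clustering instant: its timestamp is parsed back into a date and diffed against now. A standalone sketch (the timestamp below is fabricated; Hudi instant times are yyyyMMddHHmmssSSS strings):

    import org.apache.hudi.common.table.timeline.HoodieInstantTimeGenerator;

    import java.text.ParseException;
    import java.time.Duration;
    import java.time.Instant;

    class DelaySketch {
      public static void main(String[] args) throws ParseException {
        Instant earliestPending =
            HoodieInstantTimeGenerator.parseDateFromInstantTime("20240107165828000").toInstant();
        // Seconds the oldest pending clustering plan has been waiting to run.
        long delaySeconds = Duration.between(earliestPending, Instant.now()).getSeconds();
        System.out.println("clusteringDelay = " + delaySeconds + "s");
      }
    }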
diff --git a/hudi-flink-datasource/hudi-flink/src/test/java/org/apache/hudi/sink/utils/ClusteringFunctionWrapper.java b/hudi-flink-datasource/hudi-flink/src/test/java/org/apache/hudi/sink/utils/ClusteringFunctionWrapper.java
index e3b75cbf637..252a4835069 100644
--- a/hudi-flink-datasource/hudi-flink/src/test/java/org/apache/hudi/sink/utils/ClusteringFunctionWrapper.java
+++ b/hudi-flink-datasource/hudi-flink/src/test/java/org/apache/hudi/sink/utils/ClusteringFunctionWrapper.java
@@ -55,6 +55,10 @@ public class ClusteringFunctionWrapper {
* Function that generates the {@code HoodieClusteringPlan}.
*/
private ClusteringPlanOperator clusteringPlanOperator;
+ /**
+ * Output to collect the clustering plan events.
+ */
+ private CollectorOutput<ClusteringPlanEvent> planEventOutput;
/**
* Output to collect the clustering commit events.
*/
@@ -83,6 +87,8 @@ public class ClusteringFunctionWrapper {
public void openFunction() throws Exception {
clusteringPlanOperator = new ClusteringPlanOperator(conf);
+ planEventOutput = new CollectorOutput<>();
+ clusteringPlanOperator.setup(streamTask, streamConfig, planEventOutput);
clusteringPlanOperator.open();
clusteringOperator = new ClusteringOperator(conf, TestConfigurations.ROW_TYPE);
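The test wrapper now calls setup(...) before open(), presumably because ClusteringPlanOperator.open() reaches for getRuntimeContext().getMetricGroup() and therefore needs a runtime context to exist. The required call order, schematically (fields as in the wrapper above):

    planEventOutput = new CollectorOutput<>();
    clusteringPlanOperator.setup(streamTask, streamConfig, planEventOutput); // provides the runtime context
    clusteringPlanOperator.open();                                           // metrics can now register safely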