This is an automated email from the ASF dual-hosted git repository.

pwason pushed a commit to branch release-0.14.0
in repository https://gitbox.apache.org/repos/asf/hudi.git

commit bca4828bc08006769547549bf4e540dc35f89eed
Author: StreamingFlames <[email protected]>
AuthorDate: Thu Sep 7 08:24:58 2023 +0800

    [HUDI-2141] Support flink compaction metrics (#9515)
---
 .../hudi/metrics/FlinkCompactionMetrics.java       | 106 ++++++++++++++++++++
 .../org/apache/hudi/metrics/FlinkWriteMetrics.java | 111 +++++++++++++++++++++
 .../apache/hudi/metrics/HoodieFlinkMetrics.java    |  23 +++++
 .../apache/hudi/sink/compact/CompactOperator.java  |  16 +++
 .../hudi/sink/compact/CompactionCommitSink.java    |  16 +++
 .../hudi/sink/compact/CompactionPlanOperator.java  |  19 +++-
 .../hudi/sink/utils/CompactFunctionWrapper.java    |  11 +-
 7 files changed, 298 insertions(+), 4 deletions(-)

diff --git a/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/metrics/FlinkCompactionMetrics.java b/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/metrics/FlinkCompactionMetrics.java
new file mode 100644
index 00000000000..abf7ef05a3f
--- /dev/null
+++ b/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/metrics/FlinkCompactionMetrics.java
@@ -0,0 +1,106 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hudi.metrics;
+
+import org.apache.hudi.common.table.timeline.HoodieInstant;
+import org.apache.hudi.common.table.timeline.HoodieInstantTimeGenerator;
+import org.apache.hudi.common.table.timeline.HoodieTimeline;
+import org.apache.hudi.common.util.Option;
+import org.apache.hudi.sink.compact.CompactOperator;
+import org.apache.hudi.sink.compact.CompactionPlanOperator;
+
+import org.apache.flink.metrics.MetricGroup;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.text.ParseException;
+import java.time.Duration;
+import java.time.Instant;
+
+/**
+ * Metrics for Flink compaction.
+ */
+public class FlinkCompactionMetrics extends FlinkWriteMetrics {
+
+  private static final Logger LOG = LoggerFactory.getLogger(FlinkCompactionMetrics.class);
+
+  /**
+   * Key for the compaction timer.
+   */
+  private static final String COMPACTION_KEY = "compaction";
+
+  /**
+   * Number of pending compaction instants.
+   *
+   * @see CompactionPlanOperator
+   */
+  private int pendingCompactionCount;
+
+  /**
+   * Duration between the earliest pending compaction instant time and now, in seconds.
+   *
+   * @see CompactionPlanOperator
+   */
+  private long compactionDelay;
+
+  /**
+   * Cost of executing one compaction operation, in milliseconds.
+   *
+   * @see CompactOperator
+   */
+  private long compactionCost;
+
+  public FlinkCompactionMetrics(MetricGroup metricGroup) {
+    super(metricGroup, HoodieTimeline.COMPACTION_ACTION);
+  }
+
+  @Override
+  public void registerMetrics() {
+    super.registerMetrics();
+    metricGroup.gauge(getMetricsName(actionType, "pendingCompactionCount"), () -> pendingCompactionCount);
+    metricGroup.gauge(getMetricsName(actionType, "compactionDelay"), () -> compactionDelay);
+    metricGroup.gauge(getMetricsName(actionType, "compactionCost"), () -> compactionCost);
+  }
+
+  public void setPendingCompactionCount(int pendingCompactionCount) {
+    this.pendingCompactionCount = pendingCompactionCount;
+  }
+
+  public void setFirstPendingCompactionInstant(Option<HoodieInstant> firstPendingCompactionInstant) {
+    try {
+      if (!firstPendingCompactionInstant.isPresent()) {
+        this.compactionDelay = 0L;
+      } else {
+        Instant start = HoodieInstantTimeGenerator.parseDateFromInstantTime(firstPendingCompactionInstant.get().getTimestamp()).toInstant();
+        this.compactionDelay = Duration.between(start, Instant.now()).getSeconds();
+      }
+    } catch (ParseException e) {
+      LOG.warn("Invalid input compaction instant: " + firstPendingCompactionInstant);
+    }
+  }
+
+  public void startCompaction() {
+    startTimer(COMPACTION_KEY);
+  }
+
+  public void endCompaction() {
+    this.compactionCost = stopTimer(COMPACTION_KEY);
+  }
+
+}
diff --git a/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/metrics/FlinkWriteMetrics.java b/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/metrics/FlinkWriteMetrics.java
new file mode 100644
index 00000000000..b19f8ef32d9
--- /dev/null
+++ b/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/metrics/FlinkWriteMetrics.java
@@ -0,0 +1,111 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hudi.metrics;
+
+import org.apache.hudi.common.model.HoodieCommitMetadata;
+import org.apache.hudi.common.table.timeline.HoodieInstantTimeGenerator;
+
+import org.apache.flink.metrics.MetricGroup;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.text.ParseException;
+
+/**
+ * Common Flink write commit metadata metrics.
+ */
+public class FlinkWriteMetrics extends HoodieFlinkMetrics {
+
+  private static final Logger LOG = LoggerFactory.getLogger(FlinkWriteMetrics.class);
+
+  protected final String actionType;
+
+  private long totalPartitionsWritten;
+  private long totalFilesInsert;
+  private long totalFilesUpdate;
+  private long totalRecordsWritten;
+  private long totalUpdateRecordsWritten;
+  private long totalInsertRecordsWritten;
+  private long totalBytesWritten;
+  private long totalScanTime;
+  private long totalCompactedRecordsUpdated;
+  private long totalLogFilesCompacted;
+  private long totalLogFilesSize;
+  private long commitEpochTimeInMs;
+  private long durationInMs;
+
+  public FlinkWriteMetrics(MetricGroup metricGroup, String actionType) {
+    super(metricGroup);
+    this.actionType = actionType;
+  }
+
+  @Override
+  public void registerMetrics() {
+    // register commit gauges
+    metricGroup.gauge(getMetricsName(actionType, "totalPartitionsWritten"), () -> totalPartitionsWritten);
+    metricGroup.gauge(getMetricsName(actionType, "totalFilesInsert"), () -> totalFilesInsert);
+    metricGroup.gauge(getMetricsName(actionType, "totalFilesUpdate"), () -> totalFilesUpdate);
+    metricGroup.gauge(getMetricsName(actionType, "totalRecordsWritten"), () -> totalRecordsWritten);
+    metricGroup.gauge(getMetricsName(actionType, "totalUpdateRecordsWritten"), () -> totalUpdateRecordsWritten);
+    metricGroup.gauge(getMetricsName(actionType, "totalInsertRecordsWritten"), () -> totalInsertRecordsWritten);
+    metricGroup.gauge(getMetricsName(actionType, "totalBytesWritten"), () -> totalBytesWritten);
+    metricGroup.gauge(getMetricsName(actionType, "totalScanTime"), () -> totalScanTime);
+    metricGroup.gauge(getMetricsName(actionType, "totalCompactedRecordsUpdated"), () -> totalCompactedRecordsUpdated);
+    metricGroup.gauge(getMetricsName(actionType, "totalLogFilesCompacted"), () -> totalLogFilesCompacted);
+    metricGroup.gauge(getMetricsName(actionType, "totalLogFilesSize"), () -> totalLogFilesSize);
+    metricGroup.gauge(getMetricsName(actionType, "commitTime"), () -> commitEpochTimeInMs);
+    metricGroup.gauge(getMetricsName(actionType, "duration"), () -> durationInMs);
+  }
+
+  public void updateCommitMetrics(String instantTime, HoodieCommitMetadata metadata) {
+    long commitEpochTimeInMs;
+    try {
+      commitEpochTimeInMs = HoodieInstantTimeGenerator.parseDateFromInstantTime(instantTime).getTime();
+    } catch (ParseException e) {
+      LOG.warn("Invalid input instant: " + instantTime);
+      return;
+    }
+    updateCommitMetrics(commitEpochTimeInMs, System.currentTimeMillis() - commitEpochTimeInMs, metadata);
+  }
+
+  public void updateCommitMetrics(long commitEpochTimeInMs, long durationInMs, HoodieCommitMetadata metadata) {
+    updateCommitTimingMetrics(commitEpochTimeInMs, durationInMs);
+    totalPartitionsWritten = metadata.fetchTotalPartitionsWritten();
+    totalFilesInsert = metadata.fetchTotalFilesInsert();
+    totalFilesUpdate = metadata.fetchTotalFilesUpdated();
+    totalRecordsWritten = metadata.fetchTotalRecordsWritten();
+    totalUpdateRecordsWritten = metadata.fetchTotalUpdateRecordsWritten();
+    totalInsertRecordsWritten = metadata.fetchTotalInsertRecordsWritten();
+    totalBytesWritten = metadata.fetchTotalBytesWritten();
+    totalScanTime = metadata.getTotalScanTime();
+    totalCompactedRecordsUpdated = metadata.getTotalCompactedRecordsUpdated();
+    totalLogFilesCompacted = metadata.getTotalLogFilesCompacted();
+    totalLogFilesSize = metadata.getTotalLogFilesSize();
+  }
+
+  private void updateCommitTimingMetrics(long commitEpochTimeInMs, long durationInMs) {
+    this.commitEpochTimeInMs = commitEpochTimeInMs;
+    this.durationInMs = durationInMs;
+  }
+
+  protected String getMetricsName(String action, String metric) {
+    return String.format("%s.%s", action, metric);
+  }
+
+}
diff --git a/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/metrics/HoodieFlinkMetrics.java b/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/metrics/HoodieFlinkMetrics.java
index a143010f278..ce58f35402a 100644
--- a/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/metrics/HoodieFlinkMetrics.java
+++ b/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/metrics/HoodieFlinkMetrics.java
@@ -22,18 +22,41 @@ import org.apache.flink.metrics.MetricGroup;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
+import java.util.HashMap;
+import java.util.Map;
+
 /**
  * Base class for flink read/write metrics.
  */
 public abstract class HoodieFlinkMetrics {
 
+  private static final Logger LOG = LoggerFactory.getLogger(HoodieFlinkMetrics.class);
+  protected Map<String, Long> timers;
   protected final MetricGroup metricGroup;
 
   protected HoodieFlinkMetrics(MetricGroup metricGroup) {
+    this.timers = new HashMap<>();
     this.metricGroup = metricGroup;
   }
 
   public abstract void registerMetrics();
 
+  protected void startTimer(String name) {
+    if (timers.containsKey(name)) {
+      LOG.warn("Restarting timer for name: {}, overriding the old value", name);
+    }
+    timers.put(name, System.currentTimeMillis());
+  }
+
+  protected long stopTimer(String name) {
+    if (!timers.containsKey(name)) {
+      LOG.warn("Cannot find timer for name {}, potentially caused by unbalanced start/stop calls", name);
+      return 0;
+    }
+    long costs = System.currentTimeMillis() - timers.get(name);
+    timers.remove(name);
+    return costs;
+  }
+
 }
diff --git a/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/sink/compact/CompactOperator.java b/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/sink/compact/CompactOperator.java
index 66743264457..fc034fcfc80 100644
--- a/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/sink/compact/CompactOperator.java
+++ b/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/sink/compact/CompactOperator.java
@@ -25,6 +25,7 @@ import org.apache.hudi.common.model.CompactionOperation;
 import org.apache.hudi.common.table.HoodieTableMetaClient;
 import org.apache.hudi.config.HoodieWriteConfig;
 import org.apache.hudi.configuration.OptionsResolver;
+import org.apache.hudi.metrics.FlinkCompactionMetrics;
 import org.apache.hudi.sink.utils.NonThrownExecutor;
 import org.apache.hudi.table.HoodieFlinkCopyOnWriteTable;
 import org.apache.hudi.table.action.compact.HoodieFlinkMergeOnReadTableCompactor;
@@ -33,6 +34,7 @@ import org.apache.hudi.util.FlinkWriteClients;
 
 import org.apache.flink.annotation.VisibleForTesting;
 import org.apache.flink.configuration.Configuration;
+import org.apache.flink.metrics.MetricGroup;
 import org.apache.flink.streaming.api.graph.StreamConfig;
 import org.apache.flink.streaming.api.operators.OneInputStreamOperator;
 import org.apache.flink.streaming.api.operators.Output;
@@ -85,6 +87,11 @@ public class CompactOperator extends TableStreamOperator<CompactionCommitEvent>
    */
   private transient StreamRecordCollector<CompactionCommitEvent> collector;
 
+  /**
+   * Compaction metrics.
+   */
+  private transient FlinkCompactionMetrics compactionMetrics;
+
   public CompactOperator(Configuration conf) {
     this.conf = conf;
     this.asyncCompaction = OptionsResolver.needsAsyncCompaction(conf);
   }
@@ -103,6 +110,7 @@ public class CompactOperator extends TableStreamOperator<CompactionCommitEvent>
       this.executor = NonThrownExecutor.builder(LOG).build();
     }
     this.collector = new StreamRecordCollector<>(output);
+    registerMetrics();
   }
 
   @Override
@@ -127,6 +135,7 @@ public class CompactOperator extends TableStreamOperator<CompactionCommitEvent>
                          CompactionOperation compactionOperation,
                          Collector<CompactionCommitEvent> collector,
                          HoodieWriteConfig writeConfig) throws IOException {
+    compactionMetrics.startCompaction();
     HoodieFlinkMergeOnReadTableCompactor<?> compactor = new HoodieFlinkMergeOnReadTableCompactor<>();
     HoodieTableMetaClient metaClient = writeClient.getHoodieTable().getMetaClient();
     String maxInstantTime = compactor.getMaxInstantTime(metaClient);
@@ -140,6 +149,7 @@ public class CompactOperator extends TableStreamOperator<CompactionCommitEvent>
         compactionOperation, instantTime, maxInstantTime,
         writeClient.getHoodieTable().getTaskContextSupplier());
+    compactionMetrics.endCompaction();
     collector.collect(new CompactionCommitEvent(instantTime, compactionOperation.getFileId(), writeStatuses, taskID));
   }
 
@@ -164,4 +174,10 @@ public class CompactOperator extends TableStreamOperator<CompactionCommitEvent>
       this.writeClient = null;
     }
   }
+
+  private void registerMetrics() {
+    MetricGroup metrics = getRuntimeContext().getMetricGroup();
+    compactionMetrics = new FlinkCompactionMetrics(metrics);
+    compactionMetrics.registerMetrics();
+  }
 }
diff --git a/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/sink/compact/CompactionCommitSink.java b/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/sink/compact/CompactionCommitSink.java
index 828aa3c4265..192b5f5a397 100644
--- a/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/sink/compact/CompactionCommitSink.java
+++ b/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/sink/compact/CompactionCommitSink.java
@@ -26,6 +26,7 @@ import org.apache.hudi.common.util.CompactionUtils;
 import org.apache.hudi.common.util.Option;
 import org.apache.hudi.configuration.FlinkOptions;
 import org.apache.hudi.exception.HoodieException;
+import org.apache.hudi.metrics.FlinkCompactionMetrics;
 import org.apache.hudi.sink.CleanFunction;
 import org.apache.hudi.table.HoodieFlinkTable;
 import org.apache.hudi.table.action.compact.CompactHelpers;
@@ -33,6 +34,7 @@ import org.apache.hudi.util.CompactionUtil;
 import org.apache.hudi.util.FlinkWriteClients;
 
 import org.apache.flink.configuration.Configuration;
+import org.apache.flink.metrics.MetricGroup;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
@@ -82,6 +84,11 @@ public class CompactionCommitSink extends CleanFunction<CompactionCommitEvent> {
    */
   private transient HoodieFlinkTable<?> table;
 
+  /**
+   * Compaction metrics.
+   */
+  private transient FlinkCompactionMetrics compactionMetrics;
+
   public CompactionCommitSink(Configuration conf) {
     super(conf);
     this.conf = conf;
   }
@@ -96,6 +103,7 @@ public class CompactionCommitSink extends CleanFunction<CompactionCommitEvent> {
     this.commitBuffer = new HashMap<>();
     this.compactionPlanCache = new HashMap<>();
     this.table = this.writeClient.getHoodieTable();
+    registerMetrics();
   }
 
   @Override
@@ -174,6 +182,8 @@ public class CompactionCommitSink extends CleanFunction<CompactionCommitEvent> {
 
     // commit the compaction
     this.writeClient.commitCompaction(instant, metadata, Option.empty());
+    this.compactionMetrics.updateCommitMetrics(instant, metadata);
+
     // Whether to clean up the old log file when compaction
     if (!conf.getBoolean(FlinkOptions.CLEAN_ASYNC_ENABLED) && !isCleaning) {
       this.writeClient.clean();
@@ -184,4 +194,10 @@ public class CompactionCommitSink extends CleanFunction<CompactionCommitEvent> {
     this.commitBuffer.remove(instant);
     this.compactionPlanCache.remove(instant);
   }
+
+  private void registerMetrics() {
+    MetricGroup metrics = getRuntimeContext().getMetricGroup();
+    compactionMetrics = new FlinkCompactionMetrics(metrics);
+    compactionMetrics.registerMetrics();
+  }
 }
diff --git a/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/sink/compact/CompactionPlanOperator.java b/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/sink/compact/CompactionPlanOperator.java
index d7446c9bfab..bb4ee0a34ac 100644
--- a/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/sink/compact/CompactionPlanOperator.java
+++ b/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/sink/compact/CompactionPlanOperator.java
@@ -24,6 +24,7 @@ import org.apache.hudi.common.table.timeline.HoodieInstant;
 import org.apache.hudi.common.table.timeline.HoodieTimeline;
 import org.apache.hudi.common.util.CompactionUtils;
 import org.apache.hudi.common.util.Option;
+import org.apache.hudi.metrics.FlinkCompactionMetrics;
 import org.apache.hudi.table.HoodieFlinkTable;
 import org.apache.hudi.table.marker.WriteMarkersFactory;
 import org.apache.hudi.util.CompactionUtil;
@@ -31,6 +32,7 @@ import org.apache.hudi.util.FlinkTables;
 
 import org.apache.flink.annotation.VisibleForTesting;
 import org.apache.flink.configuration.Configuration;
+import org.apache.flink.metrics.MetricGroup;
 import org.apache.flink.streaming.api.operators.AbstractStreamOperator;
 import org.apache.flink.streaming.api.operators.BoundedOneInput;
 import org.apache.flink.streaming.api.operators.OneInputStreamOperator;
@@ -61,6 +63,8 @@ public class CompactionPlanOperator extends AbstractStreamOperator<CompactionPla
   @SuppressWarnings("rawtypes")
   private transient HoodieFlinkTable table;
 
+  private transient FlinkCompactionMetrics compactionMetrics;
+
   public CompactionPlanOperator(Configuration conf) {
     this.conf = conf;
   }
@@ -73,6 +77,7 @@ public class CompactionPlanOperator extends AbstractStreamOperator<CompactionPla
     // these instants are in priority for scheduling task because the compaction instants are
     // scheduled from earliest(FIFO sequence).
     CompactionUtil.rollbackCompaction(table);
+    registerMetrics();
   }
 
   @Override
@@ -100,9 +105,15 @@ public class CompactionPlanOperator extends AbstractStreamOperator<CompactionPla
   }
 
   private void scheduleCompaction(HoodieFlinkTable<?> table, long checkpointId) throws IOException {
+    HoodieTimeline pendingCompactionTimeline = table.getActiveTimeline().filterPendingCompactionTimeline();
+
     // the first instant takes the highest priority.
-    Option<HoodieInstant> firstRequested = table.getActiveTimeline().filterPendingCompactionTimeline()
+    Option<HoodieInstant> firstRequested = pendingCompactionTimeline
         .filter(instant -> instant.getState() == HoodieInstant.State.REQUESTED).firstInstant();
+    // record metrics
+    compactionMetrics.setFirstPendingCompactionInstant(firstRequested);
+    compactionMetrics.setPendingCompactionCount(pendingCompactionTimeline.countInstants());
+
     if (!firstRequested.isPresent()) {
       // do nothing.
       LOG.info("No compaction plan for checkpoint " + checkpointId);
@@ -148,4 +159,10 @@ public class CompactionPlanOperator extends AbstractStreamOperator<CompactionPla
     // Called when the input data ends, only used in batch mode.
     notifyCheckpointComplete(-1);
   }
+
+  private void registerMetrics() {
+    MetricGroup metrics = getRuntimeContext().getMetricGroup();
+    compactionMetrics = new FlinkCompactionMetrics(metrics);
+    compactionMetrics.registerMetrics();
+  }
 }
diff --git a/hudi-flink-datasource/hudi-flink/src/test/java/org/apache/hudi/sink/utils/CompactFunctionWrapper.java b/hudi-flink-datasource/hudi-flink/src/test/java/org/apache/hudi/sink/utils/CompactFunctionWrapper.java
index 78a8305c9c5..b042139aee4 100644
--- a/hudi-flink-datasource/hudi-flink/src/test/java/org/apache/hudi/sink/utils/CompactFunctionWrapper.java
+++ b/hudi-flink-datasource/hudi-flink/src/test/java/org/apache/hudi/sink/utils/CompactFunctionWrapper.java
@@ -55,6 +55,10 @@ public class CompactFunctionWrapper {
    * Function that generates the {@link HoodieCompactionPlan}.
    */
   private CompactionPlanOperator compactionPlanOperator;
+  /**
+   * Output to collect the compaction plan events.
+   */
+  private CollectorOutput<CompactionPlanEvent> planEventOutput;
   /**
    * Output to collect the compaction commit events.
    */
@@ -83,6 +87,8 @@ public class CompactFunctionWrapper {
 
   public void openFunction() throws Exception {
     compactionPlanOperator = new CompactionPlanOperator(conf);
+    planEventOutput = new CollectorOutput<>();
+    compactionPlanOperator.setup(streamTask, streamConfig, planEventOutput);
     compactionPlanOperator.open();
 
     compactOperator = new CompactOperator(conf);
@@ -102,11 +108,10 @@ public class CompactFunctionWrapper {
 
   public void compact(long checkpointID) throws Exception {
     // collect the CompactEvents.
-    CollectorOutput<CompactionPlanEvent> output = new CollectorOutput<>();
-    compactionPlanOperator.setOutput(output);
+    compactionPlanOperator.setOutput(planEventOutput);
     compactionPlanOperator.notifyCheckpointComplete(checkpointID);
     // collect the CompactCommitEvents
-    for (CompactionPlanEvent event : output.getRecords()) {
+    for (CompactionPlanEvent event : planEventOutput.getRecords()) {
       compactOperator.processElement(new StreamRecord<>(event));
     }
     // handle and commit the compaction
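All of the new gauges flow through the standard Flink metric system: each operator instantiates a FlinkCompactionMetrics on its runtime metric group, and gauge names follow the "<actionType>.<metric>" pattern from FlinkWriteMetrics#getMetricsName, for example compaction.pendingCompactionCount, compaction.compactionDelay, and compaction.compactionCost. The snippet below is a minimal sketch, not part of this commit, that exercises the new class in isolation against Flink's no-op UnregisteredMetricsGroup (the stand-in group commonly used in tests), so the gauges are registered but never reported anywhere:

import org.apache.flink.metrics.groups.UnregisteredMetricsGroup;

import org.apache.hudi.common.util.Option;
import org.apache.hudi.metrics.FlinkCompactionMetrics;

// Minimal sketch, not part of this commit: drives FlinkCompactionMetrics
// without a Flink runtime; the no-op metric group accepts the gauge
// registrations but publishes nothing.
public class FlinkCompactionMetricsSketch {

  public static void main(String[] args) throws Exception {
    FlinkCompactionMetrics metrics = new FlinkCompactionMetrics(new UnregisteredMetricsGroup());
    metrics.registerMetrics();

    // CompactionPlanOperator side: pending instant count plus the delay of the
    // earliest pending instant; an absent instant resets compactionDelay to 0.
    metrics.setPendingCompactionCount(2);
    metrics.setFirstPendingCompactionInstant(Option.empty());

    // CompactOperator side: the startTimer/stopTimer pair behind compactionCost.
    metrics.startCompaction();
    Thread.sleep(100L); // stand-in for the actual compaction work
    metrics.endCompaction();
  }
}

In a running job the same gauges hang off getRuntimeContext().getMetricGroup(), so they are scoped to the operator and exported by whichever metrics reporter the Flink cluster has configured.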