This is an automated email from the ASF dual-hosted git repository.
danny0405 pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/hudi.git
The following commit(s) were added to refs/heads/master by this push:
new 566e22b0a3d [HUDI-2141] Support flink stream write metrics (#9118)
566e22b0a3d is described below
commit 566e22b0a3d7c3e67686122b732dbbc4ecdf0025
Author: StreamingFlames <[email protected]>
AuthorDate: Mon Oct 16 21:55:54 2023 -0500
[HUDI-2141] Support flink stream write metrics (#9118)
---
.../hudi/metrics/FlinkStreamWriteMetrics.java | 170 +++++++++++++++++++++
.../org/apache/hudi/sink/StreamWriteFunction.java | 27 +++-
.../hudi/sink/append/AppendWriteFunction.java | 24 ++-
.../hudi/sink/bulk/BulkInsertWriterHelper.java | 42 ++++-
4 files changed, 255 insertions(+), 8 deletions(-)
diff --git
a/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/metrics/FlinkStreamWriteMetrics.java
b/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/metrics/FlinkStreamWriteMetrics.java
new file mode 100644
index 00000000000..a3d84b234f9
--- /dev/null
+++
b/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/metrics/FlinkStreamWriteMetrics.java
@@ -0,0 +1,170 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hudi.metrics;
+
+import org.apache.hudi.sink.common.AbstractStreamWriteFunction;
+
+import com.codahale.metrics.SlidingWindowReservoir;
+import org.apache.flink.dropwizard.metrics.DropwizardHistogramWrapper;
+import org.apache.flink.dropwizard.metrics.DropwizardMeterWrapper;
+import org.apache.flink.metrics.Histogram;
+import org.apache.flink.metrics.Meter;
+import org.apache.flink.metrics.MetricGroup;
+
+/**
+ * Metrics for flink stream write (including append write, normal/bucket stream write etc.).
+ * Used in subclasses of {@link AbstractStreamWriteFunction}.
+ *
+ * <p>The plain {@code long} fields are exposed as gauges by {@link #registerMetrics()}. The
+ * per-window counters among them are zeroed by {@link #resetAfterCommit()}, while
+ * {@code dataFlushCosts} keeps the last measured value until the next flush overwrites it.
+ * Durations come from the {@code startTimer}/{@code stopTimer} helpers, which are not defined in
+ * this class (presumably inherited from {@code HoodieFlinkMetrics}); their time unit is not
+ * visible here.
+ */
+public class FlinkStreamWriteMetrics extends HoodieFlinkMetrics {
+ private static final String DATA_FLUSH_KEY = "data_flush"; // timer key used by startDataFlush/endDataFlush
+ private static final String FILE_FLUSH_KEY = "file_flush"; // timer key used by startFileFlush/endFileFlush
+ private static final String HANDLE_CREATION_KEY = "handle_creation"; // timer key used by startHandleCreation/endHandleCreation
+
+ /**
+ * Cost of the data flush during checkpoint, as returned by the timer stopped in
+ * {@link #endDataFlush()}. Not cleared by {@link #resetAfterCommit()}.
+ */
+ private long dataFlushCosts;
+
+ /**
+ * Number of records written during a checkpoint window.
+ */
+ protected long writtenRecords;
+
+ /**
+ * Current write buffer size in StreamWriteFunction.
+ */
+ private long writeBufferedSize;
+
+ /**
+ * Total costs for flushing files during a checkpoint window.
+ */
+ private long fileFlushTotalCosts;
+
+ /**
+ * Number of handles opened during a checkpoint window. Increased with
+ * partition number/bucket number etc.
+ */
+ private long numOfOpenHandle;
+
+ /**
+ * Number of files written during a checkpoint window.
+ */
+ private long numOfFilesWritten;
+
+ /**
+ * Number of records written per second.
+ */
+ protected final Meter recordWrittenPerSecond;
+
+ /**
+ * Number of write handle switches per second.
+ */
+ private final Meter handleSwitchPerSecond;
+
+ /**
+ * Cost of write handle creation.
+ */
+ private final Histogram handleCreationCosts;
+
+ /**
+ * Cost of a file flush.
+ */
+ private final Histogram fileFlushCost;
+
+ public FlinkStreamWriteMetrics(MetricGroup metricGroup) { // builds Dropwizard-backed meters and histograms; each histogram uses a SlidingWindowReservoir of size 100
+ super(metricGroup);
+ this.recordWrittenPerSecond = new DropwizardMeterWrapper(new
com.codahale.metrics.Meter());
+ this.handleSwitchPerSecond = new DropwizardMeterWrapper(new
com.codahale.metrics.Meter());
+ this.handleCreationCosts = new DropwizardHistogramWrapper(new
com.codahale.metrics.Histogram(new SlidingWindowReservoir(100)));
+ this.fileFlushCost = new DropwizardHistogramWrapper(new
com.codahale.metrics.Histogram(new SlidingWindowReservoir(100)));
+ }
+
+ @Override
+ public void registerMetrics() { // registers every meter, gauge and histogram of this class on metricGroup
+ metricGroup.meter("recordWrittenPerSecond", recordWrittenPerSecond);
+ metricGroup.gauge("currentCommitWrittenRecords", () -> writtenRecords);
+ metricGroup.gauge("dataFlushCosts", () -> dataFlushCosts);
+ metricGroup.gauge("writeBufferedSize", () -> writeBufferedSize);
+
+ metricGroup.gauge("fileFlushTotalCosts", () -> fileFlushTotalCosts);
+ metricGroup.gauge("numOfFilesWritten", () -> numOfFilesWritten);
+ metricGroup.gauge("numOfOpenHandle", () -> numOfOpenHandle);
+
+ metricGroup.meter("handleSwitchPerSecond", handleSwitchPerSecond);
+
+ metricGroup.histogram("handleCreationCosts", handleCreationCosts);
+ metricGroup.histogram("fileFlushCost", fileFlushCost);
+ }
+
+ public void setWriteBufferedSize(long writeBufferedSize) { // overwrites the buffered-size gauge value
+ this.writeBufferedSize = writeBufferedSize;
+ }
+
+ public void startDataFlush() { // starts the data-flush timer
+ startTimer(DATA_FLUSH_KEY);
+ }
+
+ public void endDataFlush() { // stops the data-flush timer and stores its result in the gauge field
+ this.dataFlushCosts = stopTimer(DATA_FLUSH_KEY);
+ }
+
+ public void markRecordIn() { // counts one written record and marks the per-second meter
+ this.writtenRecords += 1;
+ recordWrittenPerSecond.markEvent();
+ }
+
+ public void increaseNumOfFilesWritten() { // counts one more written file in the current window
+ numOfFilesWritten += 1;
+ }
+
+ public void increaseNumOfOpenHandle() { // a newly opened handle is also counted as one written file
+ numOfOpenHandle += 1;
+ increaseNumOfFilesWritten();
+ }
+
+ public void markHandleSwitch() { // marks one write handle switch on the per-second meter
+ handleSwitchPerSecond.markEvent();
+ }
+
+ public void startHandleCreation() { // starts the handle-creation timer; pair with endHandleCreation()
+ startTimer(HANDLE_CREATION_KEY);
+ }
+
+ public void endHandleCreation() { // stops the handle-creation timer and records the result in the histogram
+ handleCreationCosts.update(stopTimer(HANDLE_CREATION_KEY));
+ }
+
+ public void startFileFlush() { // starts the file-flush timer; pair with endFileFlush()
+ startTimer(FILE_FLUSH_KEY);
+ }
+
+ public void endFileFlush() { // records this flush in the histogram and adds it to the window total
+ long costs = stopTimer(FILE_FLUSH_KEY);
+ fileFlushCost.update(costs);
+ this.fileFlushTotalCosts += costs;
+ }
+
+ public void resetAfterCommit() { // zeroes the per-window counters; dataFlushCosts, meters and histograms are left untouched
+ this.writtenRecords = 0;
+ this.numOfFilesWritten = 0;
+ this.numOfOpenHandle = 0;
+ this.writeBufferedSize = 0;
+ this.fileFlushTotalCosts = 0;
+ }
+
+}
diff --git
a/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/sink/StreamWriteFunction.java
b/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/sink/StreamWriteFunction.java
index bb2dfc61bd3..f203756867f 100644
---
a/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/sink/StreamWriteFunction.java
+++
b/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/sink/StreamWriteFunction.java
@@ -32,6 +32,7 @@ import org.apache.hudi.common.util.ObjectSizeCalculator;
import org.apache.hudi.common.util.ValidationUtils;
import org.apache.hudi.configuration.FlinkOptions;
import org.apache.hudi.exception.HoodieException;
+import org.apache.hudi.metrics.FlinkStreamWriteMetrics;
import org.apache.hudi.sink.common.AbstractStreamWriteFunction;
import org.apache.hudi.sink.event.WriteMetadataEvent;
import org.apache.hudi.table.action.commit.FlinkWriteHelper;
@@ -39,6 +40,7 @@ import org.apache.hudi.util.StreamerUtil;
import org.apache.flink.annotation.VisibleForTesting;
import org.apache.flink.configuration.Configuration;
+import org.apache.flink.metrics.MetricGroup;
import org.apache.flink.streaming.api.functions.ProcessFunction;
import org.apache.flink.util.Collector;
import org.slf4j.Logger;
@@ -112,6 +114,11 @@ public class StreamWriteFunction<I> extends
AbstractStreamWriteFunction<I> {
*/
private transient TotalSizeTracer tracer;
+ /**
+ * Metrics for flink stream write.
+ */
+ protected transient FlinkStreamWriteMetrics writeMetrics;
+
/**
* Constructs a StreamingSinkFunction.
*
@@ -127,6 +134,7 @@ public class StreamWriteFunction<I> extends
AbstractStreamWriteFunction<I> {
initBuffer();
initWriteFunction();
initMergeClass();
+ registerMetrics();
}
@Override
@@ -385,6 +393,7 @@ public class StreamWriteFunction<I> extends
AbstractStreamWriteFunction<I> {
* @param value HoodieRecord
*/
protected void bufferRecord(HoodieRecord<?> value) {
+ writeMetrics.markRecordIn();
final String bucketID = getBucketID(value);
DataBucket bucket = this.buckets.computeIfAbsent(bucketID,
@@ -395,6 +404,8 @@ public class StreamWriteFunction<I> extends
AbstractStreamWriteFunction<I> {
boolean flushBucket = bucket.detector.detect(item);
boolean flushBuffer = this.tracer.trace(bucket.detector.lastRecordSize);
+ // update buffer metrics after tracing buffer size
+ writeMetrics.setWriteBufferedSize(this.tracer.bufferSize);
if (flushBucket) {
if (flushBucket(bucket)) {
this.tracer.countDown(bucket.detector.totalSize);
@@ -449,6 +460,7 @@ public class StreamWriteFunction<I> extends
AbstractStreamWriteFunction<I> {
@SuppressWarnings("unchecked, rawtypes")
private void flushRemaining(boolean endInput) {
+ writeMetrics.startDataFlush();
this.currentInstant = instantToWrite(hasData());
if (this.currentInstant == null) {
// in case there are empty checkpoints that has no input data
@@ -488,11 +500,24 @@ public class StreamWriteFunction<I> extends
AbstractStreamWriteFunction<I> {
this.writeStatuses.addAll(writeStatus);
// blocks flushing until the coordinator starts a new instant
this.confirming = true;
+
+ writeMetrics.endDataFlush();
+ writeMetrics.resetAfterCommit();
+ }
+
+ private void registerMetrics() { // creates the stream write metrics on this function's runtime metric group
+ MetricGroup metrics = getRuntimeContext().getMetricGroup();
+ writeMetrics = new FlinkStreamWriteMetrics(metrics);
+ writeMetrics.registerMetrics(); // registers the meters, gauges and histograms on that group
}
protected List<WriteStatus> writeBucket(String instant, DataBucket bucket,
List<HoodieRecord> records) {
bucket.preWrite(records);
- return writeFunction.apply(records, instant);
+ writeMetrics.startFileFlush(); // time only the write call itself, not bucket.preWrite
+ List<WriteStatus> statuses = writeFunction.apply(records, instant);
+ writeMetrics.endFileFlush(); // NOTE(review): skipped if writeFunction.apply throws — confirm that is acceptable
+ writeMetrics.increaseNumOfFilesWritten(); // each writeBucket call is counted as one written file
+ return statuses;
}
private List<HoodieRecord> deduplicateRecordsIfNeeded(List<HoodieRecord>
records) {
diff --git
a/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/sink/append/AppendWriteFunction.java
b/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/sink/append/AppendWriteFunction.java
index 91c59341109..121b1d31f31 100644
---
a/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/sink/append/AppendWriteFunction.java
+++
b/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/sink/append/AppendWriteFunction.java
@@ -19,7 +19,9 @@
package org.apache.hudi.sink.append;
import org.apache.hudi.client.WriteStatus;
+import org.apache.hudi.common.util.Option;
import org.apache.hudi.exception.HoodieException;
+import org.apache.hudi.metrics.FlinkStreamWriteMetrics;
import org.apache.hudi.sink.StreamWriteOperatorCoordinator;
import org.apache.hudi.sink.bulk.BulkInsertWriterHelper;
import org.apache.hudi.sink.common.AbstractStreamWriteFunction;
@@ -27,6 +29,7 @@ import org.apache.hudi.sink.event.WriteMetadataEvent;
import org.apache.flink.annotation.VisibleForTesting;
import org.apache.flink.configuration.Configuration;
+import org.apache.flink.metrics.MetricGroup;
import org.apache.flink.table.data.RowData;
import org.apache.flink.table.types.logical.RowType;
import org.apache.flink.util.Collector;
@@ -60,6 +63,11 @@ public class AppendWriteFunction<I> extends
AbstractStreamWriteFunction<I> {
*/
private final RowType rowType;
+ /**
+ * Metrics for flink stream write. Assigned by {@code registerMetrics()} when the function is
+ * opened, so it is runtime-only state and is kept out of the serialized function.
+ */
+ private transient FlinkStreamWriteMetrics writeMetrics;
+
/**
* Constructs an AppendWriteFunction.
*
@@ -70,6 +78,11 @@ public class AppendWriteFunction<I> extends
AbstractStreamWriteFunction<I> {
this.rowType = rowType;
}
+ @Override
+ public void open(Configuration parameters) throws Exception { // NOTE(review): super.open(parameters) is not called — confirm no parent initialisation is skipped
+ registerMetrics(); // creates and registers writeMetrics
+ }
+
@Override
public void snapshotState() {
// Based on the fact that the coordinator starts the checkpoint first,
@@ -129,10 +142,11 @@ public class AppendWriteFunction<I> extends
AbstractStreamWriteFunction<I> {
}
this.writerHelper = new BulkInsertWriterHelper(this.config,
this.writeClient.getHoodieTable(), this.writeClient.getConfig(),
instant, this.taskID,
getRuntimeContext().getNumberOfParallelSubtasks(),
getRuntimeContext().getAttemptNumber(),
- this.rowType);
+ this.rowType, false, Option.of(writeMetrics));
}
private void flushData(boolean endInput) {
+ writeMetrics.startDataFlush();
final List<WriteStatus> writeStatus;
if (this.writerHelper != null) {
writeStatus = this.writerHelper.getWriteStatuses(this.taskID);
@@ -155,5 +169,13 @@ public class AppendWriteFunction<I> extends
AbstractStreamWriteFunction<I> {
this.writeStatuses.addAll(writeStatus);
// blocks flushing until the coordinator starts a new instant
this.confirming = true;
+ writeMetrics.endDataFlush();
+ writeMetrics.resetAfterCommit();
+ }
+
+ private void registerMetrics() { // creates the stream write metrics on this function's runtime metric group
+ MetricGroup metrics = getRuntimeContext().getMetricGroup();
+ writeMetrics = new FlinkStreamWriteMetrics(metrics);
+ writeMetrics.registerMetrics(); // registers the meters, gauges and histograms on that group
}
}
diff --git
a/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/sink/bulk/BulkInsertWriterHelper.java
b/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/sink/bulk/BulkInsertWriterHelper.java
index 5ac0a122bbc..dc0c27d64d2 100644
---
a/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/sink/bulk/BulkInsertWriterHelper.java
+++
b/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/sink/bulk/BulkInsertWriterHelper.java
@@ -20,11 +20,13 @@ package org.apache.hudi.sink.bulk;
import org.apache.hudi.client.WriteStatus;
import org.apache.hudi.common.model.HoodieRecord;
+import org.apache.hudi.common.util.Option;
import org.apache.hudi.config.HoodieWriteConfig;
import org.apache.hudi.configuration.FlinkOptions;
import org.apache.hudi.configuration.OptionsResolver;
import org.apache.hudi.exception.HoodieException;
import org.apache.hudi.io.storage.row.HoodieRowDataCreateHandle;
+import org.apache.hudi.metrics.FlinkStreamWriteMetrics;
import org.apache.hudi.table.HoodieTable;
import org.apache.flink.configuration.Configuration;
@@ -69,14 +71,21 @@ public class BulkInsertWriterHelper {
@Nullable
protected final RowDataKeyGen keyGen;
+ protected final Option<FlinkStreamWriteMetrics> writeMetrics;
+
public BulkInsertWriterHelper(Configuration conf, HoodieTable hoodieTable,
HoodieWriteConfig writeConfig,
String instantTime, int taskPartitionId, long
totalSubtaskNum, long taskEpochId, RowType rowType) { // convenience overload: preserveHoodieMetadata = false and no write metrics
- this(conf, hoodieTable, writeConfig, instantTime, taskPartitionId,
totalSubtaskNum, taskEpochId, rowType, false);
+ this(conf, hoodieTable, writeConfig, instantTime, taskPartitionId,
totalSubtaskNum, taskEpochId, rowType, false, Option.empty());
+ }
+
+ public BulkInsertWriterHelper(Configuration conf, HoodieTable hoodieTable,
HoodieWriteConfig writeConfig,
+ String instantTime, int taskPartitionId, long
taskId, long taskEpochId, RowType rowType, boolean preserveHoodieMetadata) { // overload without write metrics (passes Option.empty()); NOTE(review): taskId is forwarded into the slot the full constructor names totalSubtaskNum — confirm the intended name
+ this(conf, hoodieTable, writeConfig, instantTime, taskPartitionId, taskId,
taskEpochId, rowType, preserveHoodieMetadata, Option.empty());
}
public BulkInsertWriterHelper(Configuration conf, HoodieTable hoodieTable,
HoodieWriteConfig writeConfig,
String instantTime, int taskPartitionId, long
totalSubtaskNum, long taskEpochId, RowType rowType,
- boolean preserveHoodieMetadata) {
+ boolean preserveHoodieMetadata,
Option<FlinkStreamWriteMetrics> writeMetrics) {
this.hoodieTable = hoodieTable;
this.writeConfig = writeConfig;
this.instantTime = instantTime;
@@ -88,6 +97,7 @@ public class BulkInsertWriterHelper {
this.isInputSorted = OptionsResolver.isBulkInsertOperation(conf) &&
conf.getBoolean(FlinkOptions.WRITE_BULK_INSERT_SORT_INPUT);
this.fileIdPrefix = UUID.randomUUID().toString();
this.keyGen = preserveHoodieMetadata ? null :
RowDataKeyGens.instance(conf, rowType, taskPartitionId, instantTime);
+ this.writeMetrics = writeMetrics;
}
/**
@@ -109,8 +119,10 @@ public class BulkInsertWriterHelper {
if ((lastKnownPartitionPath == null) ||
!lastKnownPartitionPath.equals(partitionPath) || !handle.canWrite()) {
handle = getRowCreateHandle(partitionPath);
lastKnownPartitionPath = partitionPath;
+ writeMetrics.ifPresent(FlinkStreamWriteMetrics::markHandleSwitch);
}
handle.write(recordKey, partitionPath, record);
+ writeMetrics.ifPresent(FlinkStreamWriteMetrics::markRecordIn);
} catch (Throwable t) {
IOException ioException = new IOException("Exception happened when bulk
insert.", t);
LOG.error("Global error thrown while trying to write records in
HoodieRowCreateHandle ", ioException);
@@ -126,17 +138,20 @@ public class BulkInsertWriterHelper {
}
LOG.info("Creating new file for partition path " + partitionPath);
+ writeMetrics.ifPresent(FlinkStreamWriteMetrics::startHandleCreation);
HoodieRowDataCreateHandle rowCreateHandle = new
HoodieRowDataCreateHandle(hoodieTable, writeConfig, partitionPath,
getNextFileId(),
instantTime, taskPartitionId, totalSubtaskNum, taskEpochId, rowType,
preserveHoodieMetadata);
+ // Stop the timer started above so that the creation cost of a first-time partition handle
+ // is recorded in the handleCreationCosts histogram, the same way createWriteHandle() does.
+ writeMetrics.ifPresent(FlinkStreamWriteMetrics::endHandleCreation);
handles.put(partitionPath, rowCreateHandle);
+
+ writeMetrics.ifPresent(FlinkStreamWriteMetrics::increaseNumOfOpenHandle);
} else if (!handles.get(partitionPath).canWrite()) {
// even if there is a handle to the partition path, it could have
reached its max size threshold. So, we close the handle here and
// create a new one.
LOG.info("Rolling max-size file for partition path " + partitionPath);
- writeStatusList.add(handles.remove(partitionPath).close());
- HoodieRowDataCreateHandle rowCreateHandle = new
HoodieRowDataCreateHandle(hoodieTable, writeConfig, partitionPath,
getNextFileId(),
- instantTime, taskPartitionId, totalSubtaskNum, taskEpochId, rowType,
preserveHoodieMetadata);
+ writeStatusList.add(closeWriteHandle(handles.remove(partitionPath)));
+ HoodieRowDataCreateHandle rowCreateHandle =
createWriteHandle(partitionPath);
handles.put(partitionPath, rowCreateHandle);
+
writeMetrics.ifPresent(FlinkStreamWriteMetrics::increaseNumOfFilesWritten);
}
return handles.get(partitionPath);
}
@@ -144,7 +159,7 @@ public class BulkInsertWriterHelper {
public void close() throws IOException {
for (HoodieRowDataCreateHandle rowCreateHandle : handles.values()) {
LOG.info("Closing bulk insert file " + rowCreateHandle.getFileName());
- writeStatusList.add(rowCreateHandle.close());
+ writeStatusList.add(closeWriteHandle(rowCreateHandle));
}
handles.clear();
handle = null;
@@ -198,5 +213,20 @@ public class BulkInsertWriterHelper {
}
}
+ private HoodieRowDataCreateHandle createWriteHandle(String partitionPath) { // creates a row create handle for partitionPath and, when metrics are present, times the construction
+ writeMetrics.ifPresent(FlinkStreamWriteMetrics::startHandleCreation);
+ HoodieRowDataCreateHandle rowCreateHandle = new
HoodieRowDataCreateHandle(hoodieTable, writeConfig, partitionPath,
getNextFileId(),
+ instantTime, taskPartitionId, totalSubtaskNum, taskEpochId, rowType,
preserveHoodieMetadata);
+ writeMetrics.ifPresent(FlinkStreamWriteMetrics::endHandleCreation); // records the elapsed creation cost in the histogram
+ return rowCreateHandle;
+ }
+
+ private WriteStatus closeWriteHandle(HoodieRowDataCreateHandle
rowCreateHandle) throws IOException { // closes the handle and, when metrics are present, times the close as a file flush
+ writeMetrics.ifPresent(FlinkStreamWriteMetrics::startFileFlush);
+ WriteStatus status = rowCreateHandle.close();
+ writeMetrics.ifPresent(FlinkStreamWriteMetrics::endFileFlush); // NOTE(review): not reached if close() throws, so the file-flush timer is never stopped for a failed close — confirm that is acceptable
+ return status;
+ }
+
}