This is an automated email from the ASF dual-hosted git repository.

danny0405 pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/hudi.git


The following commit(s) were added to refs/heads/master by this push:
     new 566e22b0a3d [HUDI-2141] Support flink stream write metrics (#9118)
566e22b0a3d is described below

commit 566e22b0a3d7c3e67686122b732dbbc4ecdf0025
Author: StreamingFlames <[email protected]>
AuthorDate: Mon Oct 16 21:55:54 2023 -0500

    [HUDI-2141] Support flink stream write metrics (#9118)
---
 .../hudi/metrics/FlinkStreamWriteMetrics.java      | 170 +++++++++++++++++++++
 .../org/apache/hudi/sink/StreamWriteFunction.java  |  27 +++-
 .../hudi/sink/append/AppendWriteFunction.java      |  24 ++-
 .../hudi/sink/bulk/BulkInsertWriterHelper.java     |  42 ++++-
 4 files changed, 255 insertions(+), 8 deletions(-)

diff --git 
a/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/metrics/FlinkStreamWriteMetrics.java
 
b/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/metrics/FlinkStreamWriteMetrics.java
new file mode 100644
index 00000000000..a3d84b234f9
--- /dev/null
+++ 
b/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/metrics/FlinkStreamWriteMetrics.java
@@ -0,0 +1,170 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hudi.metrics;
+
+import org.apache.hudi.sink.common.AbstractStreamWriteFunction;
+
+import com.codahale.metrics.SlidingWindowReservoir;
+import org.apache.flink.dropwizard.metrics.DropwizardHistogramWrapper;
+import org.apache.flink.dropwizard.metrics.DropwizardMeterWrapper;
+import org.apache.flink.metrics.Histogram;
+import org.apache.flink.metrics.Meter;
+import org.apache.flink.metrics.MetricGroup;
+
+/**
+ * Metrics for flink stream write (including append write, normal/bucket 
stream write etc.).
+ * Used in subclasses of {@link AbstractStreamWriteFunction}.
+ */
+public class FlinkStreamWriteMetrics extends HoodieFlinkMetrics {
+  private static final String DATA_FLUSH_KEY = "data_flush";
+  private static final String FILE_FLUSH_KEY = "file_flush";
+  private static final String HANDLE_CREATION_KEY = "handle_creation";
+
+  /**
+   * Flush data costs during checkpoint.
+   */
+  private long dataFlushCosts;
+
+  /**
+   * Number of records written during a checkpoint window.
+   */
+  protected long writtenRecords;
+
+  /**
+   * Current write buffer size in StreamWriteFunction.
+   */
+  private long writeBufferedSize;
+
+  /**
+   * Total costs for flushing files during a checkpoint window.
+   */
+  private long fileFlushTotalCosts;
+
+  /**
+   * Number of handles opened during a checkpoint window. Increased with 
partition number/bucket number etc.
+   */
+  private long numOfOpenHandle;
+
+  /**
+   * Number of files written during a checkpoint window.
+   */
+  private long numOfFilesWritten;
+
+  /**
+   * Number of records written per second.
+   */
+  protected final Meter recordWrittenPerSecond;
+
+  /**
+   * Number of write handle switches per second.
+   */
+  private final Meter handleSwitchPerSecond;
+
+  /**
+   * Cost of write handle creation.
+   */
+  private final Histogram handleCreationCosts;
+
+  /**
+   * Cost of a file flush.
+   */
+  private final Histogram fileFlushCost;
+
+  public FlinkStreamWriteMetrics(MetricGroup metricGroup) {
+    super(metricGroup);
+    this.recordWrittenPerSecond = new DropwizardMeterWrapper(new 
com.codahale.metrics.Meter());
+    this.handleSwitchPerSecond = new DropwizardMeterWrapper(new 
com.codahale.metrics.Meter());
+    this.handleCreationCosts = new DropwizardHistogramWrapper(new 
com.codahale.metrics.Histogram(new SlidingWindowReservoir(100)));
+    this.fileFlushCost = new DropwizardHistogramWrapper(new 
com.codahale.metrics.Histogram(new SlidingWindowReservoir(100)));
+  }
+
+  @Override
+  public void registerMetrics() {
+    metricGroup.meter("recordWrittenPerSecond", recordWrittenPerSecond);
+    metricGroup.gauge("currentCommitWrittenRecords", () -> writtenRecords);
+    metricGroup.gauge("dataFlushCosts", () -> dataFlushCosts);
+    metricGroup.gauge("writeBufferedSize", () -> writeBufferedSize);
+
+    metricGroup.gauge("fileFlushTotalCosts", () -> fileFlushTotalCosts);
+    metricGroup.gauge("numOfFilesWritten", () -> numOfFilesWritten);
+    metricGroup.gauge("numOfOpenHandle", () -> numOfOpenHandle);
+
+    metricGroup.meter("handleSwitchPerSecond", handleSwitchPerSecond);
+
+    metricGroup.histogram("handleCreationCosts", handleCreationCosts);
+    metricGroup.histogram("fileFlushCost", fileFlushCost);
+  }
+
+  public void setWriteBufferedSize(long writeBufferedSize) {
+    this.writeBufferedSize = writeBufferedSize;
+  }
+
+  public void startDataFlush() {
+    startTimer(DATA_FLUSH_KEY);
+  }
+
+  public void endDataFlush() {
+    this.dataFlushCosts = stopTimer(DATA_FLUSH_KEY);
+  }
+
+  public void markRecordIn() {
+    this.writtenRecords += 1;
+    recordWrittenPerSecond.markEvent();
+  }
+
+  public void increaseNumOfFilesWritten() {
+    numOfFilesWritten += 1;
+  }
+
+  public void increaseNumOfOpenHandle() {
+    numOfOpenHandle += 1;
+    increaseNumOfFilesWritten();
+  }
+
+  public void markHandleSwitch() {
+    handleSwitchPerSecond.markEvent();
+  }
+
+  public void startHandleCreation() {
+    startTimer(HANDLE_CREATION_KEY);
+  }
+
+  public void endHandleCreation() {
+    handleCreationCosts.update(stopTimer(HANDLE_CREATION_KEY));
+  }
+
+  public void startFileFlush() {
+    startTimer(FILE_FLUSH_KEY);
+  }
+
+  public void endFileFlush() {
+    long costs = stopTimer(FILE_FLUSH_KEY);
+    fileFlushCost.update(costs);
+    this.fileFlushTotalCosts += costs;
+  }
+
+  public void resetAfterCommit() {
+    this.writtenRecords = 0;
+    this.numOfFilesWritten = 0;
+    this.numOfOpenHandle = 0;
+    this.writeBufferedSize = 0;
+    this.fileFlushTotalCosts = 0;
+  }
+
+}
diff --git 
a/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/sink/StreamWriteFunction.java
 
b/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/sink/StreamWriteFunction.java
index bb2dfc61bd3..f203756867f 100644
--- 
a/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/sink/StreamWriteFunction.java
+++ 
b/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/sink/StreamWriteFunction.java
@@ -32,6 +32,7 @@ import org.apache.hudi.common.util.ObjectSizeCalculator;
 import org.apache.hudi.common.util.ValidationUtils;
 import org.apache.hudi.configuration.FlinkOptions;
 import org.apache.hudi.exception.HoodieException;
+import org.apache.hudi.metrics.FlinkStreamWriteMetrics;
 import org.apache.hudi.sink.common.AbstractStreamWriteFunction;
 import org.apache.hudi.sink.event.WriteMetadataEvent;
 import org.apache.hudi.table.action.commit.FlinkWriteHelper;
@@ -39,6 +40,7 @@ import org.apache.hudi.util.StreamerUtil;
 
 import org.apache.flink.annotation.VisibleForTesting;
 import org.apache.flink.configuration.Configuration;
+import org.apache.flink.metrics.MetricGroup;
 import org.apache.flink.streaming.api.functions.ProcessFunction;
 import org.apache.flink.util.Collector;
 import org.slf4j.Logger;
@@ -112,6 +114,11 @@ public class StreamWriteFunction<I> extends 
AbstractStreamWriteFunction<I> {
    */
   private transient TotalSizeTracer tracer;
 
+  /**
+   * Metrics for flink stream write.
+   */
+  protected transient FlinkStreamWriteMetrics writeMetrics;
+
   /**
    * Constructs a StreamingSinkFunction.
    *
@@ -127,6 +134,7 @@ public class StreamWriteFunction<I> extends 
AbstractStreamWriteFunction<I> {
     initBuffer();
     initWriteFunction();
     initMergeClass();
+    registerMetrics();
   }
 
   @Override
@@ -385,6 +393,7 @@ public class StreamWriteFunction<I> extends 
AbstractStreamWriteFunction<I> {
    * @param value HoodieRecord
    */
   protected void bufferRecord(HoodieRecord<?> value) {
+    writeMetrics.markRecordIn();
     final String bucketID = getBucketID(value);
 
     DataBucket bucket = this.buckets.computeIfAbsent(bucketID,
@@ -395,6 +404,8 @@ public class StreamWriteFunction<I> extends 
AbstractStreamWriteFunction<I> {
 
     boolean flushBucket = bucket.detector.detect(item);
     boolean flushBuffer = this.tracer.trace(bucket.detector.lastRecordSize);
+    // update buffer metrics after tracing buffer size
+    writeMetrics.setWriteBufferedSize(this.tracer.bufferSize);
     if (flushBucket) {
       if (flushBucket(bucket)) {
         this.tracer.countDown(bucket.detector.totalSize);
@@ -449,6 +460,7 @@ public class StreamWriteFunction<I> extends 
AbstractStreamWriteFunction<I> {
 
   @SuppressWarnings("unchecked, rawtypes")
   private void flushRemaining(boolean endInput) {
+    writeMetrics.startDataFlush();
     this.currentInstant = instantToWrite(hasData());
     if (this.currentInstant == null) {
       // in case there are empty checkpoints that has no input data
@@ -488,11 +500,24 @@ public class StreamWriteFunction<I> extends 
AbstractStreamWriteFunction<I> {
     this.writeStatuses.addAll(writeStatus);
     // blocks flushing until the coordinator starts a new instant
     this.confirming = true;
+
+    writeMetrics.endDataFlush();
+    writeMetrics.resetAfterCommit();
+  }
+
+  private void registerMetrics() {
+    MetricGroup metrics = getRuntimeContext().getMetricGroup();
+    writeMetrics = new FlinkStreamWriteMetrics(metrics);
+    writeMetrics.registerMetrics();
   }
 
   protected List<WriteStatus> writeBucket(String instant, DataBucket bucket, 
List<HoodieRecord> records) {
     bucket.preWrite(records);
-    return writeFunction.apply(records, instant);
+    writeMetrics.startFileFlush();
+    List<WriteStatus> statuses = writeFunction.apply(records, instant);
+    writeMetrics.endFileFlush();
+    writeMetrics.increaseNumOfFilesWritten();
+    return statuses;
   }
 
   private List<HoodieRecord> deduplicateRecordsIfNeeded(List<HoodieRecord> 
records) {
diff --git 
a/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/sink/append/AppendWriteFunction.java
 
b/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/sink/append/AppendWriteFunction.java
index 91c59341109..121b1d31f31 100644
--- 
a/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/sink/append/AppendWriteFunction.java
+++ 
b/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/sink/append/AppendWriteFunction.java
@@ -19,7 +19,9 @@
 package org.apache.hudi.sink.append;
 
 import org.apache.hudi.client.WriteStatus;
+import org.apache.hudi.common.util.Option;
 import org.apache.hudi.exception.HoodieException;
+import org.apache.hudi.metrics.FlinkStreamWriteMetrics;
 import org.apache.hudi.sink.StreamWriteOperatorCoordinator;
 import org.apache.hudi.sink.bulk.BulkInsertWriterHelper;
 import org.apache.hudi.sink.common.AbstractStreamWriteFunction;
@@ -27,6 +29,7 @@ import org.apache.hudi.sink.event.WriteMetadataEvent;
 
 import org.apache.flink.annotation.VisibleForTesting;
 import org.apache.flink.configuration.Configuration;
+import org.apache.flink.metrics.MetricGroup;
 import org.apache.flink.table.data.RowData;
 import org.apache.flink.table.types.logical.RowType;
 import org.apache.flink.util.Collector;
@@ -60,6 +63,11 @@ public class AppendWriteFunction<I> extends 
AbstractStreamWriteFunction<I> {
    */
   private final RowType rowType;
 
+  /**
+   * Metrics for flink stream write.
+   */
+  private FlinkStreamWriteMetrics writeMetrics;
+
   /**
    * Constructs an AppendWriteFunction.
    *
@@ -70,6 +78,11 @@ public class AppendWriteFunction<I> extends 
AbstractStreamWriteFunction<I> {
     this.rowType = rowType;
   }
 
+  @Override
+  public void open(Configuration parameters) throws Exception {
+    registerMetrics();
+  }
+
   @Override
   public void snapshotState() {
     // Based on the fact that the coordinator starts the checkpoint first,
@@ -129,10 +142,11 @@ public class AppendWriteFunction<I> extends 
AbstractStreamWriteFunction<I> {
     }
     this.writerHelper = new BulkInsertWriterHelper(this.config, 
this.writeClient.getHoodieTable(), this.writeClient.getConfig(),
         instant, this.taskID, 
getRuntimeContext().getNumberOfParallelSubtasks(), 
getRuntimeContext().getAttemptNumber(),
-        this.rowType);
+        this.rowType, false, Option.of(writeMetrics));
   }
 
   private void flushData(boolean endInput) {
+    writeMetrics.startDataFlush();
     final List<WriteStatus> writeStatus;
     if (this.writerHelper != null) {
       writeStatus = this.writerHelper.getWriteStatuses(this.taskID);
@@ -155,5 +169,13 @@ public class AppendWriteFunction<I> extends 
AbstractStreamWriteFunction<I> {
     this.writeStatuses.addAll(writeStatus);
     // blocks flushing until the coordinator starts a new instant
     this.confirming = true;
+    writeMetrics.endDataFlush();
+    writeMetrics.resetAfterCommit();
+  }
+
+  private void registerMetrics() {
+    MetricGroup metrics = getRuntimeContext().getMetricGroup();
+    writeMetrics = new FlinkStreamWriteMetrics(metrics);
+    writeMetrics.registerMetrics();
   }
 }
diff --git 
a/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/sink/bulk/BulkInsertWriterHelper.java
 
b/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/sink/bulk/BulkInsertWriterHelper.java
index 5ac0a122bbc..dc0c27d64d2 100644
--- 
a/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/sink/bulk/BulkInsertWriterHelper.java
+++ 
b/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/sink/bulk/BulkInsertWriterHelper.java
@@ -20,11 +20,13 @@ package org.apache.hudi.sink.bulk;
 
 import org.apache.hudi.client.WriteStatus;
 import org.apache.hudi.common.model.HoodieRecord;
+import org.apache.hudi.common.util.Option;
 import org.apache.hudi.config.HoodieWriteConfig;
 import org.apache.hudi.configuration.FlinkOptions;
 import org.apache.hudi.configuration.OptionsResolver;
 import org.apache.hudi.exception.HoodieException;
 import org.apache.hudi.io.storage.row.HoodieRowDataCreateHandle;
+import org.apache.hudi.metrics.FlinkStreamWriteMetrics;
 import org.apache.hudi.table.HoodieTable;
 
 import org.apache.flink.configuration.Configuration;
@@ -69,14 +71,21 @@ public class BulkInsertWriterHelper {
   @Nullable
   protected final RowDataKeyGen keyGen;
 
+  protected final Option<FlinkStreamWriteMetrics> writeMetrics;
+
   public BulkInsertWriterHelper(Configuration conf, HoodieTable hoodieTable, 
HoodieWriteConfig writeConfig,
                                 String instantTime, int taskPartitionId, long 
totalSubtaskNum, long taskEpochId, RowType rowType) {
-    this(conf, hoodieTable, writeConfig, instantTime, taskPartitionId, 
totalSubtaskNum, taskEpochId, rowType, false);
+    this(conf, hoodieTable, writeConfig, instantTime, taskPartitionId, 
totalSubtaskNum, taskEpochId, rowType, false, Option.empty());
+  }
+
+  public BulkInsertWriterHelper(Configuration conf, HoodieTable hoodieTable, 
HoodieWriteConfig writeConfig,
+                                String instantTime, int taskPartitionId, long 
taskId, long taskEpochId, RowType rowType, boolean preserveHoodieMetadata) {
+    this(conf, hoodieTable, writeConfig, instantTime, taskPartitionId, taskId, 
taskEpochId, rowType, preserveHoodieMetadata, Option.empty());
   }
 
   public BulkInsertWriterHelper(Configuration conf, HoodieTable hoodieTable, 
HoodieWriteConfig writeConfig,
                                 String instantTime, int taskPartitionId, long 
totalSubtaskNum, long taskEpochId, RowType rowType,
-                                boolean preserveHoodieMetadata) {
+                                boolean preserveHoodieMetadata, 
Option<FlinkStreamWriteMetrics> writeMetrics) {
     this.hoodieTable = hoodieTable;
     this.writeConfig = writeConfig;
     this.instantTime = instantTime;
@@ -88,6 +97,7 @@ public class BulkInsertWriterHelper {
     this.isInputSorted = OptionsResolver.isBulkInsertOperation(conf) && 
conf.getBoolean(FlinkOptions.WRITE_BULK_INSERT_SORT_INPUT);
     this.fileIdPrefix = UUID.randomUUID().toString();
     this.keyGen = preserveHoodieMetadata ? null : 
RowDataKeyGens.instance(conf, rowType, taskPartitionId, instantTime);
+    this.writeMetrics = writeMetrics;
   }
 
   /**
@@ -109,8 +119,10 @@ public class BulkInsertWriterHelper {
       if ((lastKnownPartitionPath == null) || 
!lastKnownPartitionPath.equals(partitionPath) || !handle.canWrite()) {
         handle = getRowCreateHandle(partitionPath);
         lastKnownPartitionPath = partitionPath;
+        writeMetrics.ifPresent(FlinkStreamWriteMetrics::markHandleSwitch);
       }
       handle.write(recordKey, partitionPath, record);
+      writeMetrics.ifPresent(FlinkStreamWriteMetrics::markRecordIn);
     } catch (Throwable t) {
       IOException ioException = new IOException("Exception happened when bulk 
insert.", t);
       LOG.error("Global error thrown while trying to write records in 
HoodieRowCreateHandle ", ioException);
@@ -126,17 +138,20 @@ public class BulkInsertWriterHelper {
       }
 
       LOG.info("Creating new file for partition path " + partitionPath);
+      writeMetrics.ifPresent(FlinkStreamWriteMetrics::startHandleCreation);
       HoodieRowDataCreateHandle rowCreateHandle = new 
HoodieRowDataCreateHandle(hoodieTable, writeConfig, partitionPath, 
getNextFileId(),
           instantTime, taskPartitionId, totalSubtaskNum, taskEpochId, rowType, 
preserveHoodieMetadata);
       handles.put(partitionPath, rowCreateHandle);
+
+      writeMetrics.ifPresent(FlinkStreamWriteMetrics::increaseNumOfOpenHandle);
     } else if (!handles.get(partitionPath).canWrite()) {
       // even if there is a handle to the partition path, it could have 
reached its max size threshold. So, we close the handle here and
       // create a new one.
       LOG.info("Rolling max-size file for partition path " + partitionPath);
-      writeStatusList.add(handles.remove(partitionPath).close());
-      HoodieRowDataCreateHandle rowCreateHandle = new 
HoodieRowDataCreateHandle(hoodieTable, writeConfig, partitionPath, 
getNextFileId(),
-          instantTime, taskPartitionId, totalSubtaskNum, taskEpochId, rowType, 
preserveHoodieMetadata);
+      writeStatusList.add(closeWriteHandle(handles.remove(partitionPath)));
+      HoodieRowDataCreateHandle rowCreateHandle = 
createWriteHandle(partitionPath);
       handles.put(partitionPath, rowCreateHandle);
+      
writeMetrics.ifPresent(FlinkStreamWriteMetrics::increaseNumOfFilesWritten);
     }
     return handles.get(partitionPath);
   }
@@ -144,7 +159,7 @@ public class BulkInsertWriterHelper {
   public void close() throws IOException {
     for (HoodieRowDataCreateHandle rowCreateHandle : handles.values()) {
       LOG.info("Closing bulk insert file " + rowCreateHandle.getFileName());
-      writeStatusList.add(rowCreateHandle.close());
+      writeStatusList.add(closeWriteHandle(rowCreateHandle));
     }
     handles.clear();
     handle = null;
@@ -198,5 +213,20 @@ public class BulkInsertWriterHelper {
     }
   }
 
+  private HoodieRowDataCreateHandle createWriteHandle(String  partitionPath) {
+    writeMetrics.ifPresent(FlinkStreamWriteMetrics::startHandleCreation);
+    HoodieRowDataCreateHandle rowCreateHandle = new 
HoodieRowDataCreateHandle(hoodieTable, writeConfig, partitionPath, 
getNextFileId(),
+        instantTime, taskPartitionId, totalSubtaskNum, taskEpochId, rowType, 
preserveHoodieMetadata);
+    writeMetrics.ifPresent(FlinkStreamWriteMetrics::endHandleCreation);
+    return rowCreateHandle;
+  }
+
+  private WriteStatus closeWriteHandle(HoodieRowDataCreateHandle 
rowCreateHandle) throws IOException {
+    writeMetrics.ifPresent(FlinkStreamWriteMetrics::startFileFlush);
+    WriteStatus status = rowCreateHandle.close();
+    writeMetrics.ifPresent(FlinkStreamWriteMetrics::endFileFlush);
+    return status;
+  }
+
 }
 

Reply via email to