This is an automated email from the ASF dual-hosted git repository.

diwu pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/doris-flink-connector.git


The following commit(s) were added to refs/heads/master by this push:
     new dc1f0ec  [feature](write)add metrics for flink load data (#287)
dc1f0ec is described below

commit dc1f0ec3a8462c59e121b6e00d0ef165ed5f1b46
Author: Petrichor <[email protected]>
AuthorDate: Fri Jan 5 16:33:42 2024 +0800

    [feature](write)add metrics for flink load data (#287)
---
 .../doris/flink/rest/models/RespContent.java       |  70 ++++-
 .../doris/flink/sink/writer/DorisStreamLoad.java   |   6 +-
 .../doris/flink/sink/writer/DorisWriteMetrics.java | 348 +++++++++++++++++++++
 .../doris/flink/sink/writer/DorisWriter.java       |  33 +-
 .../doris/flink/sink/writer/TestDorisWriter.java   |  37 +++
 5 files changed, 475 insertions(+), 19 deletions(-)

diff --git 
a/flink-doris-connector/src/main/java/org/apache/doris/flink/rest/models/RespContent.java
 
b/flink-doris-connector/src/main/java/org/apache/doris/flink/rest/models/RespContent.java
index 71edbfd..94a1dc4 100644
--- 
a/flink-doris-connector/src/main/java/org/apache/doris/flink/rest/models/RespContent.java
+++ 
b/flink-doris-connector/src/main/java/org/apache/doris/flink/rest/models/RespContent.java
@@ -26,7 +26,7 @@ import com.fasterxml.jackson.databind.ObjectMapper;
 public class RespContent {
 
     @JsonProperty(value = "TxnId")
-    private long txnId;
+    private Long txnId;
 
     @JsonProperty(value = "Label")
     private String label;
@@ -44,42 +44,42 @@ public class RespContent {
     private String message;
 
     @JsonProperty(value = "NumberTotalRows")
-    private long numberTotalRows;
+    private Long numberTotalRows;
 
     @JsonProperty(value = "NumberLoadedRows")
-    private long numberLoadedRows;
+    private Long numberLoadedRows;
 
     @JsonProperty(value = "NumberFilteredRows")
-    private int numberFilteredRows;
+    private Integer numberFilteredRows;
 
     @JsonProperty(value = "NumberUnselectedRows")
-    private int numberUnselectedRows;
+    private Integer numberUnselectedRows;
 
     @JsonProperty(value = "LoadBytes")
-    private long loadBytes;
+    private Long loadBytes;
 
     @JsonProperty(value = "LoadTimeMs")
-    private int loadTimeMs;
+    private Integer loadTimeMs;
 
     @JsonProperty(value = "BeginTxnTimeMs")
-    private int beginTxnTimeMs;
+    private Integer beginTxnTimeMs;
 
     @JsonProperty(value = "StreamLoadPutTimeMs")
-    private int streamLoadPutTimeMs;
+    private Integer streamLoadPutTimeMs;
 
     @JsonProperty(value = "ReadDataTimeMs")
-    private int readDataTimeMs;
+    private Integer readDataTimeMs;
 
     @JsonProperty(value = "WriteDataTimeMs")
-    private int writeDataTimeMs;
+    private Integer writeDataTimeMs;
 
     @JsonProperty(value = "CommitAndPublishTimeMs")
-    private int commitAndPublishTimeMs;
+    private Integer commitAndPublishTimeMs;
 
     @JsonProperty(value = "ErrorURL")
     private String errorURL;
 
-    public long getTxnId() {
+    public Long getTxnId() {
         return txnId;
     }
 
@@ -99,6 +99,50 @@ public class RespContent {
         return existingJobStatus;
     }
 
+    public Long getNumberTotalRows() {
+        return numberTotalRows;
+    }
+
+    public Long getNumberLoadedRows() {
+        return numberLoadedRows;
+    }
+
+    public Integer getNumberFilteredRows() {
+        return numberFilteredRows;
+    }
+
+    public Integer getNumberUnselectedRows() {
+        return numberUnselectedRows;
+    }
+
+    public Long getLoadBytes() {
+        return loadBytes;
+    }
+
+    public Integer getLoadTimeMs() {
+        return loadTimeMs;
+    }
+
+    public Integer getBeginTxnTimeMs() {
+        return beginTxnTimeMs;
+    }
+
+    public Integer getStreamLoadPutTimeMs() {
+        return streamLoadPutTimeMs;
+    }
+
+    public Integer getReadDataTimeMs() {
+        return readDataTimeMs;
+    }
+
+    public Integer getWriteDataTimeMs() {
+        return writeDataTimeMs;
+    }
+
+    public Integer getCommitAndPublishTimeMs() {
+        return commitAndPublishTimeMs;
+    }
+
     @Override
     public String toString() {
         ObjectMapper mapper = new ObjectMapper();
diff --git 
a/flink-doris-connector/src/main/java/org/apache/doris/flink/sink/writer/DorisStreamLoad.java
 
b/flink-doris-connector/src/main/java/org/apache/doris/flink/sink/writer/DorisStreamLoad.java
index b095eb9..604eb5c 100644
--- 
a/flink-doris-connector/src/main/java/org/apache/doris/flink/sink/writer/DorisStreamLoad.java
+++ 
b/flink-doris-connector/src/main/java/org/apache/doris/flink/sink/writer/DorisStreamLoad.java
@@ -155,8 +155,8 @@ public class DorisStreamLoad implements Serializable {
     /**
      * try to discard pending transactions with labels beginning with 
labelSuffix.
      *
-     * @param labelSuffix
-     * @param chkID
+     * @param labelSuffix the suffix of the stream load.
+     * @param chkID checkpoint id of task.
      * @throws Exception
      */
     public void abortPreCommit(String labelSuffix, long chkID) throws 
Exception {
@@ -260,7 +260,7 @@ public class DorisStreamLoad implements Serializable {
     /**
      * start write data for new checkpoint.
      *
-     * @param label
+     * @param label the label of Stream Load.
      * @throws IOException
      */
     public void startLoad(String label, boolean isResume) throws IOException {
diff --git 
a/flink-doris-connector/src/main/java/org/apache/doris/flink/sink/writer/DorisWriteMetrics.java
 
b/flink-doris-connector/src/main/java/org/apache/doris/flink/sink/writer/DorisWriteMetrics.java
new file mode 100644
index 0000000..d60d551
--- /dev/null
+++ 
b/flink-doris-connector/src/main/java/org/apache/doris/flink/sink/writer/DorisWriteMetrics.java
@@ -0,0 +1,348 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+package org.apache.doris.flink.sink.writer;
+
+import org.apache.flink.annotation.VisibleForTesting;
+import org.apache.flink.metrics.Counter;
+import org.apache.flink.metrics.Histogram;
+import org.apache.flink.metrics.groups.SinkWriterMetricGroup;
+import org.apache.flink.runtime.metrics.DescriptiveStatisticsHistogram;
+
+import org.apache.doris.flink.rest.models.RespContent;
+
+import java.io.Serializable;
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.List;
+import java.util.Optional;
+import java.util.concurrent.atomic.AtomicLong;
+
+import static org.apache.doris.flink.sink.LoadStatus.PUBLISH_TIMEOUT;
+import static org.apache.doris.flink.sink.LoadStatus.SUCCESS;
+
+/**
+ * the metrics for Doris Writer.
+ *
+ * @since 1.6
+ */
+public class DorisWriteMetrics implements Serializable {
+    private static final long serialVersionUID = 1L;
+    // Window size of histogram metrics.
+    private static final int HISTOGRAM_WINDOW_SIZE = 100;
+    private static final List<String> DORIS_SUCCESS_STATUS =
+            new ArrayList<>(Arrays.asList(SUCCESS, PUBLISH_TIMEOUT));
+    private final String tableIdentifier;
+    AtomicLong totalRows = new AtomicLong();
+    // Counter
+    private transient Counter totalFlushLoadBytes;
+    private transient Counter totalFlushNumberTotalRows;
+    private transient Counter totalFlushLoadedRows;
+    private transient Counter totalFlushTimeMs;
+    private transient Counter totalFlushSucceededTimes;
+    private transient Counter totalFlushFailedTimes;
+
+    private transient Counter totalFlushFilteredRows;
+    private transient Counter totalFlushUnselectedRows;
+    // Histogram
+    private transient Histogram beginTxnTimeHistogramMs;
+    private transient Histogram commitAndPublishTimeHistogramMs;
+    private transient Histogram streamLoadPutTimeHistogramMs;
+    private transient Histogram readDataTimeHistogramMs;
+    private transient Histogram writeDataTimeHistogramMs;
+    private transient Histogram loadTimeHistogramMs;
+
+    private static final String COUNTER_TOTAL_FLUSH_BYTES = 
"totalFlushLoadBytes";
+    private static final String COUNTER_TOTAL_FLUSH_ROWS = 
"flushTotalNumberRows";
+    private static final String COUNTER_TOTAL_FLUSH_LOADED_ROWS = 
"totalFlushLoadedRows";
+    private static final String COUNTER_TOTAL_FLUSH_COST_TIME = 
"totalFlushTimeMs";
+    private static final String COUNTER_TOTAL_FLUSH_SUCCEEDED_TIMES_COUNT =
+            "totalFlushSucceededNumber";
+    private static final String COUNTER_TOTAL_FLUSH_FAILED_TIMES_COUNT = 
"totalFlushFailedNumber";
+
+    private static final String COUNTER_TOTAL_FILTERED_ROWS = 
"totalFlushFilteredRows";
+    private static final String COUNTER_TOTAL_UNSELECTED_ROWS = 
"totalFlushUnselectedRows";
+
+    private static final String HISTOGRAM_BEGIN_TXN_TIME_MS = "beginTxnTimeMs";
+    private static final String HISTOGRAM_STREAM_LOAD_PUT_DATA_TIME_MS = 
"putDataTimeMs";
+    private static final String HISTOGRAM_READ_DATA_TIME_MS = "readDataTimeMs";
+    private static final String HISTOGRAM_WRITE_DATA_TIME_MS = 
"writeDataTimeMs";
+    private static final String HISTOGRAM_COMMIT_AND_PUBLISH_TIME_MS = 
"commitAndPublishTimeMs";
+    private static final String HISTOGRAM_LOAD_TIME_MS = "loadTimeMs";
+
+    private static final String METRIC_NAME_FORMAT = "%s_%s";
+
+    @VisibleForTesting
+    DorisWriteMetrics(SinkWriterMetricGroup sinkMetricGroup, String 
tableIdentifier) {
+        this.tableIdentifier = tableIdentifier;
+        register(sinkMetricGroup);
+    }
+
+    public static DorisWriteMetrics of(
+            SinkWriterMetricGroup sinkWriterMetricGroup, String 
tableIdentifier) {
+        return new DorisWriteMetrics(sinkWriterMetricGroup, tableIdentifier);
+    }
+
+    public void flush(RespContent respContent) {
+        if (respContent != null && 
DORIS_SUCCESS_STATUS.contains(respContent.getStatus())) {
+            flushSuccessLoad(respContent);
+        } else {
+            flushFailedRecord();
+        }
+    }
+
+    @VisibleForTesting
+    public void register(SinkWriterMetricGroup sinkMetricGroup) {
+        totalFlushNumberTotalRows =
+                sinkMetricGroup.counter(
+                        String.format(
+                                METRIC_NAME_FORMAT, tableIdentifier, 
COUNTER_TOTAL_FLUSH_ROWS));
+        totalFlushLoadedRows =
+                sinkMetricGroup.counter(
+                        String.format(
+                                METRIC_NAME_FORMAT,
+                                tableIdentifier,
+                                COUNTER_TOTAL_FLUSH_LOADED_ROWS));
+        totalFlushLoadBytes =
+                sinkMetricGroup.counter(
+                        String.format(
+                                METRIC_NAME_FORMAT, tableIdentifier, 
COUNTER_TOTAL_FLUSH_BYTES));
+        totalFlushFilteredRows =
+                sinkMetricGroup.counter(
+                        String.format(
+                                METRIC_NAME_FORMAT, tableIdentifier, 
COUNTER_TOTAL_FILTERED_ROWS));
+        totalFlushUnselectedRows =
+                sinkMetricGroup.counter(
+                        String.format(
+                                METRIC_NAME_FORMAT,
+                                tableIdentifier,
+                                COUNTER_TOTAL_UNSELECTED_ROWS));
+        totalFlushSucceededTimes =
+                sinkMetricGroup.counter(
+                        String.format(
+                                METRIC_NAME_FORMAT,
+                                tableIdentifier,
+                                COUNTER_TOTAL_FLUSH_SUCCEEDED_TIMES_COUNT));
+        totalFlushFailedTimes =
+                sinkMetricGroup.counter(
+                        String.format(
+                                METRIC_NAME_FORMAT,
+                                tableIdentifier,
+                                COUNTER_TOTAL_FLUSH_FAILED_TIMES_COUNT));
+        totalFlushTimeMs =
+                sinkMetricGroup.counter(
+                        String.format(
+                                METRIC_NAME_FORMAT,
+                                tableIdentifier,
+                                COUNTER_TOTAL_FLUSH_COST_TIME));
+
+        loadTimeHistogramMs =
+                sinkMetricGroup.histogram(
+                        String.format(METRIC_NAME_FORMAT, tableIdentifier, 
HISTOGRAM_LOAD_TIME_MS),
+                        new 
DescriptiveStatisticsHistogram(HISTOGRAM_WINDOW_SIZE));
+        streamLoadPutTimeHistogramMs =
+                sinkMetricGroup.histogram(
+                        String.format(
+                                METRIC_NAME_FORMAT,
+                                tableIdentifier,
+                                HISTOGRAM_STREAM_LOAD_PUT_DATA_TIME_MS),
+                        new 
DescriptiveStatisticsHistogram(HISTOGRAM_WINDOW_SIZE));
+        commitAndPublishTimeHistogramMs =
+                sinkMetricGroup.histogram(
+                        String.format(
+                                METRIC_NAME_FORMAT,
+                                tableIdentifier,
+                                HISTOGRAM_COMMIT_AND_PUBLISH_TIME_MS),
+                        new 
DescriptiveStatisticsHistogram(HISTOGRAM_WINDOW_SIZE));
+        this.beginTxnTimeHistogramMs =
+                sinkMetricGroup.histogram(
+                        String.format(
+                                METRIC_NAME_FORMAT, tableIdentifier, 
HISTOGRAM_BEGIN_TXN_TIME_MS),
+                        new 
DescriptiveStatisticsHistogram(HISTOGRAM_WINDOW_SIZE));
+        readDataTimeHistogramMs =
+                sinkMetricGroup.histogram(
+                        String.format(
+                                METRIC_NAME_FORMAT, tableIdentifier, 
HISTOGRAM_READ_DATA_TIME_MS),
+                        new 
DescriptiveStatisticsHistogram(HISTOGRAM_WINDOW_SIZE));
+        writeDataTimeHistogramMs =
+                sinkMetricGroup.histogram(
+                        String.format(
+                                METRIC_NAME_FORMAT, tableIdentifier, 
HISTOGRAM_WRITE_DATA_TIME_MS),
+                        new 
DescriptiveStatisticsHistogram(HISTOGRAM_WINDOW_SIZE));
+    }
+
+    private void flushSuccessLoad(RespContent responseContent) {
+        
Optional.ofNullable(responseContent.getLoadBytes()).ifPresent(totalFlushLoadBytes::inc);
+        Optional.ofNullable(responseContent.getNumberLoadedRows())
+                .ifPresent(totalFlushLoadedRows::inc);
+        Optional.ofNullable(responseContent.getNumberTotalRows())
+                .ifPresent(totalFlushNumberTotalRows::inc);
+        Optional.ofNullable(responseContent.getNumberFilteredRows())
+                .ifPresent(totalFlushFilteredRows::inc);
+        
Optional.ofNullable(responseContent.getLoadTimeMs()).ifPresent(totalFlushTimeMs::inc);
+        Optional.ofNullable(responseContent.getNumberUnselectedRows())
+                .ifPresent(totalFlushUnselectedRows::inc);
+
+        totalFlushSucceededTimes.inc();
+
+        Optional.ofNullable(responseContent.getCommitAndPublishTimeMs())
+                .ifPresent(commitAndPublishTimeHistogramMs::update);
+        Optional.ofNullable(responseContent.getWriteDataTimeMs())
+                .ifPresent(writeDataTimeHistogramMs::update);
+        Optional.ofNullable(responseContent.getBeginTxnTimeMs())
+                .ifPresent(beginTxnTimeHistogramMs::update);
+        Optional.ofNullable(responseContent.getReadDataTimeMs())
+                .ifPresent(readDataTimeHistogramMs::update);
+        Optional.ofNullable(responseContent.getStreamLoadPutTimeMs())
+                .ifPresent(streamLoadPutTimeHistogramMs::update);
+        
Optional.ofNullable(responseContent.getLoadTimeMs()).ifPresent(loadTimeHistogramMs::update);
+    }
+
+    private void flushFailedRecord() {
+        totalFlushFailedTimes.inc();
+    }
+
+    public String getTableIdentifier() {
+        return tableIdentifier;
+    }
+
+    public Counter getTotalFlushLoadBytes() {
+        return totalFlushLoadBytes;
+    }
+
+    public Counter getTotalFlushNumberTotalRows() {
+        return totalFlushNumberTotalRows;
+    }
+
+    public Counter getTotalFlushLoadedRows() {
+        return totalFlushLoadedRows;
+    }
+
+    public Counter getTotalFlushTimeMs() {
+        return totalFlushTimeMs;
+    }
+
+    public Counter getTotalFlushSucceededTimes() {
+        return totalFlushSucceededTimes;
+    }
+
+    public Counter getTotalFlushFailedTimes() {
+        return totalFlushFailedTimes;
+    }
+
+    public Counter getTotalFlushFilteredRows() {
+        return totalFlushFilteredRows;
+    }
+
+    public Counter getTotalFlushUnselectedRows() {
+        return totalFlushUnselectedRows;
+    }
+
+    public Histogram getBeginTxnTimeHistogramMs() {
+        return beginTxnTimeHistogramMs;
+    }
+
+    public Histogram getCommitAndPublishTimeHistogramMs() {
+        return commitAndPublishTimeHistogramMs;
+    }
+
+    public Histogram getStreamLoadPutTimeHistogramMs() {
+        return streamLoadPutTimeHistogramMs;
+    }
+
+    public Histogram getReadDataTimeHistogramMs() {
+        return readDataTimeHistogramMs;
+    }
+
+    public Histogram getWriteDataTimeHistogramMs() {
+        return writeDataTimeHistogramMs;
+    }
+
+    public Histogram getLoadTimeHistogramMs() {
+        return loadTimeHistogramMs;
+    }
+
+    @VisibleForTesting
+    public void setTotalFlushLoadBytes(Counter totalFlushLoadBytes) {
+        this.totalFlushLoadBytes = totalFlushLoadBytes;
+    }
+
+    @VisibleForTesting
+    public void setTotalFlushNumberTotalRows(Counter 
totalFlushNumberTotalRows) {
+        this.totalFlushNumberTotalRows = totalFlushNumberTotalRows;
+    }
+
+    @VisibleForTesting
+    public void setTotalFlushLoadedRows(Counter totalFlushLoadedRows) {
+        this.totalFlushLoadedRows = totalFlushLoadedRows;
+    }
+
+    @VisibleForTesting
+    public void setTotalFlushTimeMs(Counter totalFlushTimeMs) {
+        this.totalFlushTimeMs = totalFlushTimeMs;
+    }
+
+    @VisibleForTesting
+    public void setTotalFlushSucceededTimes(Counter totalFlushSucceededTimes) {
+        this.totalFlushSucceededTimes = totalFlushSucceededTimes;
+    }
+
+    @VisibleForTesting
+    public void setTotalFlushFailedTimes(Counter totalFlushFailedTimes) {
+        this.totalFlushFailedTimes = totalFlushFailedTimes;
+    }
+
+    @VisibleForTesting
+    public void setTotalFlushFilteredRows(Counter totalFlushFilteredRows) {
+        this.totalFlushFilteredRows = totalFlushFilteredRows;
+    }
+
+    @VisibleForTesting
+    public void setTotalFlushUnselectedRows(Counter totalFlushUnselectedRows) {
+        this.totalFlushUnselectedRows = totalFlushUnselectedRows;
+    }
+
+    @VisibleForTesting
+    public void setBeginTxnTimeHistogramMs(Histogram beginTxnTimeHistogramMs) {
+        this.beginTxnTimeHistogramMs = beginTxnTimeHistogramMs;
+    }
+
+    @VisibleForTesting
+    public void setCommitAndPublishTimeHistogramMs(Histogram 
commitAndPublishTimeHistogramMs) {
+        this.commitAndPublishTimeHistogramMs = commitAndPublishTimeHistogramMs;
+    }
+
+    @VisibleForTesting
+    public void setStreamLoadPutTimeHistogramMs(Histogram 
streamLoadPutTimeHistogramMs) {
+        this.streamLoadPutTimeHistogramMs = streamLoadPutTimeHistogramMs;
+    }
+
+    @VisibleForTesting
+    public void setReadDataTimeHistogramMs(Histogram readDataTimeHistogramMs) {
+        this.readDataTimeHistogramMs = readDataTimeHistogramMs;
+    }
+
+    @VisibleForTesting
+    public void setWriteDataTimeHistogramMs(Histogram 
writeDataTimeHistogramMs) {
+        this.writeDataTimeHistogramMs = writeDataTimeHistogramMs;
+    }
+
+    @VisibleForTesting
+    public void setLoadTimeHistogramMs(Histogram loadTimeHistogramMs) {
+        this.loadTimeHistogramMs = loadTimeHistogramMs;
+    }
+}
diff --git 
a/flink-doris-connector/src/main/java/org/apache/doris/flink/sink/writer/DorisWriter.java
 
b/flink-doris-connector/src/main/java/org/apache/doris/flink/sink/writer/DorisWriter.java
index e8cc1ff..5b8b4fc 100644
--- 
a/flink-doris-connector/src/main/java/org/apache/doris/flink/sink/writer/DorisWriter.java
+++ 
b/flink-doris-connector/src/main/java/org/apache/doris/flink/sink/writer/DorisWriter.java
@@ -21,6 +21,7 @@ import org.apache.flink.annotation.VisibleForTesting;
 import org.apache.flink.api.connector.sink2.Sink;
 import org.apache.flink.api.connector.sink2.StatefulSink;
 import org.apache.flink.api.connector.sink2.TwoPhaseCommittingSink;
+import org.apache.flink.metrics.groups.SinkWriterMetricGroup;
 import org.apache.flink.runtime.checkpoint.CheckpointIDCounter;
 import org.apache.flink.util.concurrent.ExecutorThreadFactory;
 
@@ -82,6 +83,8 @@ public class DorisWriter<IN>
     private transient Thread executorThread;
     private transient volatile Exception loadException = null;
     private BackendUtil backendUtil;
+    private SinkWriterMetricGroup sinkMetricGroup;
+    private Map<String, DorisWriteMetrics> sinkMetricsMap = new 
ConcurrentHashMap<>();
 
     public DorisWriter(
             Sink.InitContext initContext,
@@ -96,7 +99,7 @@ public class DorisWriter<IN>
                         .orElse(CheckpointIDCounter.INITIAL_CHECKPOINT_ID - 1);
         this.curCheckpointId = lastCheckpointId + 1;
         LOG.info("restore checkpointId {}", lastCheckpointId);
-        LOG.info("labelPrefix " + executionOptions.getLabelPrefix());
+        LOG.info("labelPrefix {}", executionOptions.getLabelPrefix());
         this.labelPrefix = executionOptions.getLabelPrefix();
         this.subtaskId = initContext.getSubtaskId();
         this.scheduledExecutorService =
@@ -107,7 +110,7 @@ public class DorisWriter<IN>
         this.executionOptions = executionOptions;
         this.intervalTime = executionOptions.checkInterval();
         this.globalLoading = false;
-
+        sinkMetricGroup = initContext.metricGroup();
         initializeLoad(state);
         serializer.initial();
     }
@@ -197,10 +200,24 @@ public class DorisWriter<IN>
             streamLoader.startLoad(currentLabel, false);
             loadingMap.put(tableKey, true);
             globalLoading = true;
+            registerMetrics(tableKey);
         }
         streamLoader.writeRecord(record.getRow());
     }
 
+    @VisibleForTesting
+    public void setSinkMetricGroup(SinkWriterMetricGroup sinkMetricGroup) {
+        this.sinkMetricGroup = sinkMetricGroup;
+    }
+
+    public void registerMetrics(String tableKey) {
+        if (sinkMetricsMap.containsKey(tableKey)) {
+            return;
+        }
+        DorisWriteMetrics metrics = DorisWriteMetrics.of(sinkMetricGroup, 
tableKey);
+        sinkMetricsMap.put(tableKey, metrics);
+    }
+
     @Override
     public Collection<DorisCommittable> prepareCommit() throws IOException, 
InterruptedException {
         // Verify whether data is written during a checkpoint
@@ -222,10 +239,15 @@ public class DorisWriter<IN>
             LabelGenerator labelGenerator = getLabelGenerator(tableIdentifier);
             String currentLabel = 
labelGenerator.generateTableLabel(curCheckpointId);
             RespContent respContent = dorisStreamLoad.stopLoad(currentLabel);
+            // refresh metrics
+            if (sinkMetricsMap.containsKey(tableIdentifier)) {
+                DorisWriteMetrics dorisWriteMetrics = 
sinkMetricsMap.get(tableIdentifier);
+                dorisWriteMetrics.flush(respContent);
+            }
             if (!DORIS_SUCCESS_STATUS.contains(respContent.getStatus())) {
                 String errMsg =
                         String.format(
-                                "tabel {} stream load error: %s, see more in 
%s",
+                                "table %s stream load error: %s, see more in 
%s",
                                 tableIdentifier,
                                 respContent.getMessage(),
                                 respContent.getErrorURL());
@@ -363,6 +385,11 @@ public class DorisWriter<IN>
         this.dorisStreamLoadMap = streamLoadMap;
     }
 
+    @VisibleForTesting
+    public void setDorisMetricsMap(Map<String, DorisWriteMetrics> metricsMap) {
+        this.sinkMetricsMap = metricsMap;
+    }
+
     @Override
     public void close() throws Exception {
         LOG.info("Close DorisWriter.");
diff --git 
a/flink-doris-connector/src/test/java/org/apache/doris/flink/sink/writer/TestDorisWriter.java
 
b/flink-doris-connector/src/test/java/org/apache/doris/flink/sink/writer/TestDorisWriter.java
index 667a3f1..e25bef6 100644
--- 
a/flink-doris-connector/src/test/java/org/apache/doris/flink/sink/writer/TestDorisWriter.java
+++ 
b/flink-doris-connector/src/test/java/org/apache/doris/flink/sink/writer/TestDorisWriter.java
@@ -18,6 +18,9 @@
 package org.apache.doris.flink.sink.writer;
 
 import org.apache.flink.api.connector.sink2.Sink;
+import org.apache.flink.metrics.Counter;
+import org.apache.flink.metrics.Histogram;
+import org.apache.flink.metrics.groups.SinkWriterMetricGroup;
 
 import org.apache.doris.flink.cfg.DorisExecutionOptions;
 import org.apache.doris.flink.cfg.DorisOptions;
@@ -65,6 +68,7 @@ public class TestDorisWriter {
                 HttpTestUtil.getResponse(HttpTestUtil.PRE_COMMIT_RESPONSE, 
true);
         when(httpClient.execute(any())).thenReturn(preCommitResponse);
         Map<String, DorisStreamLoad> dorisStreamLoadMap = new 
ConcurrentHashMap<>();
+        Map<String, DorisWriteMetrics> dorisWriteMetricsMap = new 
ConcurrentHashMap<>();
         DorisStreamLoad dorisStreamLoad =
                 new DorisStreamLoad(
                         "local:8040",
@@ -75,7 +79,11 @@ public class TestDorisWriter {
         dorisStreamLoadMap.put(dorisOptions.getTableIdentifier(), 
dorisStreamLoad);
         dorisStreamLoad.startLoad("", false);
         Sink.InitContext initContext = mock(Sink.InitContext.class);
+        SinkWriterMetricGroup sinkWriterMetricGroup = 
mock(SinkWriterMetricGroup.class);
         
when(initContext.getRestoredCheckpointId()).thenReturn(OptionalLong.of(1));
+        when(initContext.getSubtaskId()).thenReturn(1);
+        DorisWriteMetrics mockWriteMetrics = 
getMockWriteMetrics(sinkWriterMetricGroup);
+        dorisWriteMetricsMap.put(dorisOptions.getTableIdentifier(), 
mockWriteMetrics);
         DorisWriter<String> dorisWriter =
                 new DorisWriter<String>(
                         initContext,
@@ -84,7 +92,9 @@ public class TestDorisWriter {
                         dorisOptions,
                         readOptions,
                         executionOptions);
+        dorisWriter.setSinkMetricGroup(sinkWriterMetricGroup);
         dorisWriter.setDorisStreamLoadMap(dorisStreamLoadMap);
+        dorisWriter.setDorisMetricsMap(dorisWriteMetricsMap);
         dorisWriter.write("doris,1", null);
         Collection<DorisCommittable> committableList = 
dorisWriter.prepareCommit();
         Assert.assertEquals(1, committableList.size());
@@ -129,4 +139,31 @@ public class TestDorisWriter {
         Assert.assertEquals("doris", writerStates.get(0).getLabelPrefix());
         Assert.assertTrue(!dorisWriter.isLoading());
     }
+
+    public DorisWriteMetrics getMockWriteMetrics(SinkWriterMetricGroup 
sinkWriterMetricGroup) {
+        DorisWriteMetrics dorisWriteMetrics =
+                new DorisWriteMetrics(sinkWriterMetricGroup, 
dorisOptions.getTableIdentifier());
+        Counter mockCounter = mock(Counter.class);
+        Histogram mockHistogram = mock(Histogram.class);
+        when(mockCounter.getCount()).thenReturn(0L);
+        when(mockHistogram.getCount()).thenReturn(0L);
+        dorisWriteMetrics.setTotalFlushLoadBytes(mockCounter);
+        dorisWriteMetrics.setTotalFlushNumberTotalRows(mockCounter);
+        dorisWriteMetrics.setTotalFlushFilteredRows(mockCounter);
+        dorisWriteMetrics.setTotalFlushLoadedRows(mockCounter);
+        dorisWriteMetrics.setTotalFlushFailedTimes(mockCounter);
+        dorisWriteMetrics.setTotalFlushNumberTotalRows(mockCounter);
+        dorisWriteMetrics.setTotalFlushUnselectedRows(mockCounter);
+        dorisWriteMetrics.setTotalFlushSucceededTimes(mockCounter);
+        dorisWriteMetrics.setTotalFlushTimeMs(mockCounter);
+        dorisWriteMetrics.setBeginTxnTimeHistogramMs(mockHistogram);
+        dorisWriteMetrics.setCommitAndPublishTimeHistogramMs(mockHistogram);
+        dorisWriteMetrics.setWriteDataTimeHistogramMs(mockHistogram);
+        dorisWriteMetrics.setStreamLoadPutTimeHistogramMs(mockHistogram);
+        dorisWriteMetrics.setWriteDataTimeHistogramMs(mockHistogram);
+        dorisWriteMetrics.setCommitAndPublishTimeHistogramMs(mockHistogram);
+        dorisWriteMetrics.setReadDataTimeHistogramMs(mockHistogram);
+        dorisWriteMetrics.setLoadTimeHistogramMs(mockHistogram);
+        return dorisWriteMetrics;
+    }
 }


---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]

Reply via email to