This is an automated email from the ASF dual-hosted git repository.
diwu pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/doris-flink-connector.git
The following commit(s) were added to refs/heads/master by this push:
new dc1f0ec [feature](write)add metrics for flink load data (#287)
dc1f0ec is described below
commit dc1f0ec3a8462c59e121b6e00d0ef165ed5f1b46
Author: Petrichor <[email protected]>
AuthorDate: Fri Jan 5 16:33:42 2024 +0800
[feature](write)add metrics for flink load data (#287)
---
.../doris/flink/rest/models/RespContent.java | 70 ++++-
.../doris/flink/sink/writer/DorisStreamLoad.java | 6 +-
.../doris/flink/sink/writer/DorisWriteMetrics.java | 348 +++++++++++++++++++++
.../doris/flink/sink/writer/DorisWriter.java | 33 +-
.../doris/flink/sink/writer/TestDorisWriter.java | 37 +++
5 files changed, 475 insertions(+), 19 deletions(-)
diff --git a/flink-doris-connector/src/main/java/org/apache/doris/flink/rest/models/RespContent.java b/flink-doris-connector/src/main/java/org/apache/doris/flink/rest/models/RespContent.java
index 71edbfd..94a1dc4 100644
--- a/flink-doris-connector/src/main/java/org/apache/doris/flink/rest/models/RespContent.java
+++ b/flink-doris-connector/src/main/java/org/apache/doris/flink/rest/models/RespContent.java
@@ -26,7 +26,7 @@ import com.fasterxml.jackson.databind.ObjectMapper;
public class RespContent {
@JsonProperty(value = "TxnId")
- private long txnId;
+ private Long txnId;
@JsonProperty(value = "Label")
private String label;
@@ -44,42 +44,42 @@ public class RespContent {
private String message;
@JsonProperty(value = "NumberTotalRows")
- private long numberTotalRows;
+ private Long numberTotalRows;
@JsonProperty(value = "NumberLoadedRows")
- private long numberLoadedRows;
+ private Long numberLoadedRows;
@JsonProperty(value = "NumberFilteredRows")
- private int numberFilteredRows;
+ private Integer numberFilteredRows;
@JsonProperty(value = "NumberUnselectedRows")
- private int numberUnselectedRows;
+ private Integer numberUnselectedRows;
@JsonProperty(value = "LoadBytes")
- private long loadBytes;
+ private Long loadBytes;
@JsonProperty(value = "LoadTimeMs")
- private int loadTimeMs;
+ private Integer loadTimeMs;
@JsonProperty(value = "BeginTxnTimeMs")
- private int beginTxnTimeMs;
+ private Integer beginTxnTimeMs;
@JsonProperty(value = "StreamLoadPutTimeMs")
- private int streamLoadPutTimeMs;
+ private Integer streamLoadPutTimeMs;
@JsonProperty(value = "ReadDataTimeMs")
- private int readDataTimeMs;
+ private Integer readDataTimeMs;
@JsonProperty(value = "WriteDataTimeMs")
- private int writeDataTimeMs;
+ private Integer writeDataTimeMs;
@JsonProperty(value = "CommitAndPublishTimeMs")
- private int commitAndPublishTimeMs;
+ private Integer commitAndPublishTimeMs;
@JsonProperty(value = "ErrorURL")
private String errorURL;
- public long getTxnId() {
+ public Long getTxnId() {
return txnId;
}
@@ -99,6 +99,50 @@ public class RespContent {
return existingJobStatus;
}
+ public Long getNumberTotalRows() {
+ return numberTotalRows;
+ }
+
+ public Long getNumberLoadedRows() {
+ return numberLoadedRows;
+ }
+
+ public Integer getNumberFilteredRows() {
+ return numberFilteredRows;
+ }
+
+ public Integer getNumberUnselectedRows() {
+ return numberUnselectedRows;
+ }
+
+ public Long getLoadBytes() {
+ return loadBytes;
+ }
+
+ public Integer getLoadTimeMs() {
+ return loadTimeMs;
+ }
+
+ public Integer getBeginTxnTimeMs() {
+ return beginTxnTimeMs;
+ }
+
+ public Integer getStreamLoadPutTimeMs() {
+ return streamLoadPutTimeMs;
+ }
+
+ public Integer getReadDataTimeMs() {
+ return readDataTimeMs;
+ }
+
+ public Integer getWriteDataTimeMs() {
+ return writeDataTimeMs;
+ }
+
+ public Integer getCommitAndPublishTimeMs() {
+ return commitAndPublishTimeMs;
+ }
+
@Override
public String toString() {
ObjectMapper mapper = new ObjectMapper();
diff --git a/flink-doris-connector/src/main/java/org/apache/doris/flink/sink/writer/DorisStreamLoad.java b/flink-doris-connector/src/main/java/org/apache/doris/flink/sink/writer/DorisStreamLoad.java
index b095eb9..604eb5c 100644
--- a/flink-doris-connector/src/main/java/org/apache/doris/flink/sink/writer/DorisStreamLoad.java
+++ b/flink-doris-connector/src/main/java/org/apache/doris/flink/sink/writer/DorisStreamLoad.java
@@ -155,8 +155,8 @@ public class DorisStreamLoad implements Serializable {
/**
* try to discard pending transactions with labels beginning with
labelSuffix.
*
- * @param labelSuffix
- * @param chkID
+ * @param labelSuffix the suffix of the stream load.
+ * @param chkID checkpoint id of task.
* @throws Exception
*/
public void abortPreCommit(String labelSuffix, long chkID) throws
Exception {
@@ -260,7 +260,7 @@ public class DorisStreamLoad implements Serializable {
/**
* start write data for new checkpoint.
*
- * @param label
+ * @param label the label of Stream Load.
* @throws IOException
*/
public void startLoad(String label, boolean isResume) throws IOException {
diff --git a/flink-doris-connector/src/main/java/org/apache/doris/flink/sink/writer/DorisWriteMetrics.java b/flink-doris-connector/src/main/java/org/apache/doris/flink/sink/writer/DorisWriteMetrics.java
new file mode 100644
index 0000000..d60d551
--- /dev/null
+++ b/flink-doris-connector/src/main/java/org/apache/doris/flink/sink/writer/DorisWriteMetrics.java
@@ -0,0 +1,348 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements. See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership. The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License. You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied. See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+package org.apache.doris.flink.sink.writer;
+
+import org.apache.flink.annotation.VisibleForTesting;
+import org.apache.flink.metrics.Counter;
+import org.apache.flink.metrics.Histogram;
+import org.apache.flink.metrics.groups.SinkWriterMetricGroup;
+import org.apache.flink.runtime.metrics.DescriptiveStatisticsHistogram;
+
+import org.apache.doris.flink.rest.models.RespContent;
+
+import java.io.Serializable;
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.List;
+import java.util.Optional;
+import java.util.concurrent.atomic.AtomicLong;
+
+import static org.apache.doris.flink.sink.LoadStatus.PUBLISH_TIMEOUT;
+import static org.apache.doris.flink.sink.LoadStatus.SUCCESS;
+
+/**
+ * the metrics for Doris Writer.
+ *
+ * @since 1.6
+ */
+public class DorisWriteMetrics implements Serializable {
+ private static final long serialVersionUID = 1L;
+ // Window size of histogram metrics.
+ private static final int HISTOGRAM_WINDOW_SIZE = 100;
+ private static final List<String> DORIS_SUCCESS_STATUS =
+ new ArrayList<>(Arrays.asList(SUCCESS, PUBLISH_TIMEOUT));
+ private final String tableIdentifier;
+ AtomicLong totalRows = new AtomicLong();
+ // Counter
+ private transient Counter totalFlushLoadBytes;
+ private transient Counter totalFlushNumberTotalRows;
+ private transient Counter totalFlushLoadedRows;
+ private transient Counter totalFlushTimeMs;
+ private transient Counter totalFlushSucceededTimes;
+ private transient Counter totalFlushFailedTimes;
+
+ private transient Counter totalFlushFilteredRows;
+ private transient Counter totalFlushUnselectedRows;
+ // Histogram
+ private transient Histogram beginTxnTimeHistogramMs;
+ private transient Histogram commitAndPublishTimeHistogramMs;
+ private transient Histogram streamLoadPutTimeHistogramMs;
+ private transient Histogram readDataTimeHistogramMs;
+ private transient Histogram writeDataTimeHistogramMs;
+ private transient Histogram loadTimeHistogramMs;
+
+ private static final String COUNTER_TOTAL_FLUSH_BYTES =
"totalFlushLoadBytes";
+ private static final String COUNTER_TOTAL_FLUSH_ROWS =
"flushTotalNumberRows";
+ private static final String COUNTER_TOTAL_FLUSH_LOADED_ROWS =
"totalFlushLoadedRows";
+ private static final String COUNTER_TOTAL_FLUSH_COST_TIME =
"totalFlushTimeMs";
+ private static final String COUNTER_TOTAL_FLUSH_SUCCEEDED_TIMES_COUNT =
+ "totalFlushSucceededNumber";
+ private static final String COUNTER_TOTAL_FLUSH_FAILED_TIMES_COUNT =
"totalFlushFailedNumber";
+
+ private static final String COUNTER_TOTAL_FILTERED_ROWS =
"totalFlushFilteredRows";
+ private static final String COUNTER_TOTAL_UNSELECTED_ROWS =
"totalFlushUnselectedRows";
+
+ private static final String HISTOGRAM_BEGIN_TXN_TIME_MS = "beginTxnTimeMs";
+ private static final String HISTOGRAM_STREAM_LOAD_PUT_DATA_TIME_MS =
"putDataTimeMs";
+ private static final String HISTOGRAM_READ_DATA_TIME_MS = "readDataTimeMs";
+ private static final String HISTOGRAM_WRITE_DATA_TIME_MS =
"writeDataTimeMs";
+ private static final String HISTOGRAM_COMMIT_AND_PUBLISH_TIME_MS =
"commitAndPublishTimeMs";
+ private static final String HISTOGRAM_LOAD_TIME_MS = "loadTimeMs";
+
+ private static final String METRIC_NAME_FORMAT = "%s_%s";
+
+ @VisibleForTesting
+ DorisWriteMetrics(SinkWriterMetricGroup sinkMetricGroup, String
tableIdentifier) {
+ this.tableIdentifier = tableIdentifier;
+ register(sinkMetricGroup);
+ }
+
+ public static DorisWriteMetrics of(
+ SinkWriterMetricGroup sinkWriterMetricGroup, String
tableIdentifier) {
+ return new DorisWriteMetrics(sinkWriterMetricGroup, tableIdentifier);
+ }
+
+ public void flush(RespContent respContent) {
+ if (respContent != null &&
DORIS_SUCCESS_STATUS.contains(respContent.getStatus())) {
+ flushSuccessLoad(respContent);
+ } else {
+ flushFailedRecord();
+ }
+ }
+
+ @VisibleForTesting
+ public void register(SinkWriterMetricGroup sinkMetricGroup) {
+ totalFlushNumberTotalRows =
+ sinkMetricGroup.counter(
+ String.format(
+ METRIC_NAME_FORMAT, tableIdentifier,
COUNTER_TOTAL_FLUSH_ROWS));
+ totalFlushLoadedRows =
+ sinkMetricGroup.counter(
+ String.format(
+ METRIC_NAME_FORMAT,
+ tableIdentifier,
+ COUNTER_TOTAL_FLUSH_LOADED_ROWS));
+ totalFlushLoadBytes =
+ sinkMetricGroup.counter(
+ String.format(
+ METRIC_NAME_FORMAT, tableIdentifier,
COUNTER_TOTAL_FLUSH_BYTES));
+ totalFlushFilteredRows =
+ sinkMetricGroup.counter(
+ String.format(
+ METRIC_NAME_FORMAT, tableIdentifier,
COUNTER_TOTAL_FILTERED_ROWS));
+ totalFlushUnselectedRows =
+ sinkMetricGroup.counter(
+ String.format(
+ METRIC_NAME_FORMAT,
+ tableIdentifier,
+ COUNTER_TOTAL_UNSELECTED_ROWS));
+ totalFlushSucceededTimes =
+ sinkMetricGroup.counter(
+ String.format(
+ METRIC_NAME_FORMAT,
+ tableIdentifier,
+ COUNTER_TOTAL_FLUSH_SUCCEEDED_TIMES_COUNT));
+ totalFlushFailedTimes =
+ sinkMetricGroup.counter(
+ String.format(
+ METRIC_NAME_FORMAT,
+ tableIdentifier,
+ COUNTER_TOTAL_FLUSH_FAILED_TIMES_COUNT));
+ totalFlushTimeMs =
+ sinkMetricGroup.counter(
+ String.format(
+ METRIC_NAME_FORMAT,
+ tableIdentifier,
+ COUNTER_TOTAL_FLUSH_COST_TIME));
+
+ loadTimeHistogramMs =
+ sinkMetricGroup.histogram(
+ String.format(METRIC_NAME_FORMAT, tableIdentifier,
HISTOGRAM_LOAD_TIME_MS),
+ new
DescriptiveStatisticsHistogram(HISTOGRAM_WINDOW_SIZE));
+ streamLoadPutTimeHistogramMs =
+ sinkMetricGroup.histogram(
+ String.format(
+ METRIC_NAME_FORMAT,
+ tableIdentifier,
+ HISTOGRAM_STREAM_LOAD_PUT_DATA_TIME_MS),
+ new
DescriptiveStatisticsHistogram(HISTOGRAM_WINDOW_SIZE));
+ commitAndPublishTimeHistogramMs =
+ sinkMetricGroup.histogram(
+ String.format(
+ METRIC_NAME_FORMAT,
+ tableIdentifier,
+ HISTOGRAM_COMMIT_AND_PUBLISH_TIME_MS),
+ new
DescriptiveStatisticsHistogram(HISTOGRAM_WINDOW_SIZE));
+ this.beginTxnTimeHistogramMs =
+ sinkMetricGroup.histogram(
+ String.format(
+ METRIC_NAME_FORMAT, tableIdentifier,
HISTOGRAM_BEGIN_TXN_TIME_MS),
+ new
DescriptiveStatisticsHistogram(HISTOGRAM_WINDOW_SIZE));
+ readDataTimeHistogramMs =
+ sinkMetricGroup.histogram(
+ String.format(
+ METRIC_NAME_FORMAT, tableIdentifier,
HISTOGRAM_READ_DATA_TIME_MS),
+ new
DescriptiveStatisticsHistogram(HISTOGRAM_WINDOW_SIZE));
+ writeDataTimeHistogramMs =
+ sinkMetricGroup.histogram(
+ String.format(
+ METRIC_NAME_FORMAT, tableIdentifier,
HISTOGRAM_WRITE_DATA_TIME_MS),
+ new
DescriptiveStatisticsHistogram(HISTOGRAM_WINDOW_SIZE));
+ }
+
+ private void flushSuccessLoad(RespContent responseContent) {
+
Optional.ofNullable(responseContent.getLoadBytes()).ifPresent(totalFlushLoadBytes::inc);
+ Optional.ofNullable(responseContent.getNumberLoadedRows())
+ .ifPresent(totalFlushLoadedRows::inc);
+ Optional.ofNullable(responseContent.getNumberTotalRows())
+ .ifPresent(totalFlushNumberTotalRows::inc);
+ Optional.ofNullable(responseContent.getNumberFilteredRows())
+ .ifPresent(totalFlushFilteredRows::inc);
+
Optional.ofNullable(responseContent.getLoadTimeMs()).ifPresent(totalFlushTimeMs::inc);
+ Optional.ofNullable(responseContent.getNumberUnselectedRows())
+ .ifPresent(totalFlushUnselectedRows::inc);
+
+ totalFlushSucceededTimes.inc();
+
+ Optional.ofNullable(responseContent.getCommitAndPublishTimeMs())
+ .ifPresent(commitAndPublishTimeHistogramMs::update);
+ Optional.ofNullable(responseContent.getWriteDataTimeMs())
+ .ifPresent(writeDataTimeHistogramMs::update);
+ Optional.ofNullable(responseContent.getBeginTxnTimeMs())
+ .ifPresent(beginTxnTimeHistogramMs::update);
+ Optional.ofNullable(responseContent.getReadDataTimeMs())
+ .ifPresent(readDataTimeHistogramMs::update);
+ Optional.ofNullable(responseContent.getStreamLoadPutTimeMs())
+ .ifPresent(streamLoadPutTimeHistogramMs::update);
+
Optional.ofNullable(responseContent.getLoadTimeMs()).ifPresent(loadTimeHistogramMs::update);
+ }
+
+ private void flushFailedRecord() {
+ totalFlushFailedTimes.inc();
+ }
+
+ public String getTableIdentifier() {
+ return tableIdentifier;
+ }
+
+ public Counter getTotalFlushLoadBytes() {
+ return totalFlushLoadBytes;
+ }
+
+ public Counter getTotalFlushNumberTotalRows() {
+ return totalFlushNumberTotalRows;
+ }
+
+ public Counter getTotalFlushLoadedRows() {
+ return totalFlushLoadedRows;
+ }
+
+ public Counter getTotalFlushTimeMs() {
+ return totalFlushTimeMs;
+ }
+
+ public Counter getTotalFlushSucceededTimes() {
+ return totalFlushSucceededTimes;
+ }
+
+ public Counter getTotalFlushFailedTimes() {
+ return totalFlushFailedTimes;
+ }
+
+ public Counter getTotalFlushFilteredRows() {
+ return totalFlushFilteredRows;
+ }
+
+ public Counter getTotalFlushUnselectedRows() {
+ return totalFlushUnselectedRows;
+ }
+
+ public Histogram getBeginTxnTimeHistogramMs() {
+ return beginTxnTimeHistogramMs;
+ }
+
+ public Histogram getCommitAndPublishTimeHistogramMs() {
+ return commitAndPublishTimeHistogramMs;
+ }
+
+ public Histogram getStreamLoadPutTimeHistogramMs() {
+ return streamLoadPutTimeHistogramMs;
+ }
+
+ public Histogram getReadDataTimeHistogramMs() {
+ return readDataTimeHistogramMs;
+ }
+
+ public Histogram getWriteDataTimeHistogramMs() {
+ return writeDataTimeHistogramMs;
+ }
+
+ public Histogram getLoadTimeHistogramMs() {
+ return loadTimeHistogramMs;
+ }
+
+ @VisibleForTesting
+ public void setTotalFlushLoadBytes(Counter totalFlushLoadBytes) {
+ this.totalFlushLoadBytes = totalFlushLoadBytes;
+ }
+
+ @VisibleForTesting
+ public void setTotalFlushNumberTotalRows(Counter
totalFlushNumberTotalRows) {
+ this.totalFlushNumberTotalRows = totalFlushNumberTotalRows;
+ }
+
+ @VisibleForTesting
+ public void setTotalFlushLoadedRows(Counter totalFlushLoadedRows) {
+ this.totalFlushLoadedRows = totalFlushLoadedRows;
+ }
+
+ @VisibleForTesting
+ public void setTotalFlushTimeMs(Counter totalFlushTimeMs) {
+ this.totalFlushTimeMs = totalFlushTimeMs;
+ }
+
+ @VisibleForTesting
+ public void setTotalFlushSucceededTimes(Counter totalFlushSucceededTimes) {
+ this.totalFlushSucceededTimes = totalFlushSucceededTimes;
+ }
+
+ @VisibleForTesting
+ public void setTotalFlushFailedTimes(Counter totalFlushFailedTimes) {
+ this.totalFlushFailedTimes = totalFlushFailedTimes;
+ }
+
+ @VisibleForTesting
+ public void setTotalFlushFilteredRows(Counter totalFlushFilteredRows) {
+ this.totalFlushFilteredRows = totalFlushFilteredRows;
+ }
+
+ @VisibleForTesting
+ public void setTotalFlushUnselectedRows(Counter totalFlushUnselectedRows) {
+ this.totalFlushUnselectedRows = totalFlushUnselectedRows;
+ }
+
+ @VisibleForTesting
+ public void setBeginTxnTimeHistogramMs(Histogram beginTxnTimeHistogramMs) {
+ this.beginTxnTimeHistogramMs = beginTxnTimeHistogramMs;
+ }
+
+ @VisibleForTesting
+ public void setCommitAndPublishTimeHistogramMs(Histogram
commitAndPublishTimeHistogramMs) {
+ this.commitAndPublishTimeHistogramMs = commitAndPublishTimeHistogramMs;
+ }
+
+ @VisibleForTesting
+ public void setStreamLoadPutTimeHistogramMs(Histogram
streamLoadPutTimeHistogramMs) {
+ this.streamLoadPutTimeHistogramMs = streamLoadPutTimeHistogramMs;
+ }
+
+ @VisibleForTesting
+ public void setReadDataTimeHistogramMs(Histogram readDataTimeHistogramMs) {
+ this.readDataTimeHistogramMs = readDataTimeHistogramMs;
+ }
+
+ @VisibleForTesting
+ public void setWriteDataTimeHistogramMs(Histogram
writeDataTimeHistogramMs) {
+ this.writeDataTimeHistogramMs = writeDataTimeHistogramMs;
+ }
+
+ @VisibleForTesting
+ public void setLoadTimeHistogramMs(Histogram loadTimeHistogramMs) {
+ this.loadTimeHistogramMs = loadTimeHistogramMs;
+ }
+}
diff --git a/flink-doris-connector/src/main/java/org/apache/doris/flink/sink/writer/DorisWriter.java b/flink-doris-connector/src/main/java/org/apache/doris/flink/sink/writer/DorisWriter.java
index e8cc1ff..5b8b4fc 100644
--- a/flink-doris-connector/src/main/java/org/apache/doris/flink/sink/writer/DorisWriter.java
+++ b/flink-doris-connector/src/main/java/org/apache/doris/flink/sink/writer/DorisWriter.java
@@ -21,6 +21,7 @@ import org.apache.flink.annotation.VisibleForTesting;
import org.apache.flink.api.connector.sink2.Sink;
import org.apache.flink.api.connector.sink2.StatefulSink;
import org.apache.flink.api.connector.sink2.TwoPhaseCommittingSink;
+import org.apache.flink.metrics.groups.SinkWriterMetricGroup;
import org.apache.flink.runtime.checkpoint.CheckpointIDCounter;
import org.apache.flink.util.concurrent.ExecutorThreadFactory;
@@ -82,6 +83,8 @@ public class DorisWriter<IN>
private transient Thread executorThread;
private transient volatile Exception loadException = null;
private BackendUtil backendUtil;
+ private SinkWriterMetricGroup sinkMetricGroup;
+ private Map<String, DorisWriteMetrics> sinkMetricsMap = new
ConcurrentHashMap<>();
public DorisWriter(
Sink.InitContext initContext,
@@ -96,7 +99,7 @@ public class DorisWriter<IN>
.orElse(CheckpointIDCounter.INITIAL_CHECKPOINT_ID - 1);
this.curCheckpointId = lastCheckpointId + 1;
LOG.info("restore checkpointId {}", lastCheckpointId);
- LOG.info("labelPrefix " + executionOptions.getLabelPrefix());
+ LOG.info("labelPrefix {}", executionOptions.getLabelPrefix());
this.labelPrefix = executionOptions.getLabelPrefix();
this.subtaskId = initContext.getSubtaskId();
this.scheduledExecutorService =
@@ -107,7 +110,7 @@ public class DorisWriter<IN>
this.executionOptions = executionOptions;
this.intervalTime = executionOptions.checkInterval();
this.globalLoading = false;
-
+ sinkMetricGroup = initContext.metricGroup();
initializeLoad(state);
serializer.initial();
}
@@ -197,10 +200,24 @@ public class DorisWriter<IN>
streamLoader.startLoad(currentLabel, false);
loadingMap.put(tableKey, true);
globalLoading = true;
+ registerMetrics(tableKey);
}
streamLoader.writeRecord(record.getRow());
}
+ @VisibleForTesting
+ public void setSinkMetricGroup(SinkWriterMetricGroup sinkMetricGroup) {
+ this.sinkMetricGroup = sinkMetricGroup;
+ }
+
+ public void registerMetrics(String tableKey) {
+ if (sinkMetricsMap.containsKey(tableKey)) {
+ return;
+ }
+ DorisWriteMetrics metrics = DorisWriteMetrics.of(sinkMetricGroup,
tableKey);
+ sinkMetricsMap.put(tableKey, metrics);
+ }
+
@Override
public Collection<DorisCommittable> prepareCommit() throws IOException,
InterruptedException {
// Verify whether data is written during a checkpoint
@@ -222,10 +239,15 @@ public class DorisWriter<IN>
LabelGenerator labelGenerator = getLabelGenerator(tableIdentifier);
String currentLabel =
labelGenerator.generateTableLabel(curCheckpointId);
RespContent respContent = dorisStreamLoad.stopLoad(currentLabel);
+ // refresh metrics
+ if (sinkMetricsMap.containsKey(tableIdentifier)) {
+ DorisWriteMetrics dorisWriteMetrics =
sinkMetricsMap.get(tableIdentifier);
+ dorisWriteMetrics.flush(respContent);
+ }
if (!DORIS_SUCCESS_STATUS.contains(respContent.getStatus())) {
String errMsg =
String.format(
- "tabel {} stream load error: %s, see more in
%s",
+ "table %s stream load error: %s, see more in
%s",
tableIdentifier,
respContent.getMessage(),
respContent.getErrorURL());
@@ -363,6 +385,11 @@ public class DorisWriter<IN>
this.dorisStreamLoadMap = streamLoadMap;
}
+ @VisibleForTesting
+ public void setDorisMetricsMap(Map<String, DorisWriteMetrics> metricsMap) {
+ this.sinkMetricsMap = metricsMap;
+ }
+
@Override
public void close() throws Exception {
LOG.info("Close DorisWriter.");
diff --git a/flink-doris-connector/src/test/java/org/apache/doris/flink/sink/writer/TestDorisWriter.java b/flink-doris-connector/src/test/java/org/apache/doris/flink/sink/writer/TestDorisWriter.java
index 667a3f1..e25bef6 100644
--- a/flink-doris-connector/src/test/java/org/apache/doris/flink/sink/writer/TestDorisWriter.java
+++ b/flink-doris-connector/src/test/java/org/apache/doris/flink/sink/writer/TestDorisWriter.java
@@ -18,6 +18,9 @@
package org.apache.doris.flink.sink.writer;
import org.apache.flink.api.connector.sink2.Sink;
+import org.apache.flink.metrics.Counter;
+import org.apache.flink.metrics.Histogram;
+import org.apache.flink.metrics.groups.SinkWriterMetricGroup;
import org.apache.doris.flink.cfg.DorisExecutionOptions;
import org.apache.doris.flink.cfg.DorisOptions;
@@ -65,6 +68,7 @@ public class TestDorisWriter {
HttpTestUtil.getResponse(HttpTestUtil.PRE_COMMIT_RESPONSE,
true);
when(httpClient.execute(any())).thenReturn(preCommitResponse);
Map<String, DorisStreamLoad> dorisStreamLoadMap = new
ConcurrentHashMap<>();
+ Map<String, DorisWriteMetrics> dorisWriteMetricsMap = new
ConcurrentHashMap<>();
DorisStreamLoad dorisStreamLoad =
new DorisStreamLoad(
"local:8040",
@@ -75,7 +79,11 @@ public class TestDorisWriter {
dorisStreamLoadMap.put(dorisOptions.getTableIdentifier(),
dorisStreamLoad);
dorisStreamLoad.startLoad("", false);
Sink.InitContext initContext = mock(Sink.InitContext.class);
+ SinkWriterMetricGroup sinkWriterMetricGroup =
mock(SinkWriterMetricGroup.class);
when(initContext.getRestoredCheckpointId()).thenReturn(OptionalLong.of(1));
+ when(initContext.getSubtaskId()).thenReturn(1);
+ DorisWriteMetrics mockWriteMetrics =
getMockWriteMetrics(sinkWriterMetricGroup);
+ dorisWriteMetricsMap.put(dorisOptions.getTableIdentifier(),
mockWriteMetrics);
DorisWriter<String> dorisWriter =
new DorisWriter<String>(
initContext,
@@ -84,7 +92,9 @@ public class TestDorisWriter {
dorisOptions,
readOptions,
executionOptions);
+ dorisWriter.setSinkMetricGroup(sinkWriterMetricGroup);
dorisWriter.setDorisStreamLoadMap(dorisStreamLoadMap);
+ dorisWriter.setDorisMetricsMap(dorisWriteMetricsMap);
dorisWriter.write("doris,1", null);
Collection<DorisCommittable> committableList =
dorisWriter.prepareCommit();
Assert.assertEquals(1, committableList.size());
@@ -129,4 +139,31 @@ public class TestDorisWriter {
Assert.assertEquals("doris", writerStates.get(0).getLabelPrefix());
Assert.assertTrue(!dorisWriter.isLoading());
}
+
+ public DorisWriteMetrics getMockWriteMetrics(SinkWriterMetricGroup
sinkWriterMetricGroup) {
+ DorisWriteMetrics dorisWriteMetrics =
+ new DorisWriteMetrics(sinkWriterMetricGroup,
dorisOptions.getTableIdentifier());
+ Counter mockCounter = mock(Counter.class);
+ Histogram mockHistogram = mock(Histogram.class);
+ when(mockCounter.getCount()).thenReturn(0L);
+ when(mockHistogram.getCount()).thenReturn(0L);
+ dorisWriteMetrics.setTotalFlushLoadBytes(mockCounter);
+ dorisWriteMetrics.setTotalFlushNumberTotalRows(mockCounter);
+ dorisWriteMetrics.setTotalFlushFilteredRows(mockCounter);
+ dorisWriteMetrics.setTotalFlushLoadedRows(mockCounter);
+ dorisWriteMetrics.setTotalFlushFailedTimes(mockCounter);
+ dorisWriteMetrics.setTotalFlushNumberTotalRows(mockCounter);
+ dorisWriteMetrics.setTotalFlushUnselectedRows(mockCounter);
+ dorisWriteMetrics.setTotalFlushSucceededTimes(mockCounter);
+ dorisWriteMetrics.setTotalFlushTimeMs(mockCounter);
+ dorisWriteMetrics.setBeginTxnTimeHistogramMs(mockHistogram);
+ dorisWriteMetrics.setCommitAndPublishTimeHistogramMs(mockHistogram);
+ dorisWriteMetrics.setWriteDataTimeHistogramMs(mockHistogram);
+ dorisWriteMetrics.setStreamLoadPutTimeHistogramMs(mockHistogram);
+ dorisWriteMetrics.setWriteDataTimeHistogramMs(mockHistogram);
+ dorisWriteMetrics.setCommitAndPublishTimeHistogramMs(mockHistogram);
+ dorisWriteMetrics.setReadDataTimeHistogramMs(mockHistogram);
+ dorisWriteMetrics.setLoadTimeHistogramMs(mockHistogram);
+ return dorisWriteMetrics;
+ }
}
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]