JNSimba commented on code in PR #287: URL: https://github.com/apache/doris-flink-connector/pull/287#discussion_r1442530073
########## flink-doris-connector/src/main/java/org/apache/doris/flink/sink/writer/DorisWriteMetrics.java: ########## @@ -0,0 +1,383 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. + +package org.apache.doris.flink.sink.writer; + +import org.apache.flink.annotation.VisibleForTesting; +import org.apache.flink.metrics.Counter; +import org.apache.flink.metrics.Histogram; +import org.apache.flink.metrics.groups.SinkWriterMetricGroup; +import org.apache.flink.runtime.metrics.DescriptiveStatisticsHistogram; + +import org.apache.doris.flink.rest.models.RespContent; + +import java.io.Serializable; +import java.util.ArrayList; +import java.util.Arrays; +import java.util.List; +import java.util.Optional; +import java.util.concurrent.atomic.AtomicLong; + +import static org.apache.doris.flink.sink.LoadStatus.PUBLISH_TIMEOUT; +import static org.apache.doris.flink.sink.LoadStatus.SUCCESS; + +/** + * the metrics for Doris Writer. + * + * @since 1.6 + */ +public class DorisWriteMetrics implements Serializable { + private static final long serialVersionUID = 1L; + // Window size of histogram metrics. 
+ private static final int HISTOGRAM_WINDOW_SIZE = 100; + private static final List<String> DORIS_SUCCESS_STATUS = + new ArrayList<>(Arrays.asList(SUCCESS, PUBLISH_TIMEOUT)); + private final String tableIdentifier; + AtomicLong totalRows = new AtomicLong(); + // Counter + private transient Counter totalFlushLoadBytes; + private transient Counter totalFlushNumberTotalRows; + private transient Counter totalFlushLoadedRows; + private transient Counter totalFlushTimeMs; + private transient Counter totalFlushSucceededTimes; + private transient Counter totalFlushFailedTimes; + + private transient Counter totalFlushFilteredRows; + private transient Counter totalFlushUnselectedRows; + // Histogram + private transient Histogram beginTxnTimeHistogramMs; + private transient Histogram commitAndPublishTimeHistogramMs; + private transient Histogram streamLoadPutTimeHistogramMs; + private transient Histogram readDataTimeHistogramMs; + private transient Histogram writeDataTimeHistogramMs; + private transient Histogram loadTimeHistogramMs; + + private static final String COUNTER_TOTAL_FLUSH_BYTES = "totalFlushLoadBytes"; + private static final String COUNTER_TOTAL_FLUSH_ROWS = "flushTotalNumberRows"; + private static final String COUNTER_TOTAL_FLUSH_LOADED_ROWS = "totalFlushLoadedRows"; + private static final String COUNTER_TOTAL_FLUSH_COST_TIME = "totalFlushTimeMs"; + private static final String COUNTER_TOTAL_FLUSH_SUCCEEDED_TIMES_COUNT = + "totalFlushSucceededNumber"; + private static final String COUNTER_TOTAL_FLUSH_FAILED_TIMES_COUNT = "totalFlushFailedNumber"; + + private static final String COUNTER_TOTAL_FILTERED_ROWS = "totalFlushFilteredRows"; + private static final String COUNTER_TOTAL_UNSELECTED_ROWS = "totalFlushUnselectedRows"; + + private static final String HISTOGRAM_BEGIN_TXN_TIME_MS = "beginTxnTimeMs"; + private static final String HISTOGRAM_STREAM_LOAD_PUT_DATA_TIME_MS = "putDataTimeMs"; + private static final String HISTOGRAM_READ_DATA_TIME_MS = "readDataTimeMs"; 
+ private static final String HISTOGRAM_WRITE_DATA_TIME_MS = "writeDataTimeMs"; + private static final String HISTOGRAM_COMMIT_AND_PUBLISH_TIME_MS = "commitAndPublishTimeMs"; + private static final String HISTOGRAM_LOAD_TIME_MS = "loadTimeMs"; + + private static final String METRIC_NAME_FORMAT = "%s_%s_%s"; + + @VisibleForTesting + DorisWriteMetrics( + SinkWriterMetricGroup sinkMetricGroup, String tableIdentifier, int subTaskId) { + this.tableIdentifier = tableIdentifier; + register(sinkMetricGroup, subTaskId); + } + + public static DorisWriteMetrics of( + SinkWriterMetricGroup sinkWriterMetricGroup, String tableIdentifier, int subTaskId) { + return new DorisWriteMetrics(sinkWriterMetricGroup, tableIdentifier, subTaskId); + } + + public void flush(RespContent respContent) { + if (respContent != null && DORIS_SUCCESS_STATUS.contains(respContent.getStatus())) { + flushSuccessLoad(respContent); + } else { + flushFailedRecord(); + } + } + + @VisibleForTesting + public void register( + SinkWriterMetricGroup sinkMetricGroup, int subTaskId, int histogramWindowSize) { + totalFlushNumberTotalRows = + sinkMetricGroup.counter( + String.format( + METRIC_NAME_FORMAT, + tableIdentifier, + subTaskId, + COUNTER_TOTAL_FLUSH_ROWS)); + totalFlushLoadedRows = + sinkMetricGroup.counter( + String.format( + METRIC_NAME_FORMAT, + tableIdentifier, + subTaskId, + COUNTER_TOTAL_FLUSH_LOADED_ROWS)); + totalFlushLoadBytes = + sinkMetricGroup.counter( + String.format( + METRIC_NAME_FORMAT, + tableIdentifier, + subTaskId, + COUNTER_TOTAL_FLUSH_BYTES)); + totalFlushFilteredRows = + sinkMetricGroup.counter( + String.format( + METRIC_NAME_FORMAT, + tableIdentifier, + subTaskId, + COUNTER_TOTAL_FILTERED_ROWS)); + totalFlushUnselectedRows = + sinkMetricGroup.counter( + String.format( + METRIC_NAME_FORMAT, + tableIdentifier, + subTaskId, + COUNTER_TOTAL_UNSELECTED_ROWS)); + totalFlushSucceededTimes = + sinkMetricGroup.counter( + String.format( + METRIC_NAME_FORMAT, + tableIdentifier, + subTaskId, + 
COUNTER_TOTAL_FLUSH_SUCCEEDED_TIMES_COUNT)); + totalFlushFailedTimes = + sinkMetricGroup.counter( + String.format( + METRIC_NAME_FORMAT, + tableIdentifier, + subTaskId, + COUNTER_TOTAL_FLUSH_FAILED_TIMES_COUNT)); + totalFlushTimeMs = + sinkMetricGroup.counter( + String.format( + METRIC_NAME_FORMAT, + tableIdentifier, + subTaskId, + COUNTER_TOTAL_FLUSH_COST_TIME)); + + loadTimeHistogramMs = + sinkMetricGroup.histogram( + String.format( + METRIC_NAME_FORMAT, + tableIdentifier, + subTaskId, + HISTOGRAM_LOAD_TIME_MS), + new DescriptiveStatisticsHistogram(HISTOGRAM_WINDOW_SIZE)); + streamLoadPutTimeHistogramMs = + sinkMetricGroup.histogram( + String.format( + METRIC_NAME_FORMAT, + tableIdentifier, + subTaskId, + HISTOGRAM_STREAM_LOAD_PUT_DATA_TIME_MS), + new DescriptiveStatisticsHistogram(HISTOGRAM_WINDOW_SIZE)); + commitAndPublishTimeHistogramMs = + sinkMetricGroup.histogram( + String.format( + METRIC_NAME_FORMAT, + tableIdentifier, + subTaskId, + HISTOGRAM_COMMIT_AND_PUBLISH_TIME_MS), + new DescriptiveStatisticsHistogram(HISTOGRAM_WINDOW_SIZE)); + this.beginTxnTimeHistogramMs = + sinkMetricGroup.histogram( + String.format( + METRIC_NAME_FORMAT, + tableIdentifier, + subTaskId, + HISTOGRAM_BEGIN_TXN_TIME_MS), + new DescriptiveStatisticsHistogram(HISTOGRAM_WINDOW_SIZE)); + readDataTimeHistogramMs = + sinkMetricGroup.histogram( + String.format( + METRIC_NAME_FORMAT, + tableIdentifier, + subTaskId, + HISTOGRAM_READ_DATA_TIME_MS), + new DescriptiveStatisticsHistogram(HISTOGRAM_WINDOW_SIZE)); + writeDataTimeHistogramMs = + sinkMetricGroup.histogram( + String.format( + METRIC_NAME_FORMAT, + tableIdentifier, + subTaskId, + HISTOGRAM_WRITE_DATA_TIME_MS), + new DescriptiveStatisticsHistogram(HISTOGRAM_WINDOW_SIZE)); + } + + private void register(SinkWriterMetricGroup sinkMetricGroup, int subTaskId) { + register(sinkMetricGroup, subTaskId, HISTOGRAM_WINDOW_SIZE); + } + + private void flushSuccessLoad(RespContent responseContent) { + 
Optional.ofNullable(responseContent.getLoadBytes()).ifPresent(totalFlushLoadBytes::inc); + Optional.ofNullable(responseContent.getNumberLoadedRows()) + .ifPresent(totalFlushLoadedRows::inc); + Optional.ofNullable(responseContent.getNumberTotalRows()) + .ifPresent(totalFlushNumberTotalRows::inc); + Optional.ofNullable(responseContent.getNumberFilteredRows()) + .ifPresent(totalFlushFilteredRows::inc); + Optional.ofNullable(responseContent.getLoadTimeMs()).ifPresent(totalFlushTimeMs::inc); Review Comment: Is the load time returned by `getLoadTimeMs()` also meant to be a counter-type metric? -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: [email protected] For queries about this service, please contact Infrastructure at: [email protected] --------------------------------------------------------------------- To unsubscribe, e-mail: [email protected] For additional commands, e-mail: [email protected]
