This is an automated email from the ASF dual-hosted git repository. dockerzhang pushed a commit to branch release-1.3.0 in repository https://gitbox.apache.org/repos/asf/inlong.git
commit eb00535bf0ac9fff42c58aebeb22b05b9b899829 Author: thesumery <[email protected]> AuthorDate: Mon Sep 19 19:25:03 2022 +0800 [INLONG-5903][Sort] Make InLong metric constructs factory more cohesive (#5906) Co-authored-by: thesumery <[email protected]> --- .../inlong/sort/configuration/Constants.java | 22 +++- .../apache/inlong/sort/protocol/InlongMetric.java | 22 +--- .../org/apache/inlong/sort/base/Constants.java | 11 +- .../apache/inlong/sort/base/metric/MetricData.java | 35 ++++- .../inlong/sort/base/metric/MetricOption.java | 141 ++++++++++++++++----- .../inlong/sort/base/metric/SinkMetricData.java | 128 +++++++++++-------- .../inlong/sort/base/metric/SourceMetricData.java | 105 +++++++-------- .../sort/base/util/ValidateMetricOptionUtils.java | 39 ------ .../sort/elasticsearch/ElasticsearchSinkBase.java | 26 ++-- .../table/RowElasticsearchSinkFunction.java | 55 ++------ .../sort/filesystem/FileSystemTableSink.java | 2 - .../filesystem/stream/AbstractStreamingWriter.java | 25 ++-- .../sort/hbase/HBase2DynamicTableFactory.java | 2 - .../inlong/sort/hbase/sink/HBaseSinkFunction.java | 46 +++---- .../hive/filesystem/AbstractStreamingWriter.java | 25 ++-- .../iceberg/flink/FlinkDynamicTableFactory.java | 11 +- .../sort/iceberg/flink/IcebergTableSink.java | 2 +- .../inlong/sort/iceberg/flink/sink/FlinkSink.java | 2 +- .../iceberg/flink/sink/IcebergStreamWriter.java | 7 - .../inlong/sort/iceberg/IcebergTableSink.java | 2 +- .../apache/inlong/sort/iceberg/sink/FlinkSink.java | 2 +- .../sort/iceberg/sink/IcebergStreamWriter.java | 25 ++-- .../jdbc/internal/JdbcBatchingOutputFormat.java | 76 +++-------- .../inlong/sort/kafka/FlinkKafkaConsumerBase.java | 52 +++----- .../inlong/sort/kafka/FlinkKafkaProducer.java | 48 ++----- .../table/DynamicKafkaDeserializationSchema.java | 8 +- .../sort/cdc/mongodb/DebeziumSourceFunction.java | 32 ++--- .../mongodb/table/MongoDBTableSourceFactory.java | 2 - .../sort/cdc/debezium/DebeziumSourceFunction.java | 34 ++--- .../inlong/sort/cdc/mysql/source/MySqlSource.java | 36 ++---- .../source/metrics/MySqlSourceReaderMetrics.java | 124 ++---------------- .../mysql/table/MySqlTableInlongSourceFactory.java | 2 - .../sort/cdc/oracle/DebeziumSourceFunction.java | 32 ++--- .../cdc/oracle/table/OracleTableSourceFactory.java | 2 - .../DebeziumSourceFunction.java | 43 ++----- .../cdc/postgres/table/PostgreSQLTableFactory.java | 2 - .../apache/inlong/sort/pulsar/table/Constants.java | 32 ----- .../table/DynamicPulsarDeserializationSchema.java | 41 ++---- .../pulsar/table/PulsarDynamicTableFactory.java | 2 +- .../table/UpsertPulsarDynamicTableFactory.java | 2 +- .../sqlserver/table/DebeziumSourceFunction.java | 56 ++------ .../inlong/sort/parser/impl/FlinkSqlParser.java | 25 ++-- 42 files changed, 515 insertions(+), 871 deletions(-) diff --git a/inlong-sort/sort-common/src/main/java/org/apache/inlong/sort/configuration/Constants.java b/inlong-sort/sort-common/src/main/java/org/apache/inlong/sort/configuration/Constants.java index b0979c62f..e4d6a3906 100644 --- a/inlong-sort/sort-common/src/main/java/org/apache/inlong/sort/configuration/Constants.java +++ b/inlong-sort/sort-common/src/main/java/org/apache/inlong/sort/configuration/Constants.java @@ -60,6 +60,12 @@ public class Constants { public static final String HIVE_SINK_ORC_PREFIX = HIVE_SINK_PREFIX + "orc."; + public static final String GROUP_ID = "groupId"; + + public static final String STREAM_ID = "streamId"; + + public static final String NODE_ID = "nodeId"; + // ------------------------------------------------------------------------ // Operator uid // ------------------------------------------------------------------------ @@ -275,10 +281,18 @@ public class Constants { .defaultValue(5) .withDescription("minutes"); - public static final ConfigOption<String> METRICS_AUDIT_PROXY_HOSTS = key("metrics.audit.proxy.hosts") - .noDefaultValue() - .withDescription("Audit proxy host address for reporting audit metrics. " - + "e.g. 127.0.0.1:10081,0.0.0.1:10081"); + public static final ConfigOption<String> METRICS_LABELS = + ConfigOptions.key("inlong.metric.labels") + .noDefaultValue() + .withDescription("INLONG metric labels, format is 'key1=value1&key2=value2'," + + "default is 'groupId=xxx&streamId=xxx&nodeId=xxx'"); + + + public static final ConfigOption<String> METRICS_AUDIT_PROXY_HOSTS = + ConfigOptions.key("metrics.audit.proxy.hosts") + .noDefaultValue() + .withDescription("Audit proxy host address for reporting audit metrics. \n" + + "e.g. 127.0.0.1:10081,0.0.0.1:10081"); // ------------------------------------------------------------------------ // Single tenant related diff --git a/inlong-sort/sort-common/src/main/java/org/apache/inlong/sort/protocol/InlongMetric.java b/inlong-sort/sort-common/src/main/java/org/apache/inlong/sort/protocol/InlongMetric.java index 4afb46a89..b90a38ccb 100644 --- a/inlong-sort/sort-common/src/main/java/org/apache/inlong/sort/protocol/InlongMetric.java +++ b/inlong-sort/sort-common/src/main/java/org/apache/inlong/sort/protocol/InlongMetric.java @@ -19,29 +19,11 @@ package org.apache.inlong.sort.protocol; /** * The class is the abstract of Inlong Metric. - * We agree that the key of the inlong metric report is `inlong.metric`, + * We agree that the key of the inlong metric report is + * {@link org.apache.inlong.sort.configuration.Constants#METRICS_GROUP_STREAM_NODE}, * and its value is format by `groupId&streamId&nodeId`. * If node implements this interface, we will inject the key and value into the corresponding Sort-Connectors * during flink sql parser */ public interface InlongMetric { - - /** - * The key of metric, it must be `inlong.metric` here. - */ - String METRIC_KEY = "inlong.metric"; - - /** - * The value format, it must be `groupId&streamId&nodeId` here. - * The groupId is the id of Inlong Group - * The streamId is the id of Inlong Stream - * The nodeId is the id of Inlong Source or Sink - */ - String METRIC_VALUE_FORMAT = "%s&%s&%s"; - - /** - * The key of InLong audit, the value should be ip:port&ip:port - */ - String AUDIT_KEY = "inlong.audit"; - } diff --git a/inlong-sort/sort-connectors/base/src/main/java/org/apache/inlong/sort/base/Constants.java b/inlong-sort/sort-connectors/base/src/main/java/org/apache/inlong/sort/base/Constants.java index 18ff408f2..679bcc6c3 100644 --- a/inlong-sort/sort-connectors/base/src/main/java/org/apache/inlong/sort/base/Constants.java +++ b/inlong-sort/sort-connectors/base/src/main/java/org/apache/inlong/sort/base/Constants.java @@ -86,17 +86,18 @@ public final class Constants { public static final String INLONG_METRIC_STATE_NAME = "inlong-metric-states"; public static final ConfigOption<String> INLONG_METRIC = - ConfigOptions.key("inlong.metric") + ConfigOptions.key("inlong.metric.labels") .stringType() .noDefaultValue() - .withDescription("INLONG GROUP ID + '&' + STREAM ID + '&' + NODE ID"); - + .withDescription("INLONG metric labels, format is 'key1=value1&key2=value2'," + + "default is 'groupId=xxx&streamId=xxx&nodeId=xxx'"); public static final ConfigOption<String> INLONG_AUDIT = - ConfigOptions.key("inlong.audit") + ConfigOptions.key("metrics.audit.proxy.hosts") .stringType() .noDefaultValue() - .withDescription("INLONG AUDIT HOST + '&' + PORT"); + .withDescription("Audit proxy host address for reporting audit metrics. \n" + + "e.g. 127.0.0.1:10081,0.0.0.1:10081"); public static final ConfigOption<Boolean> IGNORE_ALL_CHANGELOG = ConfigOptions.key("sink.ignore.changelog") diff --git a/inlong-sort/sort-connectors/base/src/main/java/org/apache/inlong/sort/base/metric/MetricData.java b/inlong-sort/sort-connectors/base/src/main/java/org/apache/inlong/sort/base/metric/MetricData.java index 681318ff7..381fa5ac4 100644 --- a/inlong-sort/sort-connectors/base/src/main/java/org/apache/inlong/sort/base/metric/MetricData.java +++ b/inlong-sort/sort-connectors/base/src/main/java/org/apache/inlong/sort/base/metric/MetricData.java @@ -23,6 +23,8 @@ import org.apache.flink.metrics.MeterView; import org.apache.flink.metrics.MetricGroup; import org.apache.flink.metrics.SimpleCounter; +import java.util.Map; + import static org.apache.inlong.sort.base.Constants.GROUP_ID; import static org.apache.inlong.sort.base.Constants.NODE_ID; import static org.apache.inlong.sort.base.Constants.STREAM_ID; @@ -40,26 +42,39 @@ public interface MetricData { */ MetricGroup getMetricGroup(); + /** + * Get labels + * + * @return The labels defined in inlong + */ + Map<String, String> getLabels(); + /** * Get group id * * @return The group id defined in inlong */ - String getGroupId(); + default String getGroupId() { + return getLabels().get(GROUP_ID); + } /** * Get stream id * * @return The stream id defined in inlong */ - String getStreamId(); + default String getStreamId() { + return getLabels().get(STREAM_ID); + } /** * Get node id * * @return The node id defined in inlong */ - String getNodeId(); + default String getNodeId() { + return getLabels().get(NODE_ID); + } /** * Register a counter metric @@ -69,8 +84,11 @@ public interface MetricData { * @return Counter of registered */ default Counter registerCounter(String metricName, Counter counter) { - return getMetricGroup().addGroup(GROUP_ID, getGroupId()).addGroup(STREAM_ID, getStreamId()) - .addGroup(NODE_ID, getNodeId()).counter(metricName, counter); + MetricGroup inlongMetricGroup = getMetricGroup(); + for (Map.Entry<String, String> label : getLabels().entrySet()) { + inlongMetricGroup = inlongMetricGroup.addGroup(label.getKey(), label.getValue()); + } + return inlongMetricGroup.counter(metricName, counter); } /** @@ -90,8 +108,11 @@ public interface MetricData { * @return Meter of registered */ default Meter registerMeter(String metricName, Counter counter) { - return getMetricGroup().addGroup(GROUP_ID, getGroupId()).addGroup(STREAM_ID, getStreamId()) - .addGroup(NODE_ID, getNodeId()).meter(metricName, new MeterView(counter, TIME_SPAN_IN_SECONDS)); + MetricGroup inlongMetricGroup = getMetricGroup(); + for (Map.Entry<String, String> label : getLabels().entrySet()) { + inlongMetricGroup = inlongMetricGroup.addGroup(label.getKey(), label.getValue()); + } + return inlongMetricGroup.meter(metricName, new MeterView(counter, TIME_SPAN_IN_SECONDS)); } } diff --git a/inlong-sort/sort-connectors/base/src/main/java/org/apache/inlong/sort/base/metric/MetricOption.java b/inlong-sort/sort-connectors/base/src/main/java/org/apache/inlong/sort/base/metric/MetricOption.java index d2179ae54..f4c679f9c 100644 --- a/inlong-sort/sort-connectors/base/src/main/java/org/apache/inlong/sort/base/metric/MetricOption.java +++ b/inlong-sort/sort-connectors/base/src/main/java/org/apache/inlong/sort/base/metric/MetricOption.java @@ -19,13 +19,19 @@ package org.apache.inlong.sort.base.metric; import org.apache.flink.util.Preconditions; -import org.apache.inlong.sort.base.util.ValidateMetricOptionUtils; +import org.apache.flink.util.StringUtils; import javax.annotation.Nullable; import java.util.HashSet; +import java.util.LinkedHashMap; +import java.util.Map; +import java.util.Optional; import java.util.regex.Pattern; +import java.util.stream.Stream; import static org.apache.inlong.sort.base.Constants.DELIMITER; +import static org.apache.inlong.sort.base.Constants.GROUP_ID; +import static org.apache.inlong.sort.base.Constants.STREAM_ID; public class MetricOption { private static final String IP_OR_HOST_PORT = "^(.*):([0-9]|[1-9]\\d|[1-9]\\d{" @@ -35,55 +41,126 @@ public class MetricOption { + "3}|65[0-4]\\d{" + "2}|655[0-2]\\d|6553[0-5])$"; - private final String groupId; - private final String streamId; - private final String nodeId; + private Map<String, String> labels; private final HashSet<String> ipPortList; - private String ipPorts; + private Optional<String> ipPorts; + private RegisteredMetric registeredMetric; + private long initRecords; + private long initBytes; - public MetricOption(String inLongMetric) { - this(inLongMetric, null); - } + private MetricOption( + String inlongLabels, + @Nullable String inlongAudit, + RegisteredMetric registeredMetric, + long initRecords, + long initBytes) { + Preconditions.checkArgument(!StringUtils.isNullOrWhitespaceOnly(inlongLabels), + "Inlong labels must be set for register metric."); - public MetricOption(String inLongMetric, @Nullable String inLongAudit) { - ValidateMetricOptionUtils.validateInlongMetricIfSetInlongAudit(inLongMetric, inLongAudit); - String[] inLongMetricArray = inLongMetric.split(DELIMITER); - Preconditions.checkArgument(inLongMetricArray.length == 3, - "Error inLong metric format: " + inLongMetric); - this.groupId = inLongMetricArray[0]; - this.streamId = inLongMetricArray[1]; - this.nodeId = inLongMetricArray[2]; - this.ipPortList = new HashSet<>(); - this.ipPorts = null; + this.initRecords = initRecords; + this.initBytes = initBytes; + this.labels = new LinkedHashMap<>(); + String[] inLongLabelArray = inlongLabels.split(DELIMITER); + Preconditions.checkArgument(Stream.of(inLongLabelArray).allMatch(label -> label.contains("=")), + "InLong metric label format must be xxx=xxx"); + Stream.of(inLongLabelArray).forEach(label -> { + String key = label.substring(0, label.indexOf('=')); + String value = label.substring(label.indexOf('=') + 1); + labels.put(key, value); + }); - if (inLongAudit != null) { - String[] ipPortStrs = inLongAudit.split(DELIMITER); - this.ipPorts = inLongAudit; + this.ipPortList = new HashSet<>(); + this.ipPorts = Optional.ofNullable(inlongAudit); + if (ipPorts.isPresent()) { + Preconditions.checkArgument(labels.containsKey(GROUP_ID) && labels.containsKey(STREAM_ID), + "groupId and streamId must be set when enable inlong audit collect."); + String[] ipPortStrs = inlongAudit.split(DELIMITER); for (String ipPort : ipPortStrs) { Preconditions.checkArgument(Pattern.matches(IP_OR_HOST_PORT, ipPort), - "Error inLong audit format: " + inLongAudit); + "Error inLong audit format: " + inlongAudit); this.ipPortList.add(ipPort); } } - } - public String getGroupId() { - return groupId; - } - - public String getStreamId() { - return streamId; + if (registeredMetric != null) { + this.registeredMetric = registeredMetric; + } } - public String getNodeId() { - return nodeId; + public Map<String, String> getLabels() { + return labels; } public HashSet<String> getIpPortList() { return ipPortList; } - public String getIpPorts() { + public Optional<String> getIpPorts() { return ipPorts; } + + public RegisteredMetric getRegisteredMetric() { + return registeredMetric; + } + + public long getInitRecords() { + return initRecords; + } + + public long getInitBytes() { + return initBytes; + } + + public static Builder builder() { + return new Builder(); + } + + public enum RegisteredMetric { + ALL, + NORMAL, + DIRTY + } + + public static class Builder { + private String inlongLabels; + private String inlongAudit; + private RegisteredMetric registeredMetric = RegisteredMetric.ALL; + private long initRecords = 0L; + private long initBytes = 0L; + + private Builder() { + } + + public MetricOption.Builder withInlongLabels(String inlongLabels) { + this.inlongLabels = inlongLabels; + return this; + } + + public MetricOption.Builder withInlongAudit(String inlongAudit) { + this.inlongAudit = inlongAudit; + return this; + } + + public MetricOption.Builder withRegisterMetric(RegisteredMetric registeredMetric) { + this.registeredMetric = registeredMetric; + return this; + } + + public MetricOption.Builder withInitRecords(long initRecords) { + this.initRecords = initRecords; + return this; + } + + public MetricOption.Builder withInitBytes(long initBytes) { + this.initBytes = initBytes; + return this; + } + + public MetricOption build() { + if (inlongLabels == null && inlongAudit == null) { + return null; + } + return new MetricOption(inlongLabels, inlongAudit, registeredMetric, initRecords, initBytes); + } + } } diff --git a/inlong-sort/sort-connectors/base/src/main/java/org/apache/inlong/sort/base/metric/SinkMetricData.java b/inlong-sort/sort-connectors/base/src/main/java/org/apache/inlong/sort/base/metric/SinkMetricData.java index 4073ddd44..d065496e4 100644 --- a/inlong-sort/sort-connectors/base/src/main/java/org/apache/inlong/sort/base/metric/SinkMetricData.java +++ b/inlong-sort/sort-connectors/base/src/main/java/org/apache/inlong/sort/base/metric/SinkMetricData.java @@ -25,12 +25,9 @@ import org.apache.flink.metrics.SimpleCounter; import org.apache.inlong.audit.AuditImp; import org.apache.inlong.sort.base.Constants; -import javax.annotation.Nullable; import java.nio.charset.StandardCharsets; -import java.util.Arrays; -import java.util.HashSet; +import java.util.Map; -import static org.apache.inlong.sort.base.Constants.DELIMITER; import static org.apache.inlong.sort.base.Constants.DIRTY_BYTES; import static org.apache.inlong.sort.base.Constants.DIRTY_RECORDS; import static org.apache.inlong.sort.base.Constants.NUM_BYTES_OUT; @@ -45,10 +42,8 @@ import static org.apache.inlong.sort.base.Constants.NUM_RECORDS_OUT_PER_SECOND; */ public class SinkMetricData implements MetricData { - private final MetricGroup metricGroup; - private final String groupId; - private final String streamId; - private final String nodeId; + private MetricGroup metricGroup; + private Map<String, String> labels; private AuditImp auditImp; private Counter numRecordsOut; private Counter numBytesOut; @@ -59,23 +54,46 @@ public class SinkMetricData implements MetricData { private Meter numRecordsOutPerSecond; private Meter numBytesOutPerSecond; - public SinkMetricData(String groupId, String streamId, String nodeId, MetricGroup metricGroup) { - this(groupId, streamId, nodeId, metricGroup, null); - } - public SinkMetricData(MetricOption option, MetricGroup metricGroup) { - this(option.getGroupId(), option.getStreamId(), option.getNodeId(), metricGroup, option.getIpPorts()); - } - - public SinkMetricData( - String groupId, String streamId, String nodeId, MetricGroup metricGroup, - @Nullable String auditHostAndPorts) { this.metricGroup = metricGroup; - this.groupId = groupId; - this.streamId = streamId; - this.nodeId = nodeId; - if (auditHostAndPorts != null) { - AuditImp.getInstance().setAuditProxy(new HashSet<>(Arrays.asList(auditHostAndPorts.split(DELIMITER)))); + this.labels = option.getLabels(); + + ThreadSafeCounter recordsOutCounter = new ThreadSafeCounter(); + ThreadSafeCounter bytesOutCounter = new ThreadSafeCounter(); + switch (option.getRegisteredMetric()) { + case DIRTY: + registerMetricsForDirtyBytes(new ThreadSafeCounter()); + registerMetricsForDirtyRecords(new ThreadSafeCounter()); + break; + case NORMAL: + registerMetricsForNumBytesOut(new ThreadSafeCounter()); + registerMetricsForNumRecordsOut(new ThreadSafeCounter()); + registerMetricsForNumBytesOutPerSecond(); + registerMetricsForNumRecordsOutPerSecond(); + + recordsOutCounter.inc(option.getInitRecords()); + bytesOutCounter.inc(option.getInitBytes()); + registerMetricsForNumRecordsOutForMeter(recordsOutCounter); + registerMetricsForNumRecordsOutForMeter(bytesOutCounter); + break; + default: + registerMetricsForDirtyBytes(new ThreadSafeCounter()); + registerMetricsForDirtyRecords(new ThreadSafeCounter()); + registerMetricsForNumBytesOut(new ThreadSafeCounter()); + registerMetricsForNumRecordsOut(new ThreadSafeCounter()); + registerMetricsForNumBytesOutPerSecond(); + registerMetricsForNumRecordsOutPerSecond(); + + recordsOutCounter.inc(option.getInitRecords()); + bytesOutCounter.inc(option.getInitBytes()); + registerMetricsForNumRecordsOutForMeter(recordsOutCounter); + registerMetricsForNumRecordsOutForMeter(bytesOutCounter); + break; + + } + + if (option.getIpPorts().isPresent()) { + AuditImp.getInstance().setAuditProxy(option.getIpPortList()); this.auditImp = AuditImp.getInstance(); } } @@ -218,18 +236,8 @@ public class SinkMetricData implements MetricData { } @Override - public String getGroupId() { - return groupId; - } - - @Override - public String getStreamId() { - return streamId; - } - - @Override - public String getNodeId() { - return nodeId; + public Map<String, String> getLabels() { + return labels; } public Counter getNumRecordsOutForMeter() { @@ -242,26 +250,26 @@ public class SinkMetricData implements MetricData { public void invokeWithEstimate(Object o) { long size = o.toString().getBytes(StandardCharsets.UTF_8).length; - this.numRecordsOut.inc(); - this.numBytesOut.inc(size); - this.numRecordsOutForMeter.inc(); - this.numBytesOutForMeter.inc(size); - if (auditImp != null) { - auditImp.add( - Constants.AUDIT_SORT_OUTPUT, - getGroupId(), - getStreamId(), - System.currentTimeMillis(), - 1, - size); - } + invoke(1, size); } public void invoke(long rowCount, long rowSize) { - this.numRecordsOut.inc(rowCount); - this.numBytesOut.inc(rowSize); - this.numRecordsOutForMeter.inc(rowCount); - this.numBytesOutForMeter.inc(rowSize); + if (numRecordsOut != null) { + numRecordsOut.inc(rowCount); + } + + if (numBytesOut != null) { + numBytesOut.inc(rowSize); + } + + if (numRecordsOutForMeter != null) { + numRecordsOutForMeter.inc(rowCount); + } + + if (numBytesOutForMeter != null) { + numBytesOutForMeter.inc(rowCount); + } + if (auditImp != null) { auditImp.add( Constants.AUDIT_SORT_OUTPUT, @@ -273,12 +281,22 @@ public class SinkMetricData implements MetricData { } } + public void invokeDirty(long rowCount, long rowSize) { + if (dirtyRecords != null) { + dirtyRecords.inc(rowCount); + } + + if (dirtyBytes != null) { + dirtyBytes.inc(rowSize); + } + } + @Override public String toString() { return "SinkMetricData{" - + "groupId='" + groupId + '\'' - + ", streamId='" + streamId + '\'' - + ", nodeId='" + nodeId + '\'' + + "metricGroup=" + metricGroup + + ", labels=" + labels + + ", auditImp=" + auditImp + ", numRecordsOut=" + numRecordsOut.getCount() + ", numBytesOut=" + numBytesOut.getCount() + ", numRecordsOutForMeter=" + numRecordsOutForMeter.getCount() diff --git a/inlong-sort/sort-connectors/base/src/main/java/org/apache/inlong/sort/base/metric/SourceMetricData.java b/inlong-sort/sort-connectors/base/src/main/java/org/apache/inlong/sort/base/metric/SourceMetricData.java index 5c25fcc75..3cffcfe54 100644 --- a/inlong-sort/sort-connectors/base/src/main/java/org/apache/inlong/sort/base/metric/SourceMetricData.java +++ b/inlong-sort/sort-connectors/base/src/main/java/org/apache/inlong/sort/base/metric/SourceMetricData.java @@ -25,12 +25,9 @@ import org.apache.flink.metrics.SimpleCounter; import org.apache.inlong.audit.AuditImp; import org.apache.inlong.sort.base.Constants; -import javax.annotation.Nullable; +import java.nio.charset.StandardCharsets; +import java.util.Map; -import java.util.Arrays; -import java.util.HashSet; - -import static org.apache.inlong.sort.base.Constants.DELIMITER; import static org.apache.inlong.sort.base.Constants.NUM_BYTES_IN; import static org.apache.inlong.sort.base.Constants.NUM_BYTES_IN_FOR_METER; import static org.apache.inlong.sort.base.Constants.NUM_BYTES_IN_PER_SECOND; @@ -43,46 +40,39 @@ import static org.apache.inlong.sort.base.Constants.NUM_RECORDS_IN_PER_SECOND; */ public class SourceMetricData implements MetricData { - private final MetricGroup metricGroup; - private final String groupId; - private final String streamId; - private final String nodeId; + private MetricGroup metricGroup; + private Map<String, String> labels; private Counter numRecordsIn; private Counter numBytesIn; private Counter numRecordsInForMeter; private Counter numBytesInForMeter; private Meter numRecordsInPerSecond; private Meter numBytesInPerSecond; - private final AuditImp auditImp; - - public SourceMetricData(String groupId, String streamId, String nodeId, MetricGroup metricGroup) { - this(groupId, streamId, nodeId, metricGroup, (AuditImp) null); - } + private AuditImp auditImp; public SourceMetricData(MetricOption option, MetricGroup metricGroup) { - this(option.getGroupId(), option.getStreamId(), option.getNodeId(), metricGroup, option.getIpPorts()); - } - - public SourceMetricData(String groupId, String streamId, String nodeId, MetricGroup metricGroup, - AuditImp auditImp) { - this.groupId = groupId; - this.streamId = streamId; - this.nodeId = nodeId; this.metricGroup = metricGroup; - this.auditImp = auditImp; - } + this.labels = option.getLabels(); + + SimpleCounter recordsInCounter = new SimpleCounter(); + SimpleCounter bytesInCounter = new SimpleCounter(); + switch (option.getRegisteredMetric()) { + default: + registerMetricsForNumRecordsIn(); + registerMetricsForNumBytesIn(); + registerMetricsForNumBytesInPerSecond(); + registerMetricsForNumRecordsInPerSecond(); + + recordsInCounter.inc(option.getInitRecords()); + bytesInCounter.inc(option.getInitBytes()); + registerMetricsForNumBytesInForMeter(recordsInCounter); + registerMetricsForNumRecordsInForMeter(bytesInCounter); + break; + } - public SourceMetricData(String groupId, String streamId, String nodeId, MetricGroup metricGroup, - @Nullable String auditHostAndPorts) { - this.groupId = groupId; - this.streamId = streamId; - this.nodeId = nodeId; - this.metricGroup = metricGroup; - if (auditHostAndPorts != null) { - AuditImp.getInstance().setAuditProxy(new HashSet<>(Arrays.asList(auditHostAndPorts.split(DELIMITER)))); + if (option.getIpPorts() != null) { + AuditImp.getInstance().setAuditProxy(option.getIpPortList()); this.auditImp = AuditImp.getInstance(); - } else { - this.auditImp = null; } } @@ -196,26 +186,32 @@ public class SourceMetricData implements MetricData { } @Override - public String getGroupId() { - return groupId; - } - - @Override - public String getStreamId() { - return streamId; + public Map<String, String> getLabels() { + return labels; } - @Override - public String getNodeId() { - return nodeId; + public void outputMetricsWithEstimate(Object o) { + long size = o.toString().getBytes(StandardCharsets.UTF_8).length; + outputMetrics(1, size); } public void outputMetrics(long rowCountSize, long rowDataSize) { - outputMetricForFlink(rowCountSize, rowDataSize); - outputMetricForAudit(rowCountSize, rowDataSize); - } + if (numRecordsIn != null) { + this.numRecordsIn.inc(rowCountSize); + } + + if (numBytesIn != null) { + this.numBytesIn.inc(rowDataSize); + } + + if (numRecordsInForMeter != null) { + this.numRecordsInForMeter.inc(rowCountSize); + } + + if (numBytesInForMeter != null) { + this.numBytesInForMeter.inc(rowDataSize); + } - public void outputMetricForAudit(long rowCountSize, long rowDataSize) { if (auditImp != null) { auditImp.add( Constants.AUDIT_SORT_INPUT, @@ -227,25 +223,18 @@ public class SourceMetricData implements MetricData { } } - public void outputMetricForFlink(long rowCountSize, long rowDataSize) { - this.numBytesIn.inc(rowDataSize); - this.numRecordsIn.inc(rowCountSize); - this.numBytesInForMeter.inc(rowDataSize); - this.numRecordsInForMeter.inc(rowCountSize); - } - @Override public String toString() { return "SourceMetricData{" - + "groupId='" + groupId + '\'' - + ", streamId='" + streamId + '\'' - + ", nodeId='" + nodeId + '\'' + + "metricGroup=" + metricGroup + + ", labels=" + labels + ", numRecordsIn=" + numRecordsIn.getCount() + ", numBytesIn=" + numBytesIn.getCount() + ", numRecordsInForMeter=" + numRecordsInForMeter.getCount() + ", numBytesInForMeter=" + numBytesInForMeter.getCount() + ", numRecordsInPerSecond=" + numRecordsInPerSecond.getRate() + ", numBytesInPerSecond=" + numBytesInPerSecond.getRate() + + ", auditImp=" + auditImp + '}'; } } diff --git a/inlong-sort/sort-connectors/base/src/main/java/org/apache/inlong/sort/base/util/ValidateMetricOptionUtils.java b/inlong-sort/sort-connectors/base/src/main/java/org/apache/inlong/sort/base/util/ValidateMetricOptionUtils.java deleted file mode 100644 index bd58e31ae..000000000 --- a/inlong-sort/sort-connectors/base/src/main/java/org/apache/inlong/sort/base/util/ValidateMetricOptionUtils.java +++ /dev/null @@ -1,39 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package org.apache.inlong.sort.base.util; - -import org.apache.flink.table.api.ValidationException; - -/** - * validate option tool - */ -public class ValidateMetricOptionUtils { - - /** - * validate inlong metric when set inlong audit - * @param inlongMetric inlong.metric option value - * @param inlongAudit inlong.audit option value - */ - public static void validateInlongMetricIfSetInlongAudit(String inlongMetric, String inlongAudit) { - if (inlongAudit != null && inlongMetric == null) { - throw new ValidationException("inlong metric is necessary when set inlong audit"); - } - } - -} diff --git a/inlong-sort/sort-connectors/elasticsearch-base/src/main/java/org/apache/inlong/sort/elasticsearch/ElasticsearchSinkBase.java b/inlong-sort/sort-connectors/elasticsearch-base/src/main/java/org/apache/inlong/sort/elasticsearch/ElasticsearchSinkBase.java index 000e1c23a..cdea07fa5 100644 --- a/inlong-sort/sort-connectors/elasticsearch-base/src/main/java/org/apache/inlong/sort/elasticsearch/ElasticsearchSinkBase.java +++ b/inlong-sort/sort-connectors/elasticsearch-base/src/main/java/org/apache/inlong/sort/elasticsearch/ElasticsearchSinkBase.java @@ -29,6 +29,8 @@ import org.apache.flink.streaming.api.functions.sink.RichSinkFunction; import org.apache.flink.streaming.connectors.elasticsearch.ActionRequestFailureHandler; import org.apache.flink.streaming.connectors.elasticsearch.RequestIndexer; import org.apache.flink.util.InstantiationUtil; +import org.apache.inlong.sort.base.metric.MetricOption; +import org.apache.inlong.sort.base.metric.MetricOption.RegisteredMetric; import org.apache.inlong.sort.base.metric.SinkMetricData; import org.elasticsearch.action.ActionRequest; import org.elasticsearch.action.DocWriteRequest; @@ -50,7 +52,6 @@ import java.util.concurrent.atomic.AtomicReference; import static org.apache.flink.util.Preconditions.checkArgument; import static org.apache.flink.util.Preconditions.checkNotNull; -import static org.apache.inlong.sort.base.Constants.DELIMITER; /** * Base class for all Flink Elasticsearch Sinks. @@ -266,15 +267,14 @@ public abstract class ElasticsearchSinkBase<T, C extends AutoCloseable> extends @Override public void open(Configuration parameters) throws Exception { client = callBridge.createClient(userConfig); - if (inlongMetric != null && !inlongMetric.isEmpty()) { - String[] inlongMetricArray = inlongMetric.split(DELIMITER); - String groupId = inlongMetricArray[0]; - String streamId = inlongMetricArray[1]; - String nodeId = inlongMetricArray[2]; - sinkMetricData = new SinkMetricData(groupId, streamId, nodeId, getRuntimeContext().getMetricGroup()); - sinkMetricData.registerMetricsForDirtyBytes(); - sinkMetricData.registerMetricsForDirtyRecords(); + MetricOption metricOption = MetricOption.builder() + .withInlongLabels(inlongMetric) + .withRegisterMetric(RegisteredMetric.DIRTY) + .build(); + if (metricOption != null) { + sinkMetricData = new SinkMetricData(metricOption, getRuntimeContext().getMetricGroup()); } + callBridge.verifyClientConnection(client); bulkProcessor = buildBulkProcessor(new BulkProcessorListener(sinkMetricData)); requestIndexer = @@ -482,8 +482,8 @@ public abstract class ElasticsearchSinkBase<T, C extends AutoCloseable> extends if (failure != null) { restStatus = itemResponse.getFailure().getStatus(); actionRequest = request.requests().get(i); - if (sinkMetricData.getDirtyRecords() != null) { - sinkMetricData.getDirtyRecords().inc(); + if (sinkMetricData != null) { + sinkMetricData.invokeDirty(1, 0); } if (restStatus == null) { if (actionRequest instanceof ActionRequest) { @@ -526,8 +526,8 @@ public abstract class ElasticsearchSinkBase<T, C extends AutoCloseable> extends public void afterBulk(long executionId, BulkRequest request, Throwable failure) { try { for (DocWriteRequest writeRequest : request.requests()) { - if (sinkMetricData.getDirtyRecords() != null) { - sinkMetricData.getDirtyRecords().inc(); + if (sinkMetricData != null) { + sinkMetricData.invokeDirty(1, 0); } if (writeRequest instanceof ActionRequest) { failureHandler.onFailure( diff --git a/inlong-sort/sort-connectors/elasticsearch-base/src/main/java/org/apache/inlong/sort/elasticsearch/table/RowElasticsearchSinkFunction.java b/inlong-sort/sort-connectors/elasticsearch-base/src/main/java/org/apache/inlong/sort/elasticsearch/table/RowElasticsearchSinkFunction.java index 0ae93231d..e4f1419c5 100644 --- a/inlong-sort/sort-connectors/elasticsearch-base/src/main/java/org/apache/inlong/sort/elasticsearch/table/RowElasticsearchSinkFunction.java +++ b/inlong-sort/sort-connectors/elasticsearch-base/src/main/java/org/apache/inlong/sort/elasticsearch/table/RowElasticsearchSinkFunction.java @@ -20,8 +20,8 @@ package org.apache.inlong.sort.elasticsearch.table; import org.apache.flink.api.common.functions.RuntimeContext; import org.apache.flink.api.common.serialization.SerializationSchema; -import org.apache.inlong.audit.AuditImp; -import org.apache.inlong.sort.base.Constants; +import org.apache.inlong.sort.base.metric.MetricOption; +import org.apache.inlong.sort.base.metric.MetricOption.RegisteredMetric; import org.apache.inlong.sort.elasticsearch.ElasticsearchSinkFunction; import org.apache.flink.streaming.connectors.elasticsearch.RequestIndexer; import org.apache.flink.table.api.TableException; @@ -37,13 +37,9 @@ import org.elasticsearch.common.xcontent.XContentType; import javax.annotation.Nullable; -import java.util.Arrays; -import java.util.HashSet; import java.util.Objects; import java.util.function.Function; -import static org.apache.inlong.sort.base.Constants.DELIMITER; - /** Sink function for converting upserts into Elasticsearch {@link ActionRequest}s. */ public class RowElasticsearchSinkFunction implements ElasticsearchSinkFunction<RowData> { @@ -63,11 +59,6 @@ public class RowElasticsearchSinkFunction implements ElasticsearchSinkFunction<R private transient RuntimeContext runtimeContext; private SinkMetricData sinkMetricData; - private Long dataSize = 0L; - private Long rowSize = 0L; - private String groupId; - private String streamId; - private transient AuditImp auditImp; public RowElasticsearchSinkFunction( IndexGenerator indexGenerator, @@ -94,44 +85,20 @@ public class RowElasticsearchSinkFunction implements ElasticsearchSinkFunction<R public void open(RuntimeContext ctx) { indexGenerator.open(); this.runtimeContext = ctx; - if (inlongMetric != null && !inlongMetric.isEmpty()) { - String[] inlongMetricArray = inlongMetric.split(DELIMITER); - groupId = inlongMetricArray[0]; - streamId = inlongMetricArray[1]; - String nodeId = inlongMetricArray[2]; - sinkMetricData = new SinkMetricData(groupId, streamId, nodeId, runtimeContext.getMetricGroup()); - sinkMetricData.registerMetricsForNumBytesOut(); - sinkMetricData.registerMetricsForNumRecordsOut(); - sinkMetricData.registerMetricsForNumBytesOutPerSecond(); - sinkMetricData.registerMetricsForNumRecordsOutPerSecond(); - } - - if (auditHostAndPorts != null) { - AuditImp.getInstance().setAuditProxy(new HashSet<>(Arrays.asList(auditHostAndPorts.split(DELIMITER)))); - auditImp = AuditImp.getInstance(); - } - } - - private void outputMetricForAudit(long size) { - if (auditImp != null) { - auditImp.add( - Constants.AUDIT_SORT_OUTPUT, - groupId, - streamId, - System.currentTimeMillis(), - 1, - size); + MetricOption metricOption = MetricOption.builder() + .withInlongLabels(inlongMetric) + .withInlongAudit(auditHostAndPorts) + .withRegisterMetric(RegisteredMetric.NORMAL) + .build(); + if (metricOption != null) { + sinkMetricData = new SinkMetricData(metricOption, runtimeContext.getMetricGroup()); } } private void sendMetrics(byte[] document) { - if (sinkMetricData.getNumBytesOut() != null) { - sinkMetricData.getNumBytesOut().inc(document.length); - } - if (sinkMetricData.getNumRecordsOut() != null) { - sinkMetricData.getNumRecordsOut().inc(); + if (sinkMetricData != null) { + sinkMetricData.invoke(1, document.length); } - outputMetricForAudit(document.length); } @Override diff --git a/inlong-sort/sort-connectors/filesystem/src/main/java/org/apache/inlong/sort/filesystem/FileSystemTableSink.java b/inlong-sort/sort-connectors/filesystem/src/main/java/org/apache/inlong/sort/filesystem/FileSystemTableSink.java index 60bcf9332..b18e7e32d 100644 --- a/inlong-sort/sort-connectors/filesystem/src/main/java/org/apache/inlong/sort/filesystem/FileSystemTableSink.java +++ b/inlong-sort/sort-connectors/filesystem/src/main/java/org/apache/inlong/sort/filesystem/FileSystemTableSink.java @@ -77,7 +77,6 @@ import org.apache.flink.table.types.DataType; import org.apache.flink.table.utils.PartitionPathUtils; import org.apache.flink.types.RowKind; import org.apache.flink.util.Preconditions; -import org.apache.inlong.sort.base.util.ValidateMetricOptionUtils; import org.apache.inlong.sort.filesystem.stream.StreamingSink; import javax.annotation.Nullable; @@ -157,7 +156,6 @@ public class FileSystemTableSink extends AbstractFileSystemTable this.configuredParallelism = tableOptions.get(FileSystemOptions.SINK_PARALLELISM); this.inlongMetric = tableOptions.get(INLONG_METRIC); this.inlongAudit = tableOptions.get(INLONG_AUDIT); - ValidateMetricOptionUtils.validateInlongMetricIfSetInlongAudit(inlongMetric, inlongAudit); } @Override diff --git a/inlong-sort/sort-connectors/filesystem/src/main/java/org/apache/inlong/sort/filesystem/stream/AbstractStreamingWriter.java b/inlong-sort/sort-connectors/filesystem/src/main/java/org/apache/inlong/sort/filesystem/stream/AbstractStreamingWriter.java index 47516a270..95267c698 100644 --- a/inlong-sort/sort-connectors/filesystem/src/main/java/org/apache/inlong/sort/filesystem/stream/AbstractStreamingWriter.java +++ b/inlong-sort/sort-connectors/filesystem/src/main/java/org/apache/inlong/sort/filesystem/stream/AbstractStreamingWriter.java @@ -32,10 +32,9 @@ import org.apache.flink.streaming.api.operators.ChainingStrategy; import org.apache.flink.streaming.api.operators.OneInputStreamOperator; import org.apache.flink.streaming.api.watermark.Watermark; import org.apache.flink.streaming.runtime.streamrecord.StreamRecord; +import org.apache.inlong.sort.base.metric.MetricOption; +import org.apache.inlong.sort.base.metric.MetricOption.RegisteredMetric; import org.apache.inlong.sort.base.metric.SinkMetricData; -import org.apache.inlong.sort.base.metric.ThreadSafeCounter; - -import static org.apache.inlong.sort.base.Constants.DELIMITER; /** * Operator for file system sink. It is a operator version of {@link StreamingFileSink}. It can send @@ -102,19 +101,13 @@ public abstract class AbstractStreamingWriter<IN, OUT> extends AbstractStreamOpe @Override public void open() throws Exception { super.open(); - if (inlongMetric != null) { - String[] inLongMetricArray = inlongMetric.split(DELIMITER); - String groupId = inLongMetricArray[0]; - String streamId = inLongMetricArray[1]; - String nodeId = inLongMetricArray[2]; - metricData = new SinkMetricData( - groupId, streamId, nodeId, getRuntimeContext().getMetricGroup(), inlongAudit); - metricData.registerMetricsForDirtyBytes(new ThreadSafeCounter()); - metricData.registerMetricsForDirtyRecords(new ThreadSafeCounter()); - metricData.registerMetricsForNumBytesOut(new ThreadSafeCounter()); - metricData.registerMetricsForNumRecordsOut(new ThreadSafeCounter()); - metricData.registerMetricsForNumBytesOutPerSecond(); - metricData.registerMetricsForNumRecordsOutPerSecond(); + MetricOption metricOption = MetricOption.builder() + .withInlongLabels(inlongMetric) + .withRegisterMetric(RegisteredMetric.ALL) + .withInlongAudit(inlongAudit) + .build(); + if (metricOption != null) { + metricData = new SinkMetricData(metricOption, getRuntimeContext().getMetricGroup()); } } diff --git a/inlong-sort/sort-connectors/hbase/src/main/java/org/apache/inlong/sort/hbase/HBase2DynamicTableFactory.java b/inlong-sort/sort-connectors/hbase/src/main/java/org/apache/inlong/sort/hbase/HBase2DynamicTableFactory.java index da14c947a..55d5d57d7 100644 --- a/inlong-sort/sort-connectors/hbase/src/main/java/org/apache/inlong/sort/hbase/HBase2DynamicTableFactory.java +++ b/inlong-sort/sort-connectors/hbase/src/main/java/org/apache/inlong/sort/hbase/HBase2DynamicTableFactory.java @@ -31,7 +31,6 @@ import org.apache.flink.table.factories.DynamicTableSinkFactory; import org.apache.flink.table.factories.DynamicTableSourceFactory; import org.apache.flink.table.factories.FactoryUtil.TableFactoryHelper; import org.apache.hadoop.conf.Configuration; -import org.apache.inlong.sort.base.util.ValidateMetricOptionUtils; import org.apache.inlong.sort.hbase.sink.HBaseDynamicTableSink; import java.util.HashSet; @@ -106,7 +105,6 @@ public class HBase2DynamicTableFactory HBaseTableSchema hbaseSchema = HBaseTableSchema.fromTableSchema(tableSchema); String inlongMetric = tableOptions.getOptional(INLONG_METRIC).orElse(null); String inlongAudit = tableOptions.get(INLONG_AUDIT); - ValidateMetricOptionUtils.validateInlongMetricIfSetInlongAudit(inlongMetric, inlongAudit); return new HBaseDynamicTableSink( tableName, hbaseSchema, hbaseConf, hBaseWriteOptions, nullStringLiteral, inlongMetric, inlongAudit); diff --git a/inlong-sort/sort-connectors/hbase/src/main/java/org/apache/inlong/sort/hbase/sink/HBaseSinkFunction.java b/inlong-sort/sort-connectors/hbase/src/main/java/org/apache/inlong/sort/hbase/sink/HBaseSinkFunction.java index 907b758ff..a1e9641d2 100644 --- a/inlong-sort/sort-connectors/hbase/src/main/java/org/apache/inlong/sort/hbase/sink/HBaseSinkFunction.java +++ b/inlong-sort/sort-connectors/hbase/src/main/java/org/apache/inlong/sort/hbase/sink/HBaseSinkFunction.java @@ -37,9 +37,9 @@ import org.apache.hadoop.hbase.client.BufferedMutatorParams; import org.apache.hadoop.hbase.client.Connection; import org.apache.hadoop.hbase.client.ConnectionFactory; import org.apache.hadoop.hbase.client.RetriesExhaustedWithDetailsException; -import org.apache.inlong.sort.base.Constants; +import org.apache.inlong.sort.base.metric.MetricOption; +import org.apache.inlong.sort.base.metric.MetricOption.RegisteredMetric; import org.apache.inlong.sort.base.metric.SinkMetricData; -import org.apache.inlong.sort.base.metric.ThreadSafeCounter; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -123,19 +123,13 @@ public class HBaseSinkFunction<T> extends RichSinkFunction<T> org.apache.hadoop.conf.Configuration config = prepareRuntimeConfiguration(); try { this.runtimeContext = getRuntimeContext(); - if (inlongMetric != null && !inlongMetric.isEmpty()) { - String[] inlongMetricArray = inlongMetric.split(Constants.DELIMITER); - String groupId = inlongMetricArray[0]; - String streamId = inlongMetricArray[1]; - String nodeId = inlongMetricArray[2]; - sinkMetricData = new SinkMetricData(groupId, streamId, nodeId, runtimeContext.getMetricGroup(), - inlongAudit); - sinkMetricData.registerMetricsForDirtyBytes(new ThreadSafeCounter()); - sinkMetricData.registerMetricsForDirtyRecords(new ThreadSafeCounter()); - sinkMetricData.registerMetricsForNumBytesOut(new ThreadSafeCounter()); - sinkMetricData.registerMetricsForNumRecordsOut(new ThreadSafeCounter()); - sinkMetricData.registerMetricsForNumBytesOutPerSecond(); - sinkMetricData.registerMetricsForNumRecordsOutPerSecond(); + MetricOption metricOption = MetricOption.builder() + .withInlongLabels(inlongMetric) + .withInlongAudit(inlongAudit) + .withRegisterMetric(RegisteredMetric.ALL) + .build(); + if (metricOption != null) { + sinkMetricData = new SinkMetricData(metricOption, runtimeContext.getMetricGroup()); } this.mutationConverter.open(); this.numPendingRequests = new AtomicLong(0); @@ -163,14 +157,13 @@ public class HBaseSinkFunction<T> extends RichSinkFunction<T> } try { flush(); - sinkMetricData.invoke(rowSize, dataSize); + if (sinkMetricData != null) { + sinkMetricData.invoke(rowSize, dataSize); + } resetStateAfterFlush(); } catch (Exception e) { - if (sinkMetricData.getDirtyRecords() != null) { - sinkMetricData.getDirtyRecords().inc(rowSize); - } - if (sinkMetricData.getDirtyBytes() != null) { - sinkMetricData.getDirtyBytes().inc(dataSize); + if (sinkMetricData != null) { + sinkMetricData.invokeDirty(rowSize, dataSize); } resetStateAfterFlush(); // fail the sink and skip the rest of the items @@ -236,14 +229,13 @@ public class HBaseSinkFunction<T> extends RichSinkFunction<T> && numPendingRequests.incrementAndGet() >= bufferFlushMaxMutations) { try { flush(); - sinkMetricData.invoke(rowSize, dataSize); + if (sinkMetricData != null) { + sinkMetricData.invoke(rowSize, dataSize); + } resetStateAfterFlush(); } catch (Exception e) { - if (sinkMetricData.getDirtyRecords() != null) { - sinkMetricData.getDirtyRecords().inc(rowSize); - } - if (sinkMetricData.getDirtyBytes() != null) { - sinkMetricData.getDirtyBytes().inc(dataSize); + if (sinkMetricData != null) { + sinkMetricData.invokeDirty(rowSize, dataSize); } resetStateAfterFlush(); throw e; diff --git a/inlong-sort/sort-connectors/hive/src/main/java/org/apache/inlong/sort/hive/filesystem/AbstractStreamingWriter.java b/inlong-sort/sort-connectors/hive/src/main/java/org/apache/inlong/sort/hive/filesystem/AbstractStreamingWriter.java index 5d0ca7bf0..ab2e845f2 100644 --- a/inlong-sort/sort-connectors/hive/src/main/java/org/apache/inlong/sort/hive/filesystem/AbstractStreamingWriter.java +++ b/inlong-sort/sort-connectors/hive/src/main/java/org/apache/inlong/sort/hive/filesystem/AbstractStreamingWriter.java @@ -32,12 +32,11 @@ import org.apache.flink.streaming.api.operators.ChainingStrategy; import org.apache.flink.streaming.api.operators.OneInputStreamOperator; import org.apache.flink.streaming.api.watermark.Watermark; import org.apache.flink.streaming.runtime.streamrecord.StreamRecord; +import org.apache.inlong.sort.base.metric.MetricOption; +import org.apache.inlong.sort.base.metric.MetricOption.RegisteredMetric; import org.apache.inlong.sort.base.metric.SinkMetricData; -import org.apache.inlong.sort.base.metric.ThreadSafeCounter; import javax.annotation.Nullable; -import static org.apache.inlong.sort.base.Constants.DELIMITER; - /** * Operator for file system sink. It is a operator version of {@link StreamingFileSink}. It can send * file and bucket information to downstream. @@ -111,19 +110,13 @@ public abstract class AbstractStreamingWriter<IN, OUT> extends AbstractStreamOpe @Override public void open() throws Exception { super.open(); - if (inlongMetric != null) { - String[] inlongMetricArray = inlongMetric.split(DELIMITER); - String inlongGroupId = inlongMetricArray[0]; - String inlongStreamId = inlongMetricArray[1]; - String nodeId = inlongMetricArray[2]; - metricData = new SinkMetricData( - inlongGroupId, inlongStreamId, nodeId, getRuntimeContext().getMetricGroup(), auditHostAndPorts); - metricData.registerMetricsForDirtyBytes(new ThreadSafeCounter()); - metricData.registerMetricsForDirtyRecords(new ThreadSafeCounter()); - metricData.registerMetricsForNumBytesOut(new ThreadSafeCounter()); - metricData.registerMetricsForNumRecordsOut(new ThreadSafeCounter()); - metricData.registerMetricsForNumBytesOutPerSecond(); - metricData.registerMetricsForNumRecordsOutPerSecond(); + MetricOption metricOption = MetricOption.builder() + .withInlongLabels(inlongMetric) + .withInlongAudit(auditHostAndPorts) + .withRegisterMetric(RegisteredMetric.ALL) + .build(); + if (metricOption != null) { + metricData = new SinkMetricData(metricOption, getRuntimeContext().getMetricGroup()); } } diff --git a/inlong-sort/sort-connectors/iceberg-dlc/src/main/java/org/apache/inlong/sort/iceberg/flink/FlinkDynamicTableFactory.java b/inlong-sort/sort-connectors/iceberg-dlc/src/main/java/org/apache/inlong/sort/iceberg/flink/FlinkDynamicTableFactory.java index 35d3a6c76..7840e3ea7 100644 --- a/inlong-sort/sort-connectors/iceberg-dlc/src/main/java/org/apache/inlong/sort/iceberg/flink/FlinkDynamicTableFactory.java +++ b/inlong-sort/sort-connectors/iceberg-dlc/src/main/java/org/apache/inlong/sort/iceberg/flink/FlinkDynamicTableFactory.java @@ -135,12 +135,11 @@ public class FlinkDynamicTableFactory implements DynamicTableSinkFactory, Dynami Map<String, String> tableProps = catalogTable.getOptions(); TableSchema tableSchema = TableSchemaUtils.getPhysicalSchema(catalogTable.getSchema()); SyncRewriteDataFilesActionOption compactOption = new SyncRewriteDataFilesActionOption(tableProps); - MetricOption metricOption = null; - if (tableProps.containsKey(INLONG_METRIC.key())) { - metricOption = new MetricOption( - tableProps.getOrDefault(INLONG_METRIC.key(), INLONG_METRIC.defaultValue()), - tableProps.getOrDefault(INLONG_AUDIT.key(), INLONG_AUDIT.defaultValue())); - } + MetricOption metricOption = MetricOption.builder() + .withInlongLabels(tableProps.getOrDefault(INLONG_METRIC.key(), INLONG_METRIC.defaultValue())) + .withInlongAudit(tableProps.getOrDefault(INLONG_AUDIT.key(), INLONG_AUDIT.defaultValue())) + .build(); + boolean appendMode = tableProps.containsKey(ICEBERG_IGNORE_ALL_CHANGELOG.key()) ? Boolean.parseBoolean(tableProps.get(ICEBERG_IGNORE_ALL_CHANGELOG.key())) : ICEBERG_IGNORE_ALL_CHANGELOG.defaultValue(); diff --git a/inlong-sort/sort-connectors/iceberg-dlc/src/main/java/org/apache/inlong/sort/iceberg/flink/IcebergTableSink.java b/inlong-sort/sort-connectors/iceberg-dlc/src/main/java/org/apache/inlong/sort/iceberg/flink/IcebergTableSink.java index 112b6a36b..df0ced054 100644 --- a/inlong-sort/sort-connectors/iceberg-dlc/src/main/java/org/apache/inlong/sort/iceberg/flink/IcebergTableSink.java +++ b/inlong-sort/sort-connectors/iceberg-dlc/src/main/java/org/apache/inlong/sort/iceberg/flink/IcebergTableSink.java @@ -43,7 +43,7 @@ import java.util.Map; * Copy from iceberg-flink:iceberg-flink-1.13:0.13.2 * Add an option `sink.ignore.changelog` to support insert-only mode without primaryKey. * Add a table property `write.compact.enable` to support small file compact. - * Add option `inlong.metric` and `inlong.audit` to support collect inlong metrics and audit + * Add option `inlong.metric` and `metrics.audit.proxy.hosts` to support collect inlong metrics and audit */ public class IcebergTableSink implements DynamicTableSink, SupportsPartitioning, SupportsOverwrite { private static final Logger LOG = LoggerFactory.getLogger(IcebergTableSink.class); diff --git a/inlong-sort/sort-connectors/iceberg-dlc/src/main/java/org/apache/inlong/sort/iceberg/flink/sink/FlinkSink.java b/inlong-sort/sort-connectors/iceberg-dlc/src/main/java/org/apache/inlong/sort/iceberg/flink/sink/FlinkSink.java index 0b48fb169..c24d5a8c6 100644 --- a/inlong-sort/sort-connectors/iceberg-dlc/src/main/java/org/apache/inlong/sort/iceberg/flink/sink/FlinkSink.java +++ b/inlong-sort/sort-connectors/iceberg-dlc/src/main/java/org/apache/inlong/sort/iceberg/flink/sink/FlinkSink.java @@ -74,7 +74,7 @@ import static org.apache.iceberg.TableProperties.WRITE_TARGET_FILE_SIZE_BYTES_DE * Copy from iceberg-flink:iceberg-flink-1.13:0.13.2 * Add an option `sink.ignore.changelog` to support insert-only mode without primaryKey. * Add a table property `write.compact.enable` to support small file compact. - * Add option `inlong.metric` and `inlong.audit` to support collect inlong metrics and audit. + * Add option `inlong.metric` and `metrics.audit.proxy.hosts` to support collect inlong metrics and audit. */ public class FlinkSink { private static final Logger LOG = LoggerFactory.getLogger(FlinkSink.class); diff --git a/inlong-sort/sort-connectors/iceberg-dlc/src/main/java/org/apache/inlong/sort/iceberg/flink/sink/IcebergStreamWriter.java b/inlong-sort/sort-connectors/iceberg-dlc/src/main/java/org/apache/inlong/sort/iceberg/flink/sink/IcebergStreamWriter.java index 9f7cffbcf..bb00b7808 100644 --- a/inlong-sort/sort-connectors/iceberg-dlc/src/main/java/org/apache/inlong/sort/iceberg/flink/sink/IcebergStreamWriter.java +++ b/inlong-sort/sort-connectors/iceberg-dlc/src/main/java/org/apache/inlong/sort/iceberg/flink/sink/IcebergStreamWriter.java @@ -30,7 +30,6 @@ import org.apache.iceberg.io.WriteResult; import org.apache.iceberg.relocated.com.google.common.base.MoreObjects; import org.apache.inlong.sort.base.metric.MetricOption; import org.apache.inlong.sort.base.metric.SinkMetricData; -import org.apache.inlong.sort.base.metric.ThreadSafeCounter; import javax.annotation.Nullable; import java.io.IOException; @@ -76,12 +75,6 @@ class IcebergStreamWriter<T> extends AbstractStreamOperator<WriteResult> // Initialize metric if (metricOption != null) { metricData = new SinkMetricData(metricOption, getRuntimeContext().getMetricGroup()); - metricData.registerMetricsForDirtyBytes(new ThreadSafeCounter()); - metricData.registerMetricsForDirtyRecords(new ThreadSafeCounter()); - metricData.registerMetricsForNumBytesOut(new ThreadSafeCounter()); - metricData.registerMetricsForNumRecordsOut(new ThreadSafeCounter()); - metricData.registerMetricsForNumBytesOutPerSecond(); - metricData.registerMetricsForNumRecordsOutPerSecond(); } } diff --git a/inlong-sort/sort-connectors/iceberg/src/main/java/org/apache/inlong/sort/iceberg/IcebergTableSink.java b/inlong-sort/sort-connectors/iceberg/src/main/java/org/apache/inlong/sort/iceberg/IcebergTableSink.java index 088622278..37eb2670b 100644 --- a/inlong-sort/sort-connectors/iceberg/src/main/java/org/apache/inlong/sort/iceberg/IcebergTableSink.java +++ b/inlong-sort/sort-connectors/iceberg/src/main/java/org/apache/inlong/sort/iceberg/IcebergTableSink.java @@ -46,7 +46,7 @@ import static org.apache.inlong.sort.iceberg.FlinkConfigOptions.ICEBERG_IGNORE_A /** * Copy from iceberg-flink:iceberg-flink-1.13:0.13.2 * Add an option `sink.ignore.changelog` to support insert-only mode without primaryKey. - * Add option `inlong.metric` and `inlong.audit` to support collect inlong metrics and audit + * Add option `inlong.metric` and `metrics.audit.proxy.hosts` to support collect inlong metrics and audit */ public class IcebergTableSink implements DynamicTableSink, SupportsPartitioning, SupportsOverwrite { diff --git a/inlong-sort/sort-connectors/iceberg/src/main/java/org/apache/inlong/sort/iceberg/sink/FlinkSink.java b/inlong-sort/sort-connectors/iceberg/src/main/java/org/apache/inlong/sort/iceberg/sink/FlinkSink.java index 65929bc34..8662722d8 100644 --- a/inlong-sort/sort-connectors/iceberg/src/main/java/org/apache/inlong/sort/iceberg/sink/FlinkSink.java +++ b/inlong-sort/sort-connectors/iceberg/src/main/java/org/apache/inlong/sort/iceberg/sink/FlinkSink.java @@ -71,7 +71,7 @@ import static org.apache.iceberg.TableProperties.WRITE_TARGET_FILE_SIZE_BYTES_DE /** * Copy from iceberg-flink:iceberg-flink-1.13:0.13.2 * Add an option `sink.ignore.changelog` to support insert-only mode without primaryKey. - * Add option `inlong.metric` and `inlong.audit` to support collect inlong metrics and audit. + * Add option `inlong.metric` and `metrics.audit.proxy.hosts` to support collect inlong metrics and audit. */ public class FlinkSink { private static final Logger LOG = LoggerFactory.getLogger(FlinkSink.class); diff --git a/inlong-sort/sort-connectors/iceberg/src/main/java/org/apache/inlong/sort/iceberg/sink/IcebergStreamWriter.java b/inlong-sort/sort-connectors/iceberg/src/main/java/org/apache/inlong/sort/iceberg/sink/IcebergStreamWriter.java index 79df9049f..8318c7177 100644 --- a/inlong-sort/sort-connectors/iceberg/src/main/java/org/apache/inlong/sort/iceberg/sink/IcebergStreamWriter.java +++ b/inlong-sort/sort-connectors/iceberg/src/main/java/org/apache/inlong/sort/iceberg/sink/IcebergStreamWriter.java @@ -28,14 +28,13 @@ import org.apache.iceberg.flink.sink.TaskWriterFactory; import org.apache.iceberg.io.TaskWriter; import org.apache.iceberg.io.WriteResult; import org.apache.iceberg.relocated.com.google.common.base.MoreObjects; +import org.apache.inlong.sort.base.metric.MetricOption; +import org.apache.inlong.sort.base.metric.MetricOption.RegisteredMetric; import org.apache.inlong.sort.base.metric.SinkMetricData; -import org.apache.inlong.sort.base.metric.ThreadSafeCounter; import javax.annotation.Nullable; import java.io.IOException; -import static org.apache.inlong.sort.base.Constants.DELIMITER; - /** * Copy from iceberg-flink:iceberg-flink-1.13:0.13.2 */ @@ -79,19 +78,13 @@ class IcebergStreamWriter<T> extends AbstractStreamOperator<WriteResult> this.writer = taskWriterFactory.create(); // Initialize metric - if (inlongMetric != null) { - String[] inlongMetricArray = inlongMetric.split(DELIMITER); - String inlongGroupId = inlongMetricArray[0]; - String inlongStreamId = inlongMetricArray[1]; - String nodeId = inlongMetricArray[2]; - metricData = new SinkMetricData( - inlongGroupId, inlongStreamId, nodeId, getRuntimeContext().getMetricGroup(), auditHostAndPorts); - metricData.registerMetricsForDirtyBytes(new ThreadSafeCounter()); - metricData.registerMetricsForDirtyRecords(new ThreadSafeCounter()); - metricData.registerMetricsForNumBytesOut(new ThreadSafeCounter()); - metricData.registerMetricsForNumRecordsOut(new ThreadSafeCounter()); - metricData.registerMetricsForNumBytesOutPerSecond(); - metricData.registerMetricsForNumRecordsOutPerSecond(); + MetricOption metricOption = MetricOption.builder() + .withInlongLabels(inlongMetric) + .withInlongAudit(auditHostAndPorts) + .withRegisterMetric(RegisteredMetric.ALL) + .build(); + if (metricOption != null) { + metricData = new SinkMetricData(metricOption, getRuntimeContext().getMetricGroup()); } } diff --git a/inlong-sort/sort-connectors/jdbc/src/main/java/org/apache/inlong/sort/jdbc/internal/JdbcBatchingOutputFormat.java b/inlong-sort/sort-connectors/jdbc/src/main/java/org/apache/inlong/sort/jdbc/internal/JdbcBatchingOutputFormat.java index 66af78c4d..e32d4094b 100644 --- a/inlong-sort/sort-connectors/jdbc/src/main/java/org/apache/inlong/sort/jdbc/internal/JdbcBatchingOutputFormat.java +++ b/inlong-sort/sort-connectors/jdbc/src/main/java/org/apache/inlong/sort/jdbc/internal/JdbcBatchingOutputFormat.java @@ -33,7 +33,8 @@ import org.apache.flink.connector.jdbc.utils.JdbcUtils; import org.apache.flink.runtime.util.ExecutorThreadFactory; import org.apache.flink.types.Row; import org.apache.flink.util.Preconditions; -import org.apache.inlong.audit.AuditImp; +import org.apache.inlong.sort.base.metric.MetricOption; +import org.apache.inlong.sort.base.metric.MetricOption.RegisteredMetric; import org.apache.inlong.sort.base.metric.SinkMetricData; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -43,9 +44,7 @@ import java.io.IOException; import java.io.Serializable; import java.nio.charset.StandardCharsets; import java.sql.SQLException; -import java.util.Arrays; import java.util.HashMap; -import java.util.HashSet; import java.util.concurrent.Executors; import java.util.concurrent.ScheduledExecutorService; import java.util.concurrent.ScheduledFuture; @@ -53,8 +52,6 @@ import java.util.concurrent.TimeUnit; import java.util.function.Function; import static org.apache.flink.connector.jdbc.utils.JdbcUtils.setRecordToStatement; import static org.apache.flink.util.Preconditions.checkNotNull; -import static org.apache.inlong.sort.base.Constants.AUDIT_SORT_INPUT; -import static org.apache.inlong.sort.base.Constants.DELIMITER; /** * A JDBC outputFormat that supports batching records before writing records to database. @@ -80,9 +77,6 @@ public class JdbcBatchingOutputFormat< private transient RuntimeContext runtimeContext; private SinkMetricData sinkMetricData; - private String inlongGroupId; - private String inlongStreamId; - private transient AuditImp auditImp; private Long dataSize = 0L; private Long rowSize = 0L; @@ -130,22 +124,13 @@ public class JdbcBatchingOutputFormat< public void open(int taskNumber, int numTasks) throws IOException { super.open(taskNumber, numTasks); this.runtimeContext = getRuntimeContext(); - if (inlongMetric != null && !inlongMetric.isEmpty()) { - String[] inlongMetricArray = inlongMetric.split(DELIMITER); - inlongGroupId = inlongMetricArray[0]; - inlongStreamId = inlongMetricArray[1]; - String nodeId = inlongMetricArray[2]; - sinkMetricData = new SinkMetricData(inlongGroupId, inlongStreamId, nodeId, runtimeContext.getMetricGroup()); - sinkMetricData.registerMetricsForDirtyBytes(); - sinkMetricData.registerMetricsForDirtyRecords(); - sinkMetricData.registerMetricsForNumBytesOut(); - sinkMetricData.registerMetricsForNumRecordsOut(); - sinkMetricData.registerMetricsForNumBytesOutPerSecond(); - sinkMetricData.registerMetricsForNumRecordsOutPerSecond(); - } - if (auditHostAndPorts != null) { - AuditImp.getInstance().setAuditProxy(new HashSet<>(Arrays.asList(auditHostAndPorts.split(DELIMITER)))); - auditImp = AuditImp.getInstance(); + MetricOption metricOption = MetricOption.builder() + .withInlongLabels(inlongMetric) + .withInlongAudit(auditHostAndPorts) + .withRegisterMetric(RegisteredMetric.ALL) + .build(); + if (metricOption != null) { + sinkMetricData = new SinkMetricData(metricOption, runtimeContext.getMetricGroup()); } jdbcStatementExecutor = createAndOpenStatementExecutor(statementExecutorFactory); if (executionOptions.getBatchIntervalMs() != 0 && executionOptions.getBatchSize() != 1) { @@ -159,20 +144,13 @@ public class JdbcBatchingOutputFormat< if (!closed) { try { flush(); - if (sinkMetricData.getNumRecordsOut() != null) { - sinkMetricData.getNumRecordsOut().inc(rowSize); - } - if (sinkMetricData.getNumBytesOut() != null) { - sinkMetricData.getNumBytesOut() - .inc(dataSize); + if (sinkMetricData != null) { + sinkMetricData.invoke(rowSize, dataSize); } resetStateAfterFlush(); } catch (Exception e) { - if (sinkMetricData.getDirtyRecords() != null) { - sinkMetricData.getDirtyRecords().inc(rowSize); - } - if (sinkMetricData.getDirtyBytes() != null) { - sinkMetricData.getDirtyBytes().inc(dataSize); + if (sinkMetricData != null) { + sinkMetricData.invokeDirty(rowSize, dataSize); } resetStateAfterFlush(); flushException = e; @@ -203,46 +181,26 @@ public class JdbcBatchingOutputFormat< } } - private void outputMetricForAudit(long length) { - if (auditImp != null) { - auditImp.add( - AUDIT_SORT_INPUT, - inlongGroupId, - inlongStreamId, - System.currentTimeMillis(), - 1, - length); - } - } - @Override public final synchronized void writeRecord(In record) throws IOException { checkFlushException(); rowSize++; dataSize = dataSize + record.toString().getBytes(StandardCharsets.UTF_8).length; - outputMetricForAudit(dataSize); try { addToBatch(record, jdbcRecordExtractor.apply(record)); batchCount++; if (executionOptions.getBatchSize() > 0 && batchCount >= executionOptions.getBatchSize()) { flush(); - if (sinkMetricData.getNumRecordsOut() != null) { - sinkMetricData.getNumRecordsOut().inc(rowSize); - } - if (sinkMetricData.getNumBytesOut() != null) { - sinkMetricData.getNumBytesOut() - .inc(dataSize); + if (sinkMetricData != null) { + sinkMetricData.invoke(rowSize, dataSize); } resetStateAfterFlush(); } } catch (Exception e) { - if (sinkMetricData.getDirtyRecords() != null) { - sinkMetricData.getDirtyRecords().inc(rowSize); - } - if (sinkMetricData.getDirtyBytes() != null) { - sinkMetricData.getDirtyBytes().inc(dataSize); + if (sinkMetricData != null) { + sinkMetricData.invokeDirty(rowSize, dataSize); } resetStateAfterFlush(); throw new IOException("Writing records to JDBC failed.", e); diff --git a/inlong-sort/sort-connectors/kafka/src/main/java/org/apache/inlong/sort/kafka/FlinkKafkaConsumerBase.java b/inlong-sort/sort-connectors/kafka/src/main/java/org/apache/inlong/sort/kafka/FlinkKafkaConsumerBase.java index 0d0ab4544..b655a10cd 100644 --- a/inlong-sort/sort-connectors/kafka/src/main/java/org/apache/inlong/sort/kafka/FlinkKafkaConsumerBase.java +++ b/inlong-sort/sort-connectors/kafka/src/main/java/org/apache/inlong/sort/kafka/FlinkKafkaConsumerBase.java @@ -19,7 +19,6 @@ package org.apache.inlong.sort.kafka; import org.apache.commons.collections.map.LinkedMap; -import org.apache.commons.lang3.StringUtils; import org.apache.flink.annotation.Internal; import org.apache.flink.annotation.VisibleForTesting; import org.apache.flink.api.common.ExecutionConfig; @@ -63,10 +62,10 @@ import org.apache.flink.streaming.runtime.operators.util.AssignerWithPeriodicWat import org.apache.flink.streaming.runtime.operators.util.AssignerWithPunctuatedWatermarksAdapter; import org.apache.flink.util.ExceptionUtils; import org.apache.flink.util.SerializedValue; -import org.apache.inlong.audit.AuditImp; +import org.apache.inlong.sort.base.metric.MetricOption; +import org.apache.inlong.sort.base.metric.MetricOption.RegisteredMetric; import org.apache.inlong.sort.base.metric.MetricState; import org.apache.inlong.sort.base.metric.SourceMetricData; -import org.apache.inlong.sort.base.metric.ThreadSafeCounter; import org.apache.inlong.sort.base.util.MetricStateUtils; import org.apache.inlong.sort.kafka.table.DynamicKafkaDeserializationSchema; import org.apache.kafka.clients.consumer.ConsumerConfig; @@ -74,10 +73,8 @@ import org.slf4j.Logger; import org.slf4j.LoggerFactory; import java.util.ArrayList; -import java.util.Arrays; import java.util.Collection; import java.util.HashMap; -import java.util.HashSet; import java.util.List; import java.util.Map; import java.util.Properties; @@ -90,7 +87,6 @@ import static org.apache.flink.streaming.connectors.kafka.internals.metrics.Kafk import static org.apache.flink.streaming.connectors.kafka.internals.metrics.KafkaConsumerMetricConstants.KAFKA_CONSUMER_METRICS_GROUP; import static org.apache.flink.util.Preconditions.checkArgument; import static org.apache.flink.util.Preconditions.checkNotNull; -import static org.apache.inlong.sort.base.Constants.DELIMITER; import static org.apache.inlong.sort.base.Constants.INLONG_METRIC_STATE_NAME; import static org.apache.inlong.sort.base.Constants.NUM_BYTES_IN; import static org.apache.inlong.sort.base.Constants.NUM_RECORDS_IN; @@ -831,36 +827,20 @@ public abstract class FlinkKafkaConsumerBase<T> extends RichParallelSourceFuncti @Override public void run(SourceContext<T> sourceContext) throws Exception { - - if (StringUtils.isNotEmpty(this.inlongMetric)) { - String[] inlongMetricArray = inlongMetric.split(DELIMITER); - String groupId = inlongMetricArray[0]; - String streamId = inlongMetricArray[1]; - String nodeId = inlongMetricArray[2]; - AuditImp auditImp = null; - if (inlongAudit != null) { - AuditImp.getInstance().setAuditProxy(new HashSet<>(Arrays.asList(inlongAudit.split(DELIMITER)))); - auditImp = AuditImp.getInstance(); - } - sourceMetricData = new SourceMetricData(groupId, streamId, nodeId, getRuntimeContext().getMetricGroup(), - auditImp); - ThreadSafeCounter recordsInCounter = new ThreadSafeCounter(); - ThreadSafeCounter bytesInCounter = new ThreadSafeCounter(); - if (metricState != null) { - recordsInCounter.inc(metricState.getMetricValue(NUM_RECORDS_IN)); - bytesInCounter.inc(metricState.getMetricValue(NUM_BYTES_IN)); - } - sourceMetricData.registerMetricsForNumRecordsIn(recordsInCounter); - sourceMetricData.registerMetricsForNumBytesIn(bytesInCounter); - sourceMetricData.registerMetricsForNumRecordsInForMeter(new ThreadSafeCounter()); - sourceMetricData.registerMetricsForNumBytesInForMeter(new ThreadSafeCounter()); - sourceMetricData.registerMetricsForNumBytesInPerSecond(); - sourceMetricData.registerMetricsForNumRecordsInPerSecond(); - if (this.deserializer instanceof DynamicKafkaDeserializationSchema) { - DynamicKafkaDeserializationSchema dynamicKafkaDeserializationSchema = - (DynamicKafkaDeserializationSchema) deserializer; - dynamicKafkaDeserializationSchema.setMetricData(sourceMetricData); - } + MetricOption metricOption = MetricOption.builder() + .withInlongLabels(inlongMetric) + .withInlongAudit(inlongAudit) + .withRegisterMetric(RegisteredMetric.ALL) + .withInitRecords(metricState != null ? metricState.getMetricValue(NUM_RECORDS_IN) : 0L) + .withInitBytes(metricState != null ? metricState.getMetricValue(NUM_BYTES_IN) : 0L) + .build(); + if (metricOption != null) { + sourceMetricData = new SourceMetricData(metricOption, getRuntimeContext().getMetricGroup()); + } + if (this.deserializer instanceof DynamicKafkaDeserializationSchema) { + DynamicKafkaDeserializationSchema dynamicKafkaDeserializationSchema = + (DynamicKafkaDeserializationSchema) deserializer; + dynamicKafkaDeserializationSchema.setMetricData(sourceMetricData); } if (subscribedPartitionsToStartOffsets == null) { diff --git a/inlong-sort/sort-connectors/kafka/src/main/java/org/apache/inlong/sort/kafka/FlinkKafkaProducer.java b/inlong-sort/sort-connectors/kafka/src/main/java/org/apache/inlong/sort/kafka/FlinkKafkaProducer.java index 3f0902c0c..d3ca629cb 100644 --- a/inlong-sort/sort-connectors/kafka/src/main/java/org/apache/inlong/sort/kafka/FlinkKafkaProducer.java +++ b/inlong-sort/sort-connectors/kafka/src/main/java/org/apache/inlong/sort/kafka/FlinkKafkaProducer.java @@ -57,10 +57,10 @@ import org.apache.flink.streaming.util.serialization.KeyedSerializationSchema; import org.apache.flink.util.ExceptionUtils; import org.apache.flink.util.NetUtils; import org.apache.flink.util.TemporaryClassLoaderContext; -import org.apache.inlong.audit.AuditImp; +import org.apache.inlong.sort.base.metric.MetricOption; +import org.apache.inlong.sort.base.metric.MetricOption.RegisteredMetric; import org.apache.inlong.sort.base.metric.MetricState; import org.apache.inlong.sort.base.metric.SinkMetricData; -import org.apache.inlong.sort.base.metric.ThreadSafeCounter; import org.apache.inlong.sort.base.util.MetricStateUtils; import org.apache.kafka.clients.producer.Callback; import org.apache.kafka.clients.producer.Producer; @@ -97,7 +97,6 @@ import java.util.concurrent.atomic.AtomicLong; import static org.apache.flink.util.Preconditions.checkNotNull; import static org.apache.flink.util.Preconditions.checkState; -import static org.apache.inlong.sort.base.Constants.DELIMITER; import static org.apache.inlong.sort.base.Constants.INLONG_METRIC_STATE_NAME; import static org.apache.inlong.sort.base.Constants.NUM_BYTES_OUT; import static org.apache.inlong.sort.base.Constants.NUM_RECORDS_OUT; @@ -243,18 +242,6 @@ public class FlinkKafkaProducer<IN> */ @Nullable protected transient volatile Exception asyncException; - /** - * audit implement - */ - private transient AuditImp auditImp; - /** - * inLong groupId - */ - private String inlongGroupId; - /** - * inLong streamId - */ - private String inlongStreamId; /** * sink metric data */ @@ -914,25 +901,15 @@ public class FlinkKafkaProducer<IN> RuntimeContextInitializationContextAdapters.serializationAdapter( getRuntimeContext(), metricGroup -> metricGroup.addGroup("user"))); } - if (inlongMetric != null && !inlongMetric.isEmpty()) { - String[] inlongMetricArray = inlongMetric.split(DELIMITER); - inlongGroupId = inlongMetricArray[0]; - inlongStreamId = inlongMetricArray[1]; - String nodeId = inlongMetricArray[2]; - metricData = new SinkMetricData(inlongGroupId, inlongStreamId, nodeId, ctx.getMetricGroup(), - auditHostAndPorts); - metricData.registerMetricsForDirtyBytes(new ThreadSafeCounter()); - metricData.registerMetricsForDirtyRecords(new ThreadSafeCounter()); - metricData.registerMetricsForNumBytesOut(new ThreadSafeCounter()); - metricData.registerMetricsForNumRecordsOut(new ThreadSafeCounter()); - metricData.registerMetricsForNumBytesOutForMeter(new ThreadSafeCounter()); - metricData.registerMetricsForNumRecordsOutForMeter(new ThreadSafeCounter()); - metricData.registerMetricsForNumBytesOutPerSecond(); - metricData.registerMetricsForNumRecordsOutPerSecond(); - } - if (metricState != null && metricData != null) { - metricData.getNumBytesOut().inc(metricState.getMetricValue(NUM_BYTES_OUT)); - metricData.getNumRecordsOut().inc(metricState.getMetricValue(NUM_RECORDS_OUT)); + MetricOption metricOption = MetricOption.builder() + .withInlongLabels(inlongMetric) + .withInlongAudit(auditHostAndPorts) + .withInitRecords(metricState != null ? metricState.getMetricValue(NUM_RECORDS_OUT) : 0L) + .withInitBytes(metricState != null ? metricState.getMetricValue(NUM_BYTES_OUT) : 0L) + .withRegisterMetric(RegisteredMetric.ALL) + .build(); + if (metricOption != null) { + metricData = new SinkMetricData(metricOption, ctx.getMetricGroup()); } super.open(configuration); } @@ -945,8 +922,7 @@ public class FlinkKafkaProducer<IN> private void sendDirtyMetrics(Long rowSize, Long dataSize) { if (metricData != null) { - metricData.getDirtyRecords().inc(rowSize); - metricData.getDirtyBytes().inc(dataSize); + metricData.invokeDirty(rowSize, dataSize); } } diff --git a/inlong-sort/sort-connectors/kafka/src/main/java/org/apache/inlong/sort/kafka/table/DynamicKafkaDeserializationSchema.java b/inlong-sort/sort-connectors/kafka/src/main/java/org/apache/inlong/sort/kafka/table/DynamicKafkaDeserializationSchema.java index c6b5c11a9..13e3d2103 100644 --- a/inlong-sort/sort-connectors/kafka/src/main/java/org/apache/inlong/sort/kafka/table/DynamicKafkaDeserializationSchema.java +++ b/inlong-sort/sort-connectors/kafka/src/main/java/org/apache/inlong/sort/kafka/table/DynamicKafkaDeserializationSchema.java @@ -120,7 +120,9 @@ public class DynamicKafkaDeserializationSchema implements KafkaDeserializationSc if (keyDeserialization == null && !hasMetadata) { valueDeserialization.deserialize(record.value(), collector); // output metrics - outputMetrics(record); + if (metricData != null) { + metricData.outputMetrics(1, record.value().length); + } return; } @@ -140,7 +142,9 @@ public class DynamicKafkaDeserializationSchema implements KafkaDeserializationSc } else { valueDeserialization.deserialize(record.value(), outputCollector); // output metrics - outputMetrics(record); + if (metricData != null) { + metricData.outputMetrics(1, record.value().length); + } } keyCollector.buffer.clear(); diff --git a/inlong-sort/sort-connectors/mongodb-cdc/src/main/java/org/apache/inlong/sort/cdc/mongodb/DebeziumSourceFunction.java b/inlong-sort/sort-connectors/mongodb-cdc/src/main/java/org/apache/inlong/sort/cdc/mongodb/DebeziumSourceFunction.java index 42bb53732..932e249d5 100644 --- a/inlong-sort/sort-connectors/mongodb-cdc/src/main/java/org/apache/inlong/sort/cdc/mongodb/DebeziumSourceFunction.java +++ b/inlong-sort/sort-connectors/mongodb-cdc/src/main/java/org/apache/inlong/sort/cdc/mongodb/DebeziumSourceFunction.java @@ -36,7 +36,6 @@ import io.debezium.engine.DebeziumEngine; import io.debezium.engine.spi.OffsetCommitPolicy; import io.debezium.heartbeat.Heartbeat; import org.apache.commons.collections.map.LinkedMap; -import org.apache.commons.lang3.StringUtils; import org.apache.flink.annotation.PublicEvolving; import org.apache.flink.annotation.VisibleForTesting; import org.apache.flink.api.common.state.CheckpointListener; @@ -58,7 +57,8 @@ import org.apache.flink.streaming.api.functions.source.RichSourceFunction; import org.apache.flink.util.Collector; import org.apache.flink.util.ExceptionUtils; import org.apache.flink.util.FlinkRuntimeException; -import org.apache.inlong.audit.AuditImp; +import org.apache.inlong.sort.base.metric.MetricOption; +import org.apache.inlong.sort.base.metric.MetricOption.RegisteredMetric; import org.apache.inlong.sort.base.metric.SourceMetricData; import org.apache.kafka.connect.source.SourceRecord; import org.slf4j.Logger; @@ -68,9 +68,7 @@ import javax.annotation.Nullable; import java.io.IOException; import java.lang.reflect.Method; import java.nio.charset.StandardCharsets; -import java.util.Arrays; import java.util.Collection; -import java.util.HashSet; import java.util.Properties; import java.util.UUID; import java.util.concurrent.ConcurrentLinkedQueue; @@ -81,7 +79,6 @@ import java.util.concurrent.TimeUnit; import static com.ververica.cdc.debezium.utils.DatabaseHistoryUtil.registerHistory; import static com.ververica.cdc.debezium.utils.DatabaseHistoryUtil.retrieveHistory; -import static org.apache.inlong.sort.base.Constants.DELIMITER; /** * The {@link DebeziumSourceFunction} is a streaming data source that pulls captured change data @@ -416,21 +413,13 @@ public class DebeziumSourceFunction<T> extends RichSourceFunction<T> (Gauge<Long>) () -> debeziumChangeFetcher.getEmitDelay()); metricGroup.gauge( "sourceIdleTime", (Gauge<Long>) () -> debeziumChangeFetcher.getIdleTime()); - if (StringUtils.isNotEmpty(this.inlongMetric)) { - String[] inlongMetricArray = inlongMetric.split(DELIMITER); - String groupId = inlongMetricArray[0]; - String streamId = inlongMetricArray[1]; - String nodeId = inlongMetricArray[2]; - AuditImp auditImp = null; - if (inlongAudit != null) { - AuditImp.getInstance().setAuditProxy(new HashSet<>(Arrays.asList(inlongAudit.split(DELIMITER)))); - auditImp = AuditImp.getInstance(); - } - metricData = new SourceMetricData(groupId, streamId, nodeId, metricGroup, auditImp); - metricData.registerMetricsForNumRecordsIn(); - metricData.registerMetricsForNumBytesIn(); - metricData.registerMetricsForNumBytesInPerSecond(); - metricData.registerMetricsForNumRecordsInPerSecond(); + MetricOption metricOption = MetricOption.builder() + .withInlongLabels(inlongMetric) + .withInlongAudit(inlongAudit) + .withRegisterMetric(RegisteredMetric.ALL) + .build(); + if (metricOption != null) { + metricData = new SourceMetricData(metricOption, getRuntimeContext().getMetricGroup()); } properties.setProperty("name", "engine"); properties.setProperty("offset.storage", FlinkOffsetBackingStore.class.getCanonicalName()); @@ -470,8 +459,7 @@ public class DebeziumSourceFunction<T> extends RichSourceFunction<T> @Override public void deserialize(SourceRecord record, Collector<T> out) throws Exception { if (metricData != null) { - metricData.outputMetrics(1L, - record.value().toString().getBytes(StandardCharsets.UTF_8).length); + metricData.outputMetricsWithEstimate(record.value()); } deserializer.deserialize(record, out); } diff --git a/inlong-sort/sort-connectors/mongodb-cdc/src/main/java/org/apache/inlong/sort/cdc/mongodb/table/MongoDBTableSourceFactory.java b/inlong-sort/sort-connectors/mongodb-cdc/src/main/java/org/apache/inlong/sort/cdc/mongodb/table/MongoDBTableSourceFactory.java index 919c227e4..20f20b65f 100644 --- a/inlong-sort/sort-connectors/mongodb-cdc/src/main/java/org/apache/inlong/sort/cdc/mongodb/table/MongoDBTableSourceFactory.java +++ b/inlong-sort/sort-connectors/mongodb-cdc/src/main/java/org/apache/inlong/sort/cdc/mongodb/table/MongoDBTableSourceFactory.java @@ -27,7 +27,6 @@ import org.apache.flink.table.catalog.UniqueConstraint; import org.apache.flink.table.connector.source.DynamicTableSource; import org.apache.flink.table.factories.DynamicTableSourceFactory; import org.apache.flink.table.factories.FactoryUtil; -import org.apache.inlong.sort.base.util.ValidateMetricOptionUtils; import java.time.ZoneId; import java.util.HashSet; @@ -226,7 +225,6 @@ public class MongoDBTableSourceFactory implements DynamicTableSourceFactory { : ZoneId.of(zoneId); final String inlongMetric = config.getOptional(INLONG_METRIC).orElse(null); final String inlongAudit = config.get(INLONG_AUDIT); - ValidateMetricOptionUtils.validateInlongMetricIfSetInlongAudit(inlongMetric, inlongAudit); ResolvedSchema physicalSchema = context.getCatalogTable().getResolvedSchema(); checkArgument(physicalSchema.getPrimaryKey().isPresent(), "Primary key must be present"); diff --git a/inlong-sort/sort-connectors/mysql-cdc/src/main/java/org/apache/inlong/sort/cdc/debezium/DebeziumSourceFunction.java b/inlong-sort/sort-connectors/mysql-cdc/src/main/java/org/apache/inlong/sort/cdc/debezium/DebeziumSourceFunction.java index e7084d2fa..1ac48f97a 100644 --- a/inlong-sort/sort-connectors/mysql-cdc/src/main/java/org/apache/inlong/sort/cdc/debezium/DebeziumSourceFunction.java +++ b/inlong-sort/sort-connectors/mysql-cdc/src/main/java/org/apache/inlong/sort/cdc/debezium/DebeziumSourceFunction.java @@ -25,7 +25,6 @@ import io.debezium.engine.DebeziumEngine; import io.debezium.engine.spi.OffsetCommitPolicy; import io.debezium.heartbeat.Heartbeat; import org.apache.commons.collections.map.LinkedMap; -import org.apache.commons.lang3.StringUtils; import org.apache.flink.annotation.PublicEvolving; import org.apache.flink.annotation.VisibleForTesting; import org.apache.flink.api.common.state.CheckpointListener; @@ -47,7 +46,8 @@ import org.apache.flink.streaming.api.functions.source.RichSourceFunction; import org.apache.flink.util.Collector; import org.apache.flink.util.ExceptionUtils; import org.apache.flink.util.FlinkRuntimeException; -import org.apache.inlong.audit.AuditImp; +import org.apache.inlong.sort.base.metric.MetricOption; +import org.apache.inlong.sort.base.metric.MetricOption.RegisteredMetric; import org.apache.inlong.sort.base.metric.SourceMetricData; import org.apache.inlong.sort.cdc.debezium.internal.DebeziumChangeConsumer; import org.apache.inlong.sort.cdc.debezium.internal.DebeziumChangeFetcher; @@ -66,9 +66,7 @@ import javax.annotation.Nullable; import java.io.IOException; import java.lang.reflect.Method; import java.nio.charset.StandardCharsets; -import java.util.Arrays; import java.util.Collection; -import java.util.HashSet; import java.util.Properties; import java.util.UUID; import java.util.concurrent.ConcurrentLinkedQueue; @@ -77,7 +75,6 @@ import java.util.concurrent.Executors; import java.util.concurrent.ThreadFactory; import java.util.concurrent.TimeUnit; -import static org.apache.inlong.sort.base.Constants.DELIMITER; import static org.apache.inlong.sort.cdc.debezium.utils.DatabaseHistoryUtil.registerHistory; import static org.apache.inlong.sort.cdc.debezium.utils.DatabaseHistoryUtil.retrieveHistory; @@ -416,23 +413,13 @@ public class DebeziumSourceFunction<T> extends RichSourceFunction<T> (Gauge<Long>) () -> debeziumChangeFetcher.getEmitDelay()); metricGroup.gauge( "sourceIdleTime", (Gauge<Long>) () -> debeziumChangeFetcher.getIdleTime()); - if (StringUtils.isNotEmpty(this.inlongMetric)) { - String[] inlongMetricArray = inlongMetric.split(DELIMITER); - String groupId = inlongMetricArray[0]; - String streamId = inlongMetricArray[1]; - String nodeId = inlongMetricArray[2]; - AuditImp auditImp = null; - if (inlongAudit != null) { - AuditImp.getInstance().setAuditProxy(new HashSet<>(Arrays.asList(inlongAudit.split(DELIMITER)))); - auditImp = AuditImp.getInstance(); - } - sourceMetricData = new SourceMetricData(groupId, streamId, nodeId, metricGroup, auditImp); - sourceMetricData.registerMetricsForNumRecordsIn(); - sourceMetricData.registerMetricsForNumBytesIn(); - sourceMetricData.registerMetricsForNumBytesInForMeter(); - sourceMetricData.registerMetricsForNumRecordsInForMeter(); - sourceMetricData.registerMetricsForNumBytesInPerSecond(); - sourceMetricData.registerMetricsForNumRecordsInPerSecond(); + MetricOption metricOption = MetricOption.builder() + .withInlongLabels(inlongMetric) + .withInlongAudit(inlongAudit) + .withRegisterMetric(RegisteredMetric.ALL) + .build(); + if (metricOption != null) { + sourceMetricData = new SourceMetricData(metricOption, getRuntimeContext().getMetricGroup()); } properties.setProperty("name", "engine"); @@ -473,8 +460,7 @@ public class DebeziumSourceFunction<T> extends RichSourceFunction<T> @Override public void deserialize(SourceRecord record, Collector<T> out) throws Exception { if (sourceMetricData != null) { - sourceMetricData.outputMetrics(1L, - record.value().toString().getBytes(StandardCharsets.UTF_8).length); + sourceMetricData.outputMetricsWithEstimate(record.value()); } deserializer.deserialize(record, out); } diff --git a/inlong-sort/sort-connectors/mysql-cdc/src/main/java/org/apache/inlong/sort/cdc/mysql/source/MySqlSource.java b/inlong-sort/sort-connectors/mysql-cdc/src/main/java/org/apache/inlong/sort/cdc/mysql/source/MySqlSource.java index dcb510044..ea102f429 100644 --- a/inlong-sort/sort-connectors/mysql-cdc/src/main/java/org/apache/inlong/sort/cdc/mysql/source/MySqlSource.java +++ b/inlong-sort/sort-connectors/mysql-cdc/src/main/java/org/apache/inlong/sort/cdc/mysql/source/MySqlSource.java @@ -20,7 +20,6 @@ package org.apache.inlong.sort.cdc.mysql.source; import io.debezium.jdbc.JdbcConnection; import io.debezium.relational.TableId; -import org.apache.commons.lang3.StringUtils; import org.apache.flink.annotation.Internal; import org.apache.flink.annotation.PublicEvolving; import org.apache.flink.api.common.typeinfo.TypeInformation; @@ -36,8 +35,8 @@ import org.apache.flink.connector.base.source.reader.synchronization.FutureCompl import org.apache.flink.core.io.SimpleVersionedSerializer; import org.apache.flink.metrics.MetricGroup; import org.apache.flink.util.FlinkRuntimeException; -import org.apache.inlong.audit.AuditImp; -import org.apache.inlong.sort.base.Constants; +import org.apache.inlong.sort.base.metric.MetricOption; +import org.apache.inlong.sort.base.metric.MetricOption.RegisteredMetric; import org.apache.inlong.sort.cdc.debezium.DebeziumDeserializationSchema; import org.apache.inlong.sort.cdc.mysql.MySqlValidator; import org.apache.inlong.sort.cdc.mysql.debezium.DebeziumUtils; @@ -62,12 +61,9 @@ import org.apache.inlong.sort.cdc.mysql.table.StartupMode; import org.apache.kafka.connect.source.SourceRecord; import java.lang.reflect.Method; -import java.util.Arrays; -import java.util.HashSet; import java.util.List; import java.util.function.Supplier; -import static org.apache.inlong.sort.base.Constants.DELIMITER; import static org.apache.inlong.sort.cdc.mysql.debezium.DebeziumUtils.discoverCapturedTables; import static org.apache.inlong.sort.cdc.mysql.debezium.DebeziumUtils.openJdbcConnection; @@ -142,28 +138,18 @@ public class MySqlSource<T> final MetricGroup metricGroup = (MetricGroup) metricGroupMethod.invoke(readerContext); final MySqlSourceReaderMetrics sourceReaderMetrics = new MySqlSourceReaderMetrics(metricGroup); - sourceReaderMetrics.registerMetrics(); - MySqlSourceReaderContext mySqlSourceReaderContext = - new MySqlSourceReaderContext(readerContext); // create source config for the given subtask (e.g. unique server id) MySqlSourceConfig sourceConfig = configFactory.createConfig(readerContext.getIndexOfSubtask()); - String inlongMetric = sourceConfig.getInlongMetric(); - String inlongAudit = sourceConfig.getInlongAudit(); - if (StringUtils.isNotEmpty(inlongMetric)) { - String[] inlongMetricArray = inlongMetric.split(DELIMITER); - sourceReaderMetrics.setInlongGroupId(inlongMetricArray[0]); - sourceReaderMetrics.setInlongSteamId(inlongMetricArray[1]); - sourceReaderMetrics.setNodeId(inlongMetricArray[2]); - if (inlongAudit != null) { - AuditImp.getInstance().setAuditProxy(new HashSet<>(Arrays.asList(inlongAudit.split(DELIMITER)))); - sourceReaderMetrics.setAuditImp(AuditImp.getInstance()); - } - sourceReaderMetrics.registerMetricsForNumBytesIn(Constants.NUM_BYTES_IN); - sourceReaderMetrics.registerMetricsForNumRecordsIn(Constants.NUM_RECORDS_IN); - sourceReaderMetrics.registerMetricsForNumBytesInPerSecond(Constants.NUM_BYTES_IN_PER_SECOND); - sourceReaderMetrics.registerMetricsForNumRecordsInPerSecond(Constants.NUM_RECORDS_IN_PER_SECOND); - } + MetricOption metricOption = MetricOption.builder() + .withInlongLabels(sourceConfig.getInlongMetric()) + .withInlongAudit(sourceConfig.getInlongAudit()) + .withRegisterMetric(RegisteredMetric.ALL) + .build(); + sourceReaderMetrics.registerMetrics(metricOption); + MySqlSourceReaderContext mySqlSourceReaderContext = + new MySqlSourceReaderContext(readerContext); + FutureCompletingBlockingQueue<RecordsWithSplitIds<SourceRecord>> elementsQueue = new FutureCompletingBlockingQueue<>(); Supplier<MySqlSplitReader> splitReaderSupplier = diff --git a/inlong-sort/sort-connectors/mysql-cdc/src/main/java/org/apache/inlong/sort/cdc/mysql/source/metrics/MySqlSourceReaderMetrics.java b/inlong-sort/sort-connectors/mysql-cdc/src/main/java/org/apache/inlong/sort/cdc/mysql/source/metrics/MySqlSourceReaderMetrics.java index b301ed572..45c81e560 100644 --- a/inlong-sort/sort-connectors/mysql-cdc/src/main/java/org/apache/inlong/sort/cdc/mysql/source/metrics/MySqlSourceReaderMetrics.java +++ b/inlong-sort/sort-connectors/mysql-cdc/src/main/java/org/apache/inlong/sort/cdc/mysql/source/metrics/MySqlSourceReaderMetrics.java @@ -18,13 +18,10 @@ package org.apache.inlong.sort.cdc.mysql.source.metrics; -import org.apache.flink.metrics.Counter; import org.apache.flink.metrics.Gauge; -import org.apache.flink.metrics.Meter; -import org.apache.flink.metrics.MeterView; import org.apache.flink.metrics.MetricGroup; -import org.apache.inlong.audit.AuditImp; -import org.apache.inlong.sort.base.Constants; +import org.apache.inlong.sort.base.metric.MetricOption; +import org.apache.inlong.sort.base.metric.SourceMetricData; import org.apache.inlong.sort.cdc.mysql.source.reader.MySqlSourceReader; /** @@ -53,56 +50,21 @@ public class MySqlSourceReaderMetrics { */ private volatile long emitDelay = 0L; - private Counter numRecordsIn; - private Counter numBytesIn; - private Meter numRecordsInPerSecond; - private Meter numBytesInPerSecond; - private static Integer TIME_SPAN_IN_SECONDS = 60; - private static String STREAM_ID = "streamId"; - private static String GROUP_ID = "groupId"; - private static String NODE_ID = "nodeId"; - private String inlongGroupId; - private String inlongSteamId; - private String nodeId; - private AuditImp auditImp; + private SourceMetricData sourceMetricData; public MySqlSourceReaderMetrics(MetricGroup metricGroup) { this.metricGroup = metricGroup; } - public void registerMetrics() { + public void registerMetrics(MetricOption metricOption) { + if (metricOption != null) { + sourceMetricData = new SourceMetricData(metricOption, metricGroup); + } metricGroup.gauge("currentFetchEventTimeLag", (Gauge<Long>) this::getFetchDelay); metricGroup.gauge("currentEmitEventTimeLag", (Gauge<Long>) this::getEmitDelay); metricGroup.gauge("sourceIdleTime", (Gauge<Long>) this::getIdleTime); } - public void registerMetricsForNumRecordsIn(String metricName) { - numRecordsIn = - metricGroup.addGroup(GROUP_ID, this.inlongGroupId).addGroup(STREAM_ID, this.inlongSteamId) - .addGroup(NODE_ID, this.nodeId) - .counter(metricName); - } - - public void registerMetricsForNumBytesIn(String metricName) { - numBytesIn = - metricGroup.addGroup(GROUP_ID, this.inlongGroupId).addGroup(STREAM_ID, this.inlongSteamId) - .addGroup(NODE_ID, this.nodeId) - .counter(metricName); - } - - public void registerMetricsForNumRecordsInPerSecond(String metricName) { - numRecordsInPerSecond = - metricGroup.addGroup(GROUP_ID, this.inlongGroupId).addGroup(STREAM_ID, this.inlongSteamId) - .addGroup(NODE_ID, nodeId) - .meter(metricName, new MeterView(this.numRecordsIn, TIME_SPAN_IN_SECONDS)); - } - - public void registerMetricsForNumBytesInPerSecond(String metricName) { - numBytesInPerSecond = metricGroup.addGroup(GROUP_ID, this.inlongGroupId).addGroup(STREAM_ID, this.inlongSteamId) - .addGroup(NODE_ID, this.nodeId) - .meter(metricName, new MeterView(this.numBytesIn, TIME_SPAN_IN_SECONDS)); - } - public long getFetchDelay() { return fetchDelay; } @@ -131,77 +93,9 @@ public class MySqlSourceReaderMetrics { this.emitDelay = emitDelay; } - public Counter getNumRecordsIn() { - return numRecordsIn; - } - - public Counter getNumBytesIn() { - return numBytesIn; - } - - public Meter getNumRecordsInPerSecond() { - return numRecordsInPerSecond; - } - - public Meter getNumBytesInPerSecond() { - return numBytesInPerSecond; - } - - public String getInlongGroupId() { - return inlongGroupId; - } - - public String getInlongSteamId() { - return inlongSteamId; - } - - public String getNodeId() { - return nodeId; - } - - public void setInlongGroupId(String inlongGroupId) { - this.inlongGroupId = inlongGroupId; - } - - public void setInlongSteamId(String inlongSteamId) { - this.inlongSteamId = inlongSteamId; - } - - public void setNodeId(String nodeId) { - this.nodeId = nodeId; - } - - public AuditImp getAuditImp() { - return auditImp; - } - - public void setAuditImp(AuditImp auditImp) { - this.auditImp = auditImp; - } - public void outputMetrics(long rowCountSize, long rowDataSize) { - outputMetricForFlink(rowCountSize, rowDataSize); - outputMetricForAudit(rowCountSize, rowDataSize); - } - - public void outputMetricForAudit(long rowCountSize, long rowDataSize) { - if (this.auditImp != null) { - this.auditImp.add( - Constants.AUDIT_SORT_INPUT, - getInlongGroupId(), - getInlongSteamId(), - System.currentTimeMillis(), - rowCountSize, - rowDataSize); - } - } - - public void outputMetricForFlink(long rowCountSize, long rowDataSize) { - if (this.numBytesIn != null) { - numBytesIn.inc(rowDataSize); - } - if (this.numRecordsIn != null) { - this.numRecordsIn.inc(rowCountSize); + if (sourceMetricData != null) { + sourceMetricData.outputMetrics(rowCountSize, rowDataSize); } } } diff --git a/inlong-sort/sort-connectors/mysql-cdc/src/main/java/org/apache/inlong/sort/cdc/mysql/table/MySqlTableInlongSourceFactory.java b/inlong-sort/sort-connectors/mysql-cdc/src/main/java/org/apache/inlong/sort/cdc/mysql/table/MySqlTableInlongSourceFactory.java index c8a70b8bd..b4610d959 100644 --- a/inlong-sort/sort-connectors/mysql-cdc/src/main/java/org/apache/inlong/sort/cdc/mysql/table/MySqlTableInlongSourceFactory.java +++ b/inlong-sort/sort-connectors/mysql-cdc/src/main/java/org/apache/inlong/sort/cdc/mysql/table/MySqlTableInlongSourceFactory.java @@ -26,7 +26,6 @@ import org.apache.flink.table.connector.source.DynamicTableSource; import org.apache.flink.table.factories.DynamicTableSourceFactory; import org.apache.flink.table.factories.FactoryUtil; import org.apache.flink.util.Preconditions; -import org.apache.inlong.sort.base.util.ValidateMetricOptionUtils; import org.apache.inlong.sort.cdc.debezium.table.DebeziumOptions; import org.apache.inlong.sort.cdc.mysql.source.config.MySqlSourceOptions; import org.apache.inlong.sort.cdc.mysql.source.config.ServerIdRange; @@ -124,7 +123,6 @@ public class MySqlTableInlongSourceFactory implements DynamicTableSourceFactory final ReadableConfig config = helper.getOptions(); final String inlongMetric = config.getOptional(INLONG_METRIC).orElse(null); final String inlongAudit = config.get(INLONG_AUDIT); - ValidateMetricOptionUtils.validateInlongMetricIfSetInlongAudit(inlongMetric, inlongAudit); final String hostname = config.get(HOSTNAME); final String username = config.get(USERNAME); final String password = config.get(PASSWORD); diff --git a/inlong-sort/sort-connectors/oracle-cdc/src/main/java/org/apache/inlong/sort/cdc/oracle/DebeziumSourceFunction.java b/inlong-sort/sort-connectors/oracle-cdc/src/main/java/org/apache/inlong/sort/cdc/oracle/DebeziumSourceFunction.java index 1458693fb..869a0a1cd 100644 --- a/inlong-sort/sort-connectors/oracle-cdc/src/main/java/org/apache/inlong/sort/cdc/oracle/DebeziumSourceFunction.java +++ b/inlong-sort/sort-connectors/oracle-cdc/src/main/java/org/apache/inlong/sort/cdc/oracle/DebeziumSourceFunction.java @@ -36,7 +36,6 @@ import io.debezium.engine.DebeziumEngine; import io.debezium.engine.spi.OffsetCommitPolicy; import io.debezium.heartbeat.Heartbeat; import org.apache.commons.collections.map.LinkedMap; -import org.apache.commons.lang3.StringUtils; import org.apache.flink.annotation.PublicEvolving; import org.apache.flink.annotation.VisibleForTesting; import org.apache.flink.api.common.state.CheckpointListener; @@ -58,7 +57,8 @@ import org.apache.flink.streaming.api.functions.source.RichSourceFunction; import org.apache.flink.util.Collector; import org.apache.flink.util.ExceptionUtils; import org.apache.flink.util.FlinkRuntimeException; -import org.apache.inlong.audit.AuditImp; +import org.apache.inlong.sort.base.metric.MetricOption; +import org.apache.inlong.sort.base.metric.MetricOption.RegisteredMetric; import org.apache.inlong.sort.base.metric.SourceMetricData; import org.apache.kafka.connect.source.SourceRecord; import org.slf4j.Logger; @@ -68,9 +68,7 @@ import javax.annotation.Nullable; import java.io.IOException; import java.lang.reflect.Method; import java.nio.charset.StandardCharsets; -import java.util.Arrays; import java.util.Collection; -import java.util.HashSet; import java.util.Properties; import java.util.UUID; import java.util.concurrent.ConcurrentLinkedQueue; @@ -81,7 +79,6 @@ import java.util.concurrent.TimeUnit; import static com.ververica.cdc.debezium.utils.DatabaseHistoryUtil.registerHistory; import static com.ververica.cdc.debezium.utils.DatabaseHistoryUtil.retrieveHistory; -import static org.apache.inlong.sort.base.Constants.DELIMITER; /** * The {@link DebeziumSourceFunction} is a streaming data source that pulls captured change data @@ -416,21 +413,13 @@ public class DebeziumSourceFunction<T> extends RichSourceFunction<T> (Gauge<Long>) () -> debeziumChangeFetcher.getEmitDelay()); metricGroup.gauge( "sourceIdleTime", (Gauge<Long>) () -> debeziumChangeFetcher.getIdleTime()); - if (StringUtils.isNotEmpty(this.inlongMetric)) { - String[] inlongMetricArray = inlongMetric.split(DELIMITER); - String groupId = inlongMetricArray[0]; - String streamId = inlongMetricArray[1]; - String nodeId = inlongMetricArray[2]; - AuditImp auditImp = null; - if (inlongAudit != null) { - AuditImp.getInstance().setAuditProxy(new HashSet<>(Arrays.asList(inlongAudit.split(DELIMITER)))); - auditImp = AuditImp.getInstance(); - } - metricData = new SourceMetricData(groupId, streamId, nodeId, metricGroup, auditImp); - metricData.registerMetricsForNumRecordsIn(); - metricData.registerMetricsForNumBytesIn(); - metricData.registerMetricsForNumBytesInPerSecond(); - metricData.registerMetricsForNumRecordsInPerSecond(); + MetricOption metricOption = MetricOption.builder() + .withInlongLabels(inlongMetric) + .withInlongAudit(inlongAudit) + .withRegisterMetric(RegisteredMetric.ALL) + .build(); + if (metricOption != null) { + metricData = new SourceMetricData(metricOption, getRuntimeContext().getMetricGroup()); } properties.setProperty("name", "engine"); properties.setProperty("offset.storage", FlinkOffsetBackingStore.class.getCanonicalName()); @@ -470,8 +459,7 @@ public class DebeziumSourceFunction<T> extends RichSourceFunction<T> @Override public void deserialize(SourceRecord record, Collector<T> out) throws Exception { if (metricData != null) { - metricData.outputMetrics(1L, - record.value().toString().getBytes(StandardCharsets.UTF_8).length); + metricData.outputMetricsWithEstimate(record.value()); } deserializer.deserialize(record, out); } diff --git a/inlong-sort/sort-connectors/oracle-cdc/src/main/java/org/apache/inlong/sort/cdc/oracle/table/OracleTableSourceFactory.java b/inlong-sort/sort-connectors/oracle-cdc/src/main/java/org/apache/inlong/sort/cdc/oracle/table/OracleTableSourceFactory.java index 0b4471ae3..d020d03f9 100644 --- a/inlong-sort/sort-connectors/oracle-cdc/src/main/java/org/apache/inlong/sort/cdc/oracle/table/OracleTableSourceFactory.java +++ b/inlong-sort/sort-connectors/oracle-cdc/src/main/java/org/apache/inlong/sort/cdc/oracle/table/OracleTableSourceFactory.java @@ -28,7 +28,6 @@ import org.apache.flink.table.catalog.ResolvedSchema; import org.apache.flink.table.connector.source.DynamicTableSource; import org.apache.flink.table.factories.DynamicTableSourceFactory; import org.apache.flink.table.factories.FactoryUtil; -import org.apache.inlong.sort.base.util.ValidateMetricOptionUtils; import java.util.HashSet; import java.util.Set; @@ -114,7 +113,6 @@ public class OracleTableSourceFactory implements DynamicTableSourceFactory { ResolvedSchema physicalSchema = context.getCatalogTable().getResolvedSchema(); String inlongMetric = config.getOptional(INLONG_METRIC).orElse(null); String inlongAudit = config.get(INLONG_AUDIT); - ValidateMetricOptionUtils.validateInlongMetricIfSetInlongAudit(inlongMetric, inlongAudit); return new OracleTableSource( physicalSchema, port, diff --git a/inlong-sort/sort-connectors/postgres-cdc/src/main/java/org.apache.inlong.sort.cdc.postgres/DebeziumSourceFunction.java b/inlong-sort/sort-connectors/postgres-cdc/src/main/java/org.apache.inlong.sort.cdc.postgres/DebeziumSourceFunction.java index 4ef40bcc8..2ccf92421 100644 --- a/inlong-sort/sort-connectors/postgres-cdc/src/main/java/org.apache.inlong.sort.cdc.postgres/DebeziumSourceFunction.java +++ b/inlong-sort/sort-connectors/postgres-cdc/src/main/java/org.apache.inlong.sort.cdc.postgres/DebeziumSourceFunction.java @@ -36,7 +36,6 @@ import io.debezium.engine.DebeziumEngine; import io.debezium.engine.spi.OffsetCommitPolicy; import io.debezium.heartbeat.Heartbeat; import org.apache.commons.collections.map.LinkedMap; -import org.apache.commons.lang3.StringUtils; import org.apache.flink.annotation.PublicEvolving; import org.apache.flink.annotation.VisibleForTesting; import org.apache.flink.api.common.state.CheckpointListener; @@ -51,7 +50,6 @@ import org.apache.flink.api.java.typeutils.ResultTypeQueryable; import org.apache.flink.configuration.Configuration; import org.apache.flink.metrics.Gauge; import org.apache.flink.metrics.MetricGroup; -import org.apache.flink.metrics.SimpleCounter; import org.apache.flink.runtime.state.FunctionInitializationContext; import org.apache.flink.runtime.state.FunctionSnapshotContext; import org.apache.flink.shaded.guava18.com.google.common.util.concurrent.ThreadFactoryBuilder; @@ -60,7 +58,8 @@ import org.apache.flink.streaming.api.functions.source.RichSourceFunction; import org.apache.flink.util.Collector; import org.apache.flink.util.ExceptionUtils; import org.apache.flink.util.FlinkRuntimeException; -import org.apache.inlong.audit.AuditImp; +import org.apache.inlong.sort.base.metric.MetricOption; +import org.apache.inlong.sort.base.metric.MetricOption.RegisteredMetric; import org.apache.inlong.sort.base.metric.MetricState; import org.apache.inlong.sort.base.metric.SourceMetricData; import org.apache.inlong.sort.base.util.MetricStateUtils; @@ -72,9 +71,7 @@ import javax.annotation.Nullable; import java.io.IOException; import java.lang.reflect.Method; import java.nio.charset.StandardCharsets; -import java.util.Arrays; import java.util.Collection; -import java.util.HashSet; import java.util.Properties; import java.util.UUID; import java.util.concurrent.ConcurrentLinkedQueue; @@ -85,7 +82,6 @@ import java.util.concurrent.TimeUnit; import static com.ververica.cdc.debezium.utils.DatabaseHistoryUtil.registerHistory; import static com.ververica.cdc.debezium.utils.DatabaseHistoryUtil.retrieveHistory; -import static org.apache.inlong.sort.base.Constants.DELIMITER; import static org.apache.inlong.sort.base.Constants.INLONG_METRIC_STATE_NAME; import static org.apache.inlong.sort.base.Constants.NUM_BYTES_IN; import static org.apache.inlong.sort.base.Constants.NUM_RECORDS_IN; @@ -441,29 +437,15 @@ public class DebeziumSourceFunction<T> extends RichSourceFunction<T> (Gauge<Long>) () -> debeziumChangeFetcher.getEmitDelay()); metricGroup.gauge( "sourceIdleTime", (Gauge<Long>) () -> debeziumChangeFetcher.getIdleTime()); - if (StringUtils.isNotEmpty(this.inlongMetric)) { - String[] inlongMetricArray = inlongMetric.split(DELIMITER); - String groupId = inlongMetricArray[0]; - String streamId = inlongMetricArray[1]; - String nodeId = inlongMetricArray[2]; - AuditImp auditImp = null; - if (inlongAudit != null) { - AuditImp.getInstance().setAuditProxy(new HashSet<>(Arrays.asList(inlongAudit.split(DELIMITER)))); - auditImp = AuditImp.getInstance(); - } - sourceMetricData = new SourceMetricData(groupId, streamId, nodeId, metricGroup, auditImp); - SimpleCounter recordsInCounter = new SimpleCounter(); - SimpleCounter bytesInCounter = new SimpleCounter(); - if (metricState != null) { - recordsInCounter.inc(metricState.getMetricValue(NUM_RECORDS_IN)); - bytesInCounter.inc(metricState.getMetricValue(NUM_BYTES_IN)); - } - sourceMetricData.registerMetricsForNumRecordsIn(recordsInCounter); - sourceMetricData.registerMetricsForNumBytesIn(bytesInCounter); - sourceMetricData.registerMetricsForNumRecordsInForMeter(); - sourceMetricData.registerMetricsForNumBytesInForMeter(); - sourceMetricData.registerMetricsForNumBytesInPerSecond(); - sourceMetricData.registerMetricsForNumRecordsInPerSecond(); + MetricOption metricOption = MetricOption.builder() + .withInlongLabels(inlongMetric) + .withInlongAudit(inlongAudit) + .withInitRecords(metricState != null ? metricState.getMetricValue(NUM_RECORDS_IN) : 0L) + .withInitBytes(metricState != null ? metricState.getMetricValue(NUM_BYTES_IN) : 0L) + .withRegisterMetric(RegisteredMetric.ALL) + .build(); + if (metricOption != null) { + sourceMetricData = new SourceMetricData(metricOption, getRuntimeContext().getMetricGroup()); } properties.setProperty("name", "engine"); properties.setProperty("offset.storage", FlinkOffsetBackingStore.class.getCanonicalName()); @@ -504,8 +486,7 @@ public class DebeziumSourceFunction<T> extends RichSourceFunction<T> public void deserialize(SourceRecord record, Collector<T> out) throws Exception { deserializer.deserialize(record, out); if (sourceMetricData != null) { - sourceMetricData.outputMetrics(1L, - record.value().toString().getBytes(StandardCharsets.UTF_8).length); + sourceMetricData.outputMetricsWithEstimate(record.value()); } } diff --git a/inlong-sort/sort-connectors/postgres-cdc/src/main/java/org/apache/inlong/sort/cdc/postgres/table/PostgreSQLTableFactory.java b/inlong-sort/sort-connectors/postgres-cdc/src/main/java/org/apache/inlong/sort/cdc/postgres/table/PostgreSQLTableFactory.java index a886f8aef..02c624290 100644 --- a/inlong-sort/sort-connectors/postgres-cdc/src/main/java/org/apache/inlong/sort/cdc/postgres/table/PostgreSQLTableFactory.java +++ b/inlong-sort/sort-connectors/postgres-cdc/src/main/java/org/apache/inlong/sort/cdc/postgres/table/PostgreSQLTableFactory.java @@ -25,7 +25,6 @@ import org.apache.flink.table.catalog.ResolvedSchema; import org.apache.flink.table.connector.source.DynamicTableSource; import org.apache.flink.table.factories.DynamicTableSourceFactory; import org.apache.flink.table.factories.FactoryUtil; -import org.apache.inlong.sort.base.util.ValidateMetricOptionUtils; import java.util.HashSet; import java.util.Set; @@ -127,7 +126,6 @@ public class PostgreSQLTableFactory implements DynamicTableSourceFactory { ResolvedSchema physicalSchema = context.getCatalogTable().getResolvedSchema(); String inlongMetric = config.getOptional(INLONG_METRIC).orElse(null); String inlongAudit = config.get(INLONG_AUDIT); - ValidateMetricOptionUtils.validateInlongMetricIfSetInlongAudit(inlongMetric, inlongAudit); return new PostgreSQLTableSource( physicalSchema, diff --git a/inlong-sort/sort-connectors/pulsar/src/main/java/org/apache/inlong/sort/pulsar/table/Constants.java b/inlong-sort/sort-connectors/pulsar/src/main/java/org/apache/inlong/sort/pulsar/table/Constants.java deleted file mode 100644 index 18fd19955..000000000 --- a/inlong-sort/sort-connectors/pulsar/src/main/java/org/apache/inlong/sort/pulsar/table/Constants.java +++ /dev/null @@ -1,32 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package org.apache.inlong.sort.pulsar.table; - -import org.apache.flink.configuration.ConfigOption; -import org.apache.flink.configuration.ConfigOptions; - -public class Constants { - - public static final ConfigOption<String> INLONG_METRIC = - ConfigOptions.key("inlong.metric") - .stringType() - .defaultValue("") - .withDescription("INLONG GROUP ID + '&' + STREAM ID + '&' + NODE ID"); - -} diff --git a/inlong-sort/sort-connectors/pulsar/src/main/java/org/apache/inlong/sort/pulsar/table/DynamicPulsarDeserializationSchema.java b/inlong-sort/sort-connectors/pulsar/src/main/java/org/apache/inlong/sort/pulsar/table/DynamicPulsarDeserializationSchema.java index 6d88a303b..bd4ecaaee 100644 --- a/inlong-sort/sort-connectors/pulsar/src/main/java/org/apache/inlong/sort/pulsar/table/DynamicPulsarDeserializationSchema.java +++ b/inlong-sort/sort-connectors/pulsar/src/main/java/org/apache/inlong/sort/pulsar/table/DynamicPulsarDeserializationSchema.java @@ -33,7 +33,8 @@ import org.apache.flink.types.DeserializationException; import org.apache.flink.types.RowKind; import org.apache.flink.util.Collector; import org.apache.flink.util.Preconditions; -import org.apache.inlong.audit.AuditImp; +import org.apache.inlong.sort.base.metric.MetricOption; +import org.apache.inlong.sort.base.metric.MetricOption.RegisteredMetric; import org.apache.inlong.sort.base.metric.SourceMetricData; import org.apache.inlong.sort.pulsar.withoutadmin.CallbackCollector; import org.apache.pulsar.client.api.Message; @@ -44,14 +45,9 @@ import java.io.IOException; import java.io.Serializable; import java.lang.reflect.Field; import java.lang.reflect.Modifier; -import java.nio.charset.StandardCharsets; import java.util.ArrayList; -import java.util.Arrays; -import java.util.HashSet; import java.util.List; -import static org.apache.inlong.sort.base.Constants.DELIMITER; - /** * A specific {@link PulsarDeserializationSchema} for {@link PulsarDynamicTableSource}. */ @@ -76,12 +72,6 @@ class DynamicPulsarDeserializationSchema implements PulsarDeserializationSchema< private String inlongMetric; private String auditHostAndPorts; - private AuditImp auditImp; - - private String inlongGroupId; - - private String inlongStreamId; - DynamicPulsarDeserializationSchema( int physicalArity, @Nullable DeserializationSchema<RowData> keyDeserialization, @@ -120,23 +110,14 @@ class DynamicPulsarDeserializationSchema implements PulsarDeserializationSchema< } valueDeserialization.open(context); - if (inlongMetric != null && !inlongMetric.isEmpty()) { - String[] inlongMetricArray = inlongMetric.split(DELIMITER); - inlongGroupId = inlongMetricArray[0]; - inlongStreamId = inlongMetricArray[1]; - String nodeId = inlongMetricArray[2]; - sourceMetricData = new SourceMetricData(inlongGroupId, inlongStreamId, nodeId, getMetricGroup(context)); - sourceMetricData.registerMetricsForNumBytesIn(); - sourceMetricData.registerMetricsForNumBytesInPerSecond(); - sourceMetricData.registerMetricsForNumRecordsIn(); - sourceMetricData.registerMetricsForNumRecordsInPerSecond(); + MetricOption metricOption = MetricOption.builder() + .withInlongLabels(inlongMetric) + .withInlongAudit(auditHostAndPorts) + .withRegisterMetric(RegisteredMetric.ALL) + .build(); + if (metricOption != null) { + sourceMetricData = new SourceMetricData(metricOption, getMetricGroup(context)); } - - if (auditHostAndPorts != null) { - AuditImp.getInstance().setAuditProxy(new HashSet<>(Arrays.asList(auditHostAndPorts.split(DELIMITER)))); - auditImp = AuditImp.getInstance(); - } - } /** @@ -191,7 +172,7 @@ class DynamicPulsarDeserializationSchema implements PulsarDeserializationSchema< // also not for a cartesian product with the keys if (keyDeserialization == null && !hasMetadata) { valueDeserialization.deserialize(message.getData(), new CallbackCollector<>(inputRow -> { - sourceMetricData.outputMetrics(1L, inputRow.toString().getBytes(StandardCharsets.UTF_8).length); + sourceMetricData.outputMetricsWithEstimate(inputRow); collector.collect(inputRow); })); return; @@ -212,7 +193,7 @@ class DynamicPulsarDeserializationSchema implements PulsarDeserializationSchema< outputCollector.collect(null); } else { valueDeserialization.deserialize(message.getData(), new CallbackCollector<>(inputRow -> { - sourceMetricData.outputMetrics(1L, inputRow.toString().getBytes(StandardCharsets.UTF_8).length); + sourceMetricData.outputMetricsWithEstimate(inputRow); outputCollector.collect(inputRow); })); } diff --git a/inlong-sort/sort-connectors/pulsar/src/main/java/org/apache/inlong/sort/pulsar/table/PulsarDynamicTableFactory.java b/inlong-sort/sort-connectors/pulsar/src/main/java/org/apache/inlong/sort/pulsar/table/PulsarDynamicTableFactory.java index bf74dc8d2..5a3ddfe51 100644 --- a/inlong-sort/sort-connectors/pulsar/src/main/java/org/apache/inlong/sort/pulsar/table/PulsarDynamicTableFactory.java +++ b/inlong-sort/sort-connectors/pulsar/src/main/java/org/apache/inlong/sort/pulsar/table/PulsarDynamicTableFactory.java @@ -78,7 +78,7 @@ import static org.apache.flink.streaming.connectors.pulsar.table.PulsarTableOpti import static org.apache.flink.table.factories.FactoryUtil.FORMAT; import static org.apache.flink.table.factories.FactoryUtil.SINK_PARALLELISM; import static org.apache.inlong.sort.base.Constants.INLONG_AUDIT; -import static org.apache.inlong.sort.pulsar.table.Constants.INLONG_METRIC; +import static org.apache.inlong.sort.base.Constants.INLONG_METRIC; /** * Copy from io.streamnative.connectors:pulsar-flink-connector_2.11:1.13.6.1-rc9 diff --git a/inlong-sort/sort-connectors/pulsar/src/main/java/org/apache/inlong/sort/pulsar/table/UpsertPulsarDynamicTableFactory.java b/inlong-sort/sort-connectors/pulsar/src/main/java/org/apache/inlong/sort/pulsar/table/UpsertPulsarDynamicTableFactory.java index ff0361b16..e4793dec7 100644 --- a/inlong-sort/sort-connectors/pulsar/src/main/java/org/apache/inlong/sort/pulsar/table/UpsertPulsarDynamicTableFactory.java +++ b/inlong-sort/sort-connectors/pulsar/src/main/java/org/apache/inlong/sort/pulsar/table/UpsertPulsarDynamicTableFactory.java @@ -68,7 +68,7 @@ import static org.apache.flink.streaming.connectors.pulsar.table.PulsarTableOpti import static org.apache.flink.streaming.connectors.pulsar.table.PulsarTableOptions.getPulsarProperties; import static org.apache.flink.table.factories.FactoryUtil.FORMAT; import static org.apache.inlong.sort.base.Constants.INLONG_AUDIT; -import static org.apache.inlong.sort.pulsar.table.Constants.INLONG_METRIC; +import static org.apache.inlong.sort.base.Constants.INLONG_METRIC; /** * Upsert-Pulsar factory. diff --git a/inlong-sort/sort-connectors/sqlserver-cdc/src/main/java/org/apache/inlong/sort/cdc/sqlserver/table/DebeziumSourceFunction.java b/inlong-sort/sort-connectors/sqlserver-cdc/src/main/java/org/apache/inlong/sort/cdc/sqlserver/table/DebeziumSourceFunction.java index c28115846..16ee84362 100644 --- a/inlong-sort/sort-connectors/sqlserver-cdc/src/main/java/org/apache/inlong/sort/cdc/sqlserver/table/DebeziumSourceFunction.java +++ b/inlong-sort/sort-connectors/sqlserver-cdc/src/main/java/org/apache/inlong/sort/cdc/sqlserver/table/DebeziumSourceFunction.java @@ -20,7 +20,6 @@ package org.apache.inlong.sort.cdc.sqlserver.table; import static com.ververica.cdc.debezium.utils.DatabaseHistoryUtil.registerHistory; import static com.ververica.cdc.debezium.utils.DatabaseHistoryUtil.retrieveHistory; -import static org.apache.inlong.sort.base.Constants.DELIMITER; import com.ververica.cdc.debezium.DebeziumDeserializationSchema; import com.ververica.cdc.debezium.Validator; @@ -42,9 +41,7 @@ import io.debezium.heartbeat.Heartbeat; import java.io.IOException; import java.lang.reflect.Method; import java.nio.charset.StandardCharsets; -import java.util.Arrays; import java.util.Collection; -import java.util.HashSet; import java.util.Properties; import java.util.UUID; import java.util.concurrent.ConcurrentLinkedQueue; @@ -54,7 +51,6 @@ import java.util.concurrent.ThreadFactory; import java.util.concurrent.TimeUnit; import javax.annotation.Nullable; import org.apache.commons.collections.map.LinkedMap; -import org.apache.commons.lang3.StringUtils; import org.apache.flink.annotation.PublicEvolving; import org.apache.flink.annotation.VisibleForTesting; import org.apache.flink.api.common.state.CheckpointListener; @@ -76,8 +72,8 @@ import org.apache.flink.streaming.api.functions.source.RichSourceFunction; import org.apache.flink.util.Collector; import org.apache.flink.util.ExceptionUtils; import org.apache.flink.util.FlinkRuntimeException; -import org.apache.inlong.audit.AuditImp; -import org.apache.inlong.sort.base.Constants; +import org.apache.inlong.sort.base.metric.MetricOption; +import org.apache.inlong.sort.base.metric.MetricOption.RegisteredMetric; import org.apache.inlong.sort.base.metric.SourceMetricData; import org.apache.kafka.connect.source.SourceRecord; import org.slf4j.Logger; @@ -217,14 +213,8 @@ public class DebeziumSourceFunction<T> extends RichSourceFunction<T> private SourceMetricData metricData; - private String inlongGroupId; - private String auditHostAndPorts; - private String inlongStreamId; - - private transient AuditImp auditImp; - // --------------------------------------------------------------------------------------- public DebeziumSourceFunction( @@ -412,20 +402,13 @@ public class DebeziumSourceFunction<T> extends RichSourceFunction<T> (Gauge<Long>) () -> debeziumChangeFetcher.getEmitDelay()); metricGroup.gauge( "sourceIdleTime", (Gauge<Long>) () -> debeziumChangeFetcher.getIdleTime()); - if (StringUtils.isNotEmpty(this.inlongMetric)) { - String[] inlongMetricArray = inlongMetric.split(DELIMITER); - inlongGroupId = inlongMetricArray[0]; - inlongStreamId = inlongMetricArray[1]; - String nodeId = inlongMetricArray[2]; - metricData = new SourceMetricData(inlongGroupId, inlongStreamId, nodeId, metricGroup); - metricData.registerMetricsForNumRecordsIn(); - metricData.registerMetricsForNumBytesIn(); - metricData.registerMetricsForNumBytesInPerSecond(); - metricData.registerMetricsForNumRecordsInPerSecond(); - } - if (auditHostAndPorts != null) { - AuditImp.getInstance().setAuditProxy(new HashSet<>(Arrays.asList(auditHostAndPorts.split(DELIMITER)))); - auditImp = AuditImp.getInstance(); + MetricOption metricOption = MetricOption.builder() + .withInlongLabels(inlongMetric) + .withInlongAudit(auditHostAndPorts) + .withRegisterMetric(RegisteredMetric.ALL) + .build(); + if (metricOption != null) { + metricData = new SourceMetricData(metricOption, metricGroup); } properties.setProperty("name", "engine"); properties.setProperty("offset.storage", FlinkOffsetBackingStore.class.getCanonicalName()); @@ -464,7 +447,9 @@ public class DebeziumSourceFunction<T> extends RichSourceFunction<T> new DebeziumDeserializationSchema<T>() { @Override public void deserialize(SourceRecord record, Collector<T> out) throws Exception { - outputMetrics(record); + if (metricData != null) { + metricData.outputMetricsWithEstimate(record.value()); + } deserializer.deserialize(record, out); } @@ -502,23 +487,6 @@ public class DebeziumSourceFunction<T> extends RichSourceFunction<T> debeziumChangeFetcher.runFetchLoop(); } - private void outputMetrics(SourceRecord record) { - if (metricData != null) { - metricData.getNumRecordsIn().inc(1L); - metricData.getNumBytesIn() - .inc(record.value().toString().getBytes(StandardCharsets.UTF_8).length); - } - if (auditImp != null) { - auditImp.add( - Constants.AUDIT_SORT_INPUT, - inlongGroupId, - inlongStreamId, - System.currentTimeMillis(), - 1, - record.value().toString().getBytes(StandardCharsets.UTF_8).length); - } - } - @Override public void notifyCheckpointComplete(long checkpointId) { if (!debeziumStarted) { diff --git a/inlong-sort/sort-core/src/main/java/org/apache/inlong/sort/parser/impl/FlinkSqlParser.java b/inlong-sort/sort-core/src/main/java/org/apache/inlong/sort/parser/impl/FlinkSqlParser.java index 3cc166601..53a8e217f 100644 --- a/inlong-sort/sort-core/src/main/java/org/apache/inlong/sort/parser/impl/FlinkSqlParser.java +++ b/inlong-sort/sort-core/src/main/java/org/apache/inlong/sort/parser/impl/FlinkSqlParser.java @@ -20,6 +20,7 @@ package org.apache.inlong.sort.parser.impl; import com.google.common.base.Preconditions; import org.apache.commons.lang3.StringUtils; import org.apache.flink.table.api.TableEnvironment; +import org.apache.inlong.sort.configuration.Constants; import org.apache.inlong.sort.formats.base.TableFormatUtils; import org.apache.inlong.sort.formats.common.FormatInfo; import org.apache.inlong.sort.function.EncryptFunction; @@ -64,6 +65,7 @@ import java.util.List; import java.util.Map; import java.util.Set; import java.util.stream.Collectors; +import java.util.stream.Stream; /** * Flink sql parse handler @@ -149,7 +151,7 @@ public class FlinkSqlParser implements Parser { Preconditions.checkNotNull(streamInfo.getRelations(), "relations is null"); Preconditions.checkState(!streamInfo.getRelations().isEmpty(), "relations is empty"); log.info("start parse stream, streamId:{}", streamInfo.getStreamId()); - // Inject the `inlong.metric` for ExtractNode or LoadNode + // Inject the metric option for ExtractNode or LoadNode injectInlongMetric(streamInfo); Map<String, Node> nodeMap = new HashMap<>(streamInfo.getNodes().size()); streamInfo.getNodes().forEach(s -> { @@ -169,7 +171,7 @@ public class FlinkSqlParser implements Parser { } /** - * Inject the `inlong.metric` for ExtractNode or LoadNode + * Inject the metric option for ExtractNode or LoadNode * * @param streamInfo The encapsulation of nodes and node relations */ @@ -183,16 +185,19 @@ public class FlinkSqlParser implements Parser { } else if (node instanceof ExtractNode) { ((ExtractNode) node).setProperties(properties); } else { - throw new UnsupportedOperationException(String.format("Unsupported inlong metric for: %s", - node.getClass().getSimpleName())); + throw new UnsupportedOperationException(String.format( + "Unsupported inlong group stream node for: %s", node.getClass().getSimpleName())); } } - properties.put(InlongMetric.METRIC_KEY, - String.format(InlongMetric.METRIC_VALUE_FORMAT, groupInfo.getGroupId(), - streamInfo.getStreamId(), node.getId())); - if (StringUtils.isNotEmpty(groupInfo.getProperties().get(InlongMetric.AUDIT_KEY))) { - properties.put(InlongMetric.AUDIT_KEY, - groupInfo.getProperties().get(InlongMetric.AUDIT_KEY)); + properties.put(Constants.METRICS_LABELS.key(), + Stream.of(Constants.GROUP_ID + "=" + groupInfo.getGroupId(), + Constants.STREAM_ID + "=" + streamInfo.getStreamId(), + Constants.NODE_ID + "=" + node.getId()) + .collect(Collectors.joining("&"))); + // METRICS_AUDIT_PROXY_HOSTS depends on INLONG_GROUP_STREAM_NODE + if (StringUtils.isNotEmpty(groupInfo.getProperties().get(Constants.METRICS_AUDIT_PROXY_HOSTS.key()))) { + properties.put(Constants.METRICS_AUDIT_PROXY_HOSTS.key(), + groupInfo.getProperties().get(Constants.METRICS_AUDIT_PROXY_HOSTS.key())); } }); }
