This is an automated email from the ASF dual-hosted git repository.
dockerzhang pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/inlong.git
The following commit(s) were added to refs/heads/master by this push:
new 0a672e365 [INLONG-6962][Sort] Add read phase metric and table level metric for MySQL-CDC (#6966)
0a672e365 is described below
commit 0a672e3651680bc420d28305d679051c9866931d
Author: chestnufang <[email protected]>
AuthorDate: Mon Dec 19 21:49:03 2022 +0800
[INLONG-6962][Sort] Add read phase metric and table level metric for MySQL-CDC (#6966)
Co-authored-by: chestnufang <[email protected]>
---
.../source/metrics/MySqlSourceReaderMetrics.java | 50 +++++++++++----
.../mysql/source/reader/MySqlRecordEmitter.java | 10 ++-
.../cdc/mysql/source/reader/MySqlSourceReader.java | 22 +++++--
.../cdc/mysql/source/split/MySqlMetricSplit.java | 74 +++++++++++++++++++++-
.../mysql/source/split/MySqlSplitSerializer.java | 48 +++++++++++++-
5 files changed, 181 insertions(+), 23 deletions(-)
diff --git a/inlong-sort/sort-connectors/mysql-cdc/src/main/java/org/apache/inlong/sort/cdc/mysql/source/metrics/MySqlSourceReaderMetrics.java b/inlong-sort/sort-connectors/mysql-cdc/src/main/java/org/apache/inlong/sort/cdc/mysql/source/metrics/MySqlSourceReaderMetrics.java
index 1ebd1c7a5..c941eb083 100644
--- a/inlong-sort/sort-connectors/mysql-cdc/src/main/java/org/apache/inlong/sort/cdc/mysql/source/metrics/MySqlSourceReaderMetrics.java
+++ b/inlong-sort/sort-connectors/mysql-cdc/src/main/java/org/apache/inlong/sort/cdc/mysql/source/metrics/MySqlSourceReaderMetrics.java
@@ -17,11 +17,19 @@
package org.apache.inlong.sort.cdc.mysql.source.metrics;
+import static org.apache.inlong.sort.base.Constants.NUM_BYTES_IN;
+import static org.apache.inlong.sort.base.Constants.NUM_RECORDS_IN;
+
+import com.google.common.collect.ImmutableMap;
+import java.util.HashMap;
+import java.util.Map;
import org.apache.flink.metrics.Gauge;
import org.apache.flink.metrics.MetricGroup;
import org.apache.inlong.sort.base.metric.MetricOption;
-import org.apache.inlong.sort.base.metric.SourceMetricData;
+import org.apache.inlong.sort.base.metric.MetricState;
+import org.apache.inlong.sort.base.metric.sub.SourceTableMetricData;
import org.apache.inlong.sort.cdc.mysql.source.reader.MySqlSourceReader;
+import
org.apache.inlong.sort.cdc.mysql.source.split.MySqlMetricSplit.MySqlTableMetric;
/**
* A collection class for handling metrics in {@link MySqlSourceReader}.
@@ -49,7 +57,7 @@ public class MySqlSourceReaderMetrics {
*/
private volatile long emitDelay = 0L;
- private SourceMetricData sourceMetricData;
+ private SourceTableMetricData sourceTableMetricData;
public MySqlSourceReaderMetrics(MetricGroup metricGroup) {
this.metricGroup = metricGroup;
@@ -57,7 +65,7 @@ public class MySqlSourceReaderMetrics {
public void registerMetrics(MetricOption metricOption) {
if (metricOption != null) {
- sourceMetricData = new SourceMetricData(metricOption, metricGroup);
+ sourceTableMetricData = new SourceTableMetricData(metricOption,
metricGroup);
}
metricGroup.gauge("currentFetchEventTimeLag", (Gauge<Long>)
this::getFetchDelay);
metricGroup.gauge("currentEmitEventTimeLag", (Gauge<Long>)
this::getEmitDelay);
@@ -92,20 +100,38 @@ public class MySqlSourceReaderMetrics {
this.emitDelay = emitDelay;
}
- public void outputMetrics(long rowCountSize, long rowDataSize) {
- if (sourceMetricData != null) {
- sourceMetricData.outputMetrics(rowCountSize, rowDataSize);
+ public void outputMetrics(String database, String table, boolean
isSnapshotRecord, Object data) {
+ if (sourceTableMetricData != null) {
+ sourceTableMetricData.outputMetricsWithEstimate(database, table,
isSnapshotRecord, data);
}
}
- public void initMetrics(long rowCountSize, long rowDataSize) {
- if (sourceMetricData != null) {
- sourceMetricData.getNumBytesIn().inc(rowDataSize);
- sourceMetricData.getNumRecordsIn().inc(rowCountSize);
+ public void initMetrics(long rowCountSize, long rowDataSize, Map<String,
Long> readPhaseMetricMap,
+ Map<String, MySqlTableMetric> tableMetricMap) {
+ if (sourceTableMetricData != null) {
+ // node level metric data
+ sourceTableMetricData.getNumBytesIn().inc(rowDataSize);
+ sourceTableMetricData.getNumRecordsIn().inc(rowCountSize);
+
+ // register read phase metric data and table level metric data
+ if (readPhaseMetricMap != null && tableMetricMap != null) {
+ MetricState metricState = new MetricState();
+ metricState.setMetrics(readPhaseMetricMap);
+ Map<String, MetricState> subMetricStateMap = new HashMap<>();
+ tableMetricMap.entrySet().stream().filter(v -> v.getValue() !=
null).forEach(entry -> {
+ MetricState subMetricState = new MetricState();
+ subMetricState.setMetrics(ImmutableMap
+ .of(NUM_RECORDS_IN,
entry.getValue().getNumRecordsIn(), NUM_BYTES_IN,
+ entry.getValue().getNumBytesIn()));
+ subMetricStateMap.put(entry.getKey(), subMetricState);
+ });
+ metricState.setSubMetricStateMap(subMetricStateMap);
+ sourceTableMetricData.registerSubMetricsGroup(metricState);
+ }
}
}
- public SourceMetricData getSourceMetricData() {
- return sourceMetricData;
+ public SourceTableMetricData getSourceMetricData() {
+ return sourceTableMetricData;
}
}
diff --git a/inlong-sort/sort-connectors/mysql-cdc/src/main/java/org/apache/inlong/sort/cdc/mysql/source/reader/MySqlRecordEmitter.java b/inlong-sort/sort-connectors/mysql-cdc/src/main/java/org/apache/inlong/sort/cdc/mysql/source/reader/MySqlRecordEmitter.java
index e09b3ee88..eb961df00 100644
--- a/inlong-sort/sort-connectors/mysql-cdc/src/main/java/org/apache/inlong/sort/cdc/mysql/source/reader/MySqlRecordEmitter.java
+++ b/inlong-sort/sort-connectors/mysql-cdc/src/main/java/org/apache/inlong/sort/cdc/mysql/source/reader/MySqlRecordEmitter.java
@@ -18,6 +18,7 @@
package org.apache.inlong.sort.cdc.mysql.source.reader;
import com.ververica.cdc.connectors.mysql.source.utils.RecordUtils;
+import io.debezium.connector.AbstractSourceInfo;
import io.debezium.data.Envelope;
import io.debezium.document.Array;
import io.debezium.relational.TableId;
@@ -38,7 +39,6 @@ import org.apache.kafka.connect.source.SourceRecord;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
-import java.nio.charset.StandardCharsets;
import java.util.Map;
import static
org.apache.inlong.sort.cdc.mysql.source.utils.RecordUtils.getBinlogPosition;
@@ -146,8 +146,12 @@ public final class MySqlRecordEmitter<T>
@Override
public void collect(final T t) {
- long byteNum =
t.toString().getBytes(StandardCharsets.UTF_8).length;
- sourceReaderMetrics.outputMetrics(1L, byteNum);
+ Struct value = (Struct) element.value();
+ Struct source =
value.getStruct(Envelope.FieldName.SOURCE);
+ String databaseName =
source.getString(AbstractSourceInfo.DATABASE_NAME_KEY);
+ String tableName =
source.getString(AbstractSourceInfo.TABLE_NAME_KEY);
+
+ sourceReaderMetrics.outputMetrics(databaseName,
tableName, iSnapShot, t);
output.collect(t);
}
diff --git a/inlong-sort/sort-connectors/mysql-cdc/src/main/java/org/apache/inlong/sort/cdc/mysql/source/reader/MySqlSourceReader.java b/inlong-sort/sort-connectors/mysql-cdc/src/main/java/org/apache/inlong/sort/cdc/mysql/source/reader/MySqlSourceReader.java
index 29a4a1282..0a7ea82c6 100644
--- a/inlong-sort/sort-connectors/mysql-cdc/src/main/java/org/apache/inlong/sort/cdc/mysql/source/reader/MySqlSourceReader.java
+++ b/inlong-sort/sort-connectors/mysql-cdc/src/main/java/org/apache/inlong/sort/cdc/mysql/source/reader/MySqlSourceReader.java
@@ -20,6 +20,7 @@ package org.apache.inlong.sort.cdc.mysql.source.reader;
import io.debezium.connector.mysql.MySqlConnection;
import io.debezium.relational.TableId;
import io.debezium.relational.history.TableChanges;
+import java.util.Map.Entry;
import org.apache.flink.api.connector.source.SourceEvent;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.connector.base.source.reader.RecordEmitter;
@@ -28,7 +29,7 @@ import org.apache.flink.connector.base.source.reader.SingleThreadMultiplexSource
import
org.apache.flink.connector.base.source.reader.fetcher.SingleThreadFetcherManager;
import
org.apache.flink.connector.base.source.reader.synchronization.FutureCompletingBlockingQueue;
import org.apache.flink.util.FlinkRuntimeException;
-import org.apache.inlong.sort.base.metric.SourceMetricData;
+import org.apache.inlong.sort.base.metric.sub.SourceTableMetricData;
import org.apache.inlong.sort.cdc.mysql.debezium.DebeziumUtils;
import org.apache.inlong.sort.cdc.mysql.source.config.MySqlSourceConfig;
import org.apache.inlong.sort.cdc.mysql.source.events.BinlogSplitMetaEvent;
@@ -47,6 +48,7 @@ import org.apache.inlong.sort.cdc.mysql.source.split.FinishedSnapshotSplitInfo;
import org.apache.inlong.sort.cdc.mysql.source.split.MySqlBinlogSplit;
import org.apache.inlong.sort.cdc.mysql.source.split.MySqlBinlogSplitState;
import org.apache.inlong.sort.cdc.mysql.source.split.MySqlMetricSplit;
+import
org.apache.inlong.sort.cdc.mysql.source.split.MySqlMetricSplit.MySqlTableMetric;
import org.apache.inlong.sort.cdc.mysql.source.split.MySqlSnapshotSplit;
import org.apache.inlong.sort.cdc.mysql.source.split.MySqlSnapshotSplitState;
import org.apache.inlong.sort.cdc.mysql.source.split.MySqlSplit;
@@ -147,12 +149,19 @@ public class MySqlSourceReader<T>
if (suspendedBinlogSplit != null) {
unfinishedSplits.add(suspendedBinlogSplit);
}
- SourceMetricData sourceMetricData =
sourceReaderMetrics.getSourceMetricData();
+ SourceTableMetricData sourceMetricData =
sourceReaderMetrics.getSourceMetricData();
LOG.info("inlong-metric-states snapshot sourceMetricData:{}",
sourceMetricData);
if (sourceMetricData != null) {
- unfinishedSplits.add(
- new
MySqlMetricSplit(sourceMetricData.getNumBytesIn().getCount(),
- sourceMetricData.getNumRecordsIn().getCount()));
+ long countNumBytesIn = sourceMetricData.getNumBytesIn().getCount();
+ long countNumRecordsIn =
sourceMetricData.getNumRecordsIn().getCount();
+ Map<String, Long> readPhaseMetricMap =
sourceMetricData.getReadPhaseMetricMap().entrySet().stream().collect(
+ Collectors.toMap(v -> v.getKey().getPhase(), e ->
e.getValue().getReadPhase().getCount()));
+ Map<String, MySqlTableMetric> tableMetricMap =
sourceMetricData.getSubSourceMetricMap().entrySet().stream()
+ .collect(Collectors.toMap(Entry::getKey,
+ e -> new
MySqlTableMetric(e.getValue().getNumRecordsIn().getCount(),
+ e.getValue().getNumBytesIn().getCount())));
+ unfinishedSplits
+ .add(new MySqlMetricSplit(countNumBytesIn,
countNumRecordsIn, readPhaseMetricMap, tableMetricMap));
}
return unfinishedSplits;
}
@@ -187,7 +196,8 @@ public class MySqlSourceReader<T>
MySqlMetricSplit mysqlMetricSplit = (MySqlMetricSplit) split;
LOG.info("inlong-metric-states restore metricSplit:{}",
mysqlMetricSplit);
sourceReaderMetrics.initMetrics(mysqlMetricSplit.getNumRecordsIn(),
- mysqlMetricSplit.getNumBytesIn());
+ mysqlMetricSplit.getNumBytesIn(),
mysqlMetricSplit.getReadPhaseMetricMap(),
+ mysqlMetricSplit.getTableMetricMap());
LOG.info("inlong-metric-states restore sourceReaderMetrics:{}",
sourceReaderMetrics.getSourceMetricData());
continue;
diff --git a/inlong-sort/sort-connectors/mysql-cdc/src/main/java/org/apache/inlong/sort/cdc/mysql/source/split/MySqlMetricSplit.java b/inlong-sort/sort-connectors/mysql-cdc/src/main/java/org/apache/inlong/sort/cdc/mysql/source/split/MySqlMetricSplit.java
index 368d97260..c9d832207 100644
--- a/inlong-sort/sort-connectors/mysql-cdc/src/main/java/org/apache/inlong/sort/cdc/mysql/source/split/MySqlMetricSplit.java
+++ b/inlong-sort/sort-connectors/mysql-cdc/src/main/java/org/apache/inlong/sort/cdc/mysql/source/split/MySqlMetricSplit.java
@@ -20,6 +20,7 @@ package org.apache.inlong.sort.cdc.mysql.source.split;
import io.debezium.relational.TableId;
import io.debezium.relational.history.TableChanges.TableChange;
+import java.io.Serializable;
import java.util.Map;
/**
@@ -31,6 +32,16 @@ public class MySqlMetricSplit extends MySqlSplit {
private Long numBytesIn = 0L;
+ /**
+ * The table level metric in a split of mysql metric.
+ */
+ private Map<String, MySqlTableMetric> tableMetricMap;
+
+ /**
+ * The read phase timestamp metric in a split of mysql metric.
+ */
+ private Map<String, Long> readPhaseMetricMap;
+
public Long getNumRecordsIn() {
return numRecordsIn;
}
@@ -47,14 +58,34 @@ public class MySqlMetricSplit extends MySqlSplit {
this.numBytesIn = numBytesIn;
}
+ public Map<String, MySqlTableMetric> getTableMetricMap() {
+ return tableMetricMap;
+ }
+
+ public void setTableMetricMap(
+ Map<String, MySqlTableMetric> tableMetricMap) {
+ this.tableMetricMap = tableMetricMap;
+ }
+
+ public Map<String, Long> getReadPhaseMetricMap() {
+ return readPhaseMetricMap;
+ }
+
+ public void setReadPhaseMetricMap(Map<String, Long> readPhaseMetricMap) {
+ this.readPhaseMetricMap = readPhaseMetricMap;
+ }
+
public MySqlMetricSplit(String splitId) {
super(splitId);
}
- public MySqlMetricSplit(Long numBytesIn, Long numRecordsIn) {
+ public MySqlMetricSplit(Long numBytesIn, Long numRecordsIn, Map<String,
Long> readPhaseMetricMap,
+ Map<String, MySqlTableMetric> tableMetricMap) {
this("");
this.numBytesIn = numBytesIn;
this.numRecordsIn = numRecordsIn;
+ this.readPhaseMetricMap = readPhaseMetricMap;
+ this.tableMetricMap = tableMetricMap;
}
public void setMetricData(long count, long byteNum) {
@@ -72,6 +103,47 @@ public class MySqlMetricSplit extends MySqlSplit {
return "MysqlMetricSplit{"
+ "numRecordsIn=" + numRecordsIn
+ ", numBytesIn=" + numBytesIn
+ + ", tableMetricMap=" + tableMetricMap
+ + ", readPhaseMetricMap=" + readPhaseMetricMap
+ '}';
}
+
+ /**
+ * The mysql table level metric in a split of mysql metric.
+ */
+ public static class MySqlTableMetric implements Serializable {
+
+ private Long numRecordsIn;
+
+ private Long numBytesIn;
+
+ public MySqlTableMetric(Long numRecordsIn, Long numBytesIn) {
+ this.numRecordsIn = numRecordsIn;
+ this.numBytesIn = numBytesIn;
+ }
+
+ public Long getNumRecordsIn() {
+ return numRecordsIn;
+ }
+
+ public void setNumRecordsIn(Long numRecordsIn) {
+ this.numRecordsIn = numRecordsIn;
+ }
+
+ public Long getNumBytesIn() {
+ return numBytesIn;
+ }
+
+ public void setNumBytesIn(Long numBytesIn) {
+ this.numBytesIn = numBytesIn;
+ }
+
+ @Override
+ public String toString() {
+ return "MySqlTableMetric{"
+ + "numRecordsIn=" + numRecordsIn
+ + ", numBytesIn=" + numBytesIn
+ + '}';
+ }
+ }
}
diff --git a/inlong-sort/sort-connectors/mysql-cdc/src/main/java/org/apache/inlong/sort/cdc/mysql/source/split/MySqlSplitSerializer.java b/inlong-sort/sort-connectors/mysql-cdc/src/main/java/org/apache/inlong/sort/cdc/mysql/source/split/MySqlSplitSerializer.java
index 1f1247b21..325ef4fca 100644
--- a/inlong-sort/sort-connectors/mysql-cdc/src/main/java/org/apache/inlong/sort/cdc/mysql/source/split/MySqlSplitSerializer.java
+++ b/inlong-sort/sort-connectors/mysql-cdc/src/main/java/org/apache/inlong/sort/cdc/mysql/source/split/MySqlSplitSerializer.java
@@ -36,6 +36,7 @@ import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
+import
org.apache.inlong.sort.cdc.mysql.source.split.MySqlMetricSplit.MySqlTableMetric;
import static
org.apache.inlong.sort.cdc.mysql.source.utils.SerializerUtils.readBinlogPosition;
import static
org.apache.inlong.sort.cdc.mysql.source.utils.SerializerUtils.rowToSerializedString;
@@ -134,6 +135,46 @@ public final class MySqlSplitSerializer implements SimpleVersionedSerializer<MyS
return finishedSplitsInfo;
}
+ private static void writeReadPhaseMetric(Map<String, Long>
readPhaseMetrics, DataOutputSerializer out)
+ throws IOException {
+ final int size = readPhaseMetrics.size();
+ out.writeInt(size);
+ for (Map.Entry<String, Long> entry : readPhaseMetrics.entrySet()) {
+ out.writeUTF(entry.getKey());
+ out.writeLong(entry.getValue());
+ }
+ }
+
+ private static Map<String, Long> readReadPhaseMetric(DataInputDeserializer
in) throws IOException {
+ Map<String, Long> readPhaseMetrics = new HashMap<>();
+ final int size = in.readInt();
+ for (int i = 0; i < size; i++) {
+ readPhaseMetrics.put(in.readUTF(), in.readLong());
+ }
+ return readPhaseMetrics;
+ }
+
+ private static void writeTableMetrics(Map<String, MySqlTableMetric>
tableMetrics, DataOutputSerializer out)
+ throws IOException {
+ final int size = tableMetrics.size();
+ out.writeInt(size);
+ for (Map.Entry<String, MySqlTableMetric> entry :
tableMetrics.entrySet()) {
+ out.writeUTF(entry.getKey());
+ out.writeLong(entry.getValue().getNumRecordsIn());
+ out.writeLong(entry.getValue().getNumBytesIn());
+ }
+ }
+
+ private static Map<String, MySqlTableMetric>
readTableMetrics(DataInputDeserializer in) throws IOException {
+ Map<String, MySqlTableMetric> tableMetrics = new HashMap<>();
+ final int size = in.readInt();
+ for (int i = 0; i < size; i++) {
+ String tableIdentify = in.readUTF();
+ tableMetrics.put(tableIdentify, new
MySqlTableMetric(in.readLong(), in.readLong()));
+ }
+ return tableMetrics;
+ }
+
@Override
public int getVersion() {
return VERSION;
@@ -195,6 +236,8 @@ public final class MySqlSplitSerializer implements SimpleVersionedSerializer<MyS
out.writeInt(METRIC_SPLIT_FLAG);
out.writeLong(mysqlMetricSplit.getNumBytesIn());
out.writeLong(mysqlMetricSplit.getNumRecordsIn());
+ writeReadPhaseMetric(mysqlMetricSplit.getReadPhaseMetricMap(),
out);
+ writeTableMetrics(mysqlMetricSplit.getTableMetricMap(), out);
final byte[] result = out.getCopyOfBuffer();
out.clear();
return result;
@@ -267,7 +310,10 @@ public final class MySqlSplitSerializer implements SimpleVersionedSerializer<MyS
} else if (splitKind == METRIC_SPLIT_FLAG) {
long numBytesIn = in.readLong();
long numRecordsIn = in.readLong();
- return new MySqlMetricSplit(numBytesIn, numRecordsIn);
+ Map<String, Long> readPhaseMetricMap = readReadPhaseMetric(in);
+ Map<String, MySqlTableMetric> tableMetricMap =
readTableMetrics(in);
+
+ return new MySqlMetricSplit(numBytesIn, numRecordsIn,
readPhaseMetricMap, tableMetricMap);
} else {
throw new IOException("Unknown split kind: " + splitKind);
}