This is an automated email from the ASF dual-hosted git repository.
zirui pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/inlong.git
The following commit(s) were added to refs/heads/master by this push:
new 042c6b2a7 [INLONG-6318][Sort] MySQL connector supports snapshots and
restores the metric state (#6319)
042c6b2a7 is described below
commit 042c6b2a7bd33fb7346ca3173e208b15b16da7c7
Author: Xin Gong <[email protected]>
AuthorDate: Mon Nov 7 11:40:03 2022 +0800
[INLONG-6318][Sort] MySQL connector supports snapshots and restores the
metric state (#6319)
---
.../sort/cdc/debezium/DebeziumSourceFunction.java | 21 ++++++
.../inlong/sort/cdc/mysql/source/MySqlSource.java | 2 +-
.../source/metrics/MySqlSourceReaderMetrics.java | 11 +++
.../mysql/source/reader/MySqlRecordEmitter.java | 4 +-
.../cdc/mysql/source/reader/MySqlSourceReader.java | 26 +++++++-
.../cdc/mysql/source/split/MySqlMetricSplit.java | 78 ++++++++++++++++++++++
.../sort/cdc/mysql/source/split/MySqlSplit.java | 8 +++
.../mysql/source/split/MySqlSplitSerializer.java | 16 ++++-
8 files changed, 160 insertions(+), 6 deletions(-)
diff --git
a/inlong-sort/sort-connectors/mysql-cdc/src/main/java/org/apache/inlong/sort/cdc/debezium/DebeziumSourceFunction.java
b/inlong-sort/sort-connectors/mysql-cdc/src/main/java/org/apache/inlong/sort/cdc/debezium/DebeziumSourceFunction.java
index 7d7b9bcd0..80d6812f2 100644
---
a/inlong-sort/sort-connectors/mysql-cdc/src/main/java/org/apache/inlong/sort/cdc/debezium/DebeziumSourceFunction.java
+++
b/inlong-sort/sort-connectors/mysql-cdc/src/main/java/org/apache/inlong/sort/cdc/debezium/DebeziumSourceFunction.java
@@ -34,6 +34,7 @@ import org.apache.flink.api.common.state.ListStateDescriptor;
import org.apache.flink.api.common.state.OperatorStateStore;
import org.apache.flink.api.common.typeinfo.BasicTypeInfo;
import org.apache.flink.api.common.typeinfo.PrimitiveArrayTypeInfo;
+import org.apache.flink.api.common.typeinfo.TypeHint;
import org.apache.flink.api.common.typeinfo.TypeInformation;
import org.apache.flink.api.java.typeutils.ResultTypeQueryable;
import org.apache.flink.configuration.Configuration;
@@ -49,7 +50,9 @@ import org.apache.flink.util.ExceptionUtils;
import org.apache.flink.util.FlinkRuntimeException;
import org.apache.inlong.sort.base.metric.MetricOption;
import org.apache.inlong.sort.base.metric.MetricOption.RegisteredMetric;
+import org.apache.inlong.sort.base.metric.MetricState;
import org.apache.inlong.sort.base.metric.SourceMetricData;
+import org.apache.inlong.sort.base.util.MetricStateUtils;
import org.apache.inlong.sort.cdc.debezium.internal.DebeziumChangeConsumer;
import org.apache.inlong.sort.cdc.debezium.internal.DebeziumChangeFetcher;
import org.apache.inlong.sort.cdc.debezium.internal.DebeziumOffset;
@@ -76,6 +79,7 @@ import java.util.concurrent.Executors;
import java.util.concurrent.ThreadFactory;
import java.util.concurrent.TimeUnit;
+import static org.apache.inlong.sort.base.Constants.INLONG_METRIC_STATE_NAME;
import static
org.apache.inlong.sort.cdc.debezium.utils.DatabaseHistoryUtil.registerHistory;
import static
org.apache.inlong.sort.cdc.debezium.utils.DatabaseHistoryUtil.retrieveHistory;
@@ -227,6 +231,10 @@ public class DebeziumSourceFunction<T> extends
RichSourceFunction<T>
private SourceMetricData sourceMetricData;
+ private transient ListState<MetricState> metricStateListState;
+
+ private MetricState metricState;
+
//
---------------------------------------------------------------------------------------
public DebeziumSourceFunction(
@@ -271,10 +279,19 @@ public class DebeziumSourceFunction<T> extends
RichSourceFunction<T>
stateStore.getUnionListState(
new ListStateDescriptor<>(
HISTORY_RECORDS_STATE_NAME,
BasicTypeInfo.STRING_TYPE_INFO));
+ if (this.inlongMetric != null) {
+ this.metricStateListState =
+ stateStore.getUnionListState(
+ new ListStateDescriptor<>(
+ INLONG_METRIC_STATE_NAME,
TypeInformation.of(new TypeHint<MetricState>() {
+ })));
+ }
if (context.isRestored()) {
restoreOffsetState();
restoreHistoryRecordsState();
+ metricState =
MetricStateUtils.restoreMetricState(metricStateListState,
+ getRuntimeContext().getIndexOfThisSubtask(),
getRuntimeContext().getNumberOfParallelSubtasks());
} else {
if (specificOffset != null) {
byte[] serializedOffset =
@@ -344,6 +361,10 @@ public class DebeziumSourceFunction<T> extends
RichSourceFunction<T>
} else {
snapshotOffsetState(functionSnapshotContext.getCheckpointId());
snapshotHistoryRecordsState();
+ if (sourceMetricData != null && metricStateListState != null) {
+
MetricStateUtils.snapshotMetricStateForSourceMetricData(metricStateListState,
sourceMetricData,
+ getRuntimeContext().getIndexOfThisSubtask());
+ }
}
}
diff --git
a/inlong-sort/sort-connectors/mysql-cdc/src/main/java/org/apache/inlong/sort/cdc/mysql/source/MySqlSource.java
b/inlong-sort/sort-connectors/mysql-cdc/src/main/java/org/apache/inlong/sort/cdc/mysql/source/MySqlSource.java
index ea102f429..60484ddf4 100644
---
a/inlong-sort/sort-connectors/mysql-cdc/src/main/java/org/apache/inlong/sort/cdc/mysql/source/MySqlSource.java
+++
b/inlong-sort/sort-connectors/mysql-cdc/src/main/java/org/apache/inlong/sort/cdc/mysql/source/MySqlSource.java
@@ -167,7 +167,7 @@ public class MySqlSource<T>
sourceConfig.isIncludeSchemaChanges()),
readerContext.getConfiguration(),
mySqlSourceReaderContext,
- sourceConfig);
+ sourceConfig, sourceReaderMetrics);
}
@Override
diff --git
a/inlong-sort/sort-connectors/mysql-cdc/src/main/java/org/apache/inlong/sort/cdc/mysql/source/metrics/MySqlSourceReaderMetrics.java
b/inlong-sort/sort-connectors/mysql-cdc/src/main/java/org/apache/inlong/sort/cdc/mysql/source/metrics/MySqlSourceReaderMetrics.java
index 45c81e560..19ca7c570 100644
---
a/inlong-sort/sort-connectors/mysql-cdc/src/main/java/org/apache/inlong/sort/cdc/mysql/source/metrics/MySqlSourceReaderMetrics.java
+++
b/inlong-sort/sort-connectors/mysql-cdc/src/main/java/org/apache/inlong/sort/cdc/mysql/source/metrics/MySqlSourceReaderMetrics.java
@@ -98,4 +98,15 @@ public class MySqlSourceReaderMetrics {
sourceMetricData.outputMetrics(rowCountSize, rowDataSize);
}
}
+
+    /**
+     * Adds previously recorded counts onto the input counters, e.g. when metric
+     * state is restored from checkpointed splits. No-op when {@code sourceMetricData} is null.
+     *
+     * @param rowCountSize number of records to add to the records-in counter
+     * @param rowDataSize number of bytes to add to the bytes-in counter
+     */
+    public void initMetrics(long rowCountSize, long rowDataSize) {
+        if (sourceMetricData == null) {
+            return;
+        }
+        sourceMetricData.getNumBytesIn().inc(rowDataSize);
+        sourceMetricData.getNumRecordsIn().inc(rowCountSize);
+    }
+
+    /**
+     * Exposes the InLong source metric data backing these reader metrics.
+     *
+     * @return the metric data; may be {@code null}
+     */
+    public SourceMetricData getSourceMetricData() {
+        return this.sourceMetricData;
+    }
}
diff --git
a/inlong-sort/sort-connectors/mysql-cdc/src/main/java/org/apache/inlong/sort/cdc/mysql/source/reader/MySqlRecordEmitter.java
b/inlong-sort/sort-connectors/mysql-cdc/src/main/java/org/apache/inlong/sort/cdc/mysql/source/reader/MySqlRecordEmitter.java
index d2cc328f9..f0fe28b57 100644
---
a/inlong-sort/sort-connectors/mysql-cdc/src/main/java/org/apache/inlong/sort/cdc/mysql/source/reader/MySqlRecordEmitter.java
+++
b/inlong-sort/sort-connectors/mysql-cdc/src/main/java/org/apache/inlong/sort/cdc/mysql/source/reader/MySqlRecordEmitter.java
@@ -145,8 +145,8 @@ public final class MySqlRecordEmitter<T>
new Collector<T>() {
@Override
public void collect(final T t) {
- sourceReaderMetrics.outputMetrics(1L,
-
t.toString().getBytes(StandardCharsets.UTF_8).length);
+ // Report one record, sized as the UTF-8 byte length of t.toString(), before emitting it.
+ long byteNum =
t.toString().getBytes(StandardCharsets.UTF_8).length;
+ sourceReaderMetrics.outputMetrics(1L, byteNum);
output.collect(t);
}
diff --git
a/inlong-sort/sort-connectors/mysql-cdc/src/main/java/org/apache/inlong/sort/cdc/mysql/source/reader/MySqlSourceReader.java
b/inlong-sort/sort-connectors/mysql-cdc/src/main/java/org/apache/inlong/sort/cdc/mysql/source/reader/MySqlSourceReader.java
index 07c96d542..140db93fb 100644
---
a/inlong-sort/sort-connectors/mysql-cdc/src/main/java/org/apache/inlong/sort/cdc/mysql/source/reader/MySqlSourceReader.java
+++
b/inlong-sort/sort-connectors/mysql-cdc/src/main/java/org/apache/inlong/sort/cdc/mysql/source/reader/MySqlSourceReader.java
@@ -29,6 +29,7 @@ import
org.apache.flink.connector.base.source.reader.SingleThreadMultiplexSource
import
org.apache.flink.connector.base.source.reader.fetcher.SingleThreadFetcherManager;
import
org.apache.flink.connector.base.source.reader.synchronization.FutureCompletingBlockingQueue;
import org.apache.flink.util.FlinkRuntimeException;
+import org.apache.inlong.sort.base.metric.SourceMetricData;
import org.apache.inlong.sort.cdc.mysql.debezium.DebeziumUtils;
import org.apache.inlong.sort.cdc.mysql.source.config.MySqlSourceConfig;
import org.apache.inlong.sort.cdc.mysql.source.events.BinlogSplitMetaEvent;
@@ -41,10 +42,12 @@ import
org.apache.inlong.sort.cdc.mysql.source.events.LatestFinishedSplitsSizeRe
import
org.apache.inlong.sort.cdc.mysql.source.events.SuspendBinlogReaderAckEvent;
import org.apache.inlong.sort.cdc.mysql.source.events.SuspendBinlogReaderEvent;
import org.apache.inlong.sort.cdc.mysql.source.events.WakeupReaderEvent;
+import
org.apache.inlong.sort.cdc.mysql.source.metrics.MySqlSourceReaderMetrics;
import org.apache.inlong.sort.cdc.mysql.source.offset.BinlogOffset;
import org.apache.inlong.sort.cdc.mysql.source.split.FinishedSnapshotSplitInfo;
import org.apache.inlong.sort.cdc.mysql.source.split.MySqlBinlogSplit;
import org.apache.inlong.sort.cdc.mysql.source.split.MySqlBinlogSplitState;
+import org.apache.inlong.sort.cdc.mysql.source.split.MySqlMetricSplit;
import org.apache.inlong.sort.cdc.mysql.source.split.MySqlSnapshotSplit;
import org.apache.inlong.sort.cdc.mysql.source.split.MySqlSnapshotSplitState;
import org.apache.inlong.sort.cdc.mysql.source.split.MySqlSplit;
@@ -84,6 +87,7 @@ public class MySqlSourceReader<T>
private final int subtaskId;
private final MySqlSourceReaderContext mySqlSourceReaderContext;
private MySqlBinlogSplit suspendedBinlogSplit;
+ private MySqlSourceReaderMetrics sourceReaderMetrics;
public MySqlSourceReader(
FutureCompletingBlockingQueue<RecordsWithSplitIds<SourceRecord>>
elementQueue,
@@ -91,7 +95,8 @@ public class MySqlSourceReader<T>
RecordEmitter<SourceRecord, T, MySqlSplitState> recordEmitter,
Configuration config,
MySqlSourceReaderContext context,
- MySqlSourceConfig sourceConfig) {
+ MySqlSourceConfig sourceConfig,
+ MySqlSourceReaderMetrics sourceReaderMetrics) {
super(
elementQueue,
new SingleThreadFetcherManager<>(elementQueue,
splitReaderSupplier::get),
@@ -104,6 +109,7 @@ public class MySqlSourceReader<T>
this.subtaskId = context.getSourceReaderContext().getIndexOfSubtask();
this.mySqlSourceReaderContext = context;
this.suspendedBinlogSplit = null;
+ this.sourceReaderMetrics = sourceReaderMetrics;
}
@Override
@@ -142,6 +148,13 @@ public class MySqlSourceReader<T>
if (suspendedBinlogSplit != null) {
unfinishedSplits.add(suspendedBinlogSplit);
}
+ SourceMetricData sourceMetricData =
sourceReaderMetrics.getSourceMetricData();
+ LOG.info("inlong-metric-states snapshot sourceMetricData:{}",
sourceMetricData);
+ if (sourceMetricData != null) {
+ unfinishedSplits.add(
+ new
MySqlMetricSplit(sourceMetricData.getNumBytesIn().getCount(),
+ sourceMetricData.getNumRecordsIn().getCount()));
+ }
return unfinishedSplits;
}
@@ -171,6 +184,15 @@ public class MySqlSourceReader<T>
List<MySqlSplit> unfinishedSplits = new ArrayList<>();
for (MySqlSplit split : splits) {
LOG.info("Add Split: " + split);
+ if (split.isMetricSplit()) {
+ MySqlMetricSplit mysqlMetricSplit = (MySqlMetricSplit) split;
+ LOG.info("inlong-metric-states restore metricSplit:{}",
mysqlMetricSplit);
+
sourceReaderMetrics.initMetrics(mysqlMetricSplit.getNumRecordsIn(),
+ mysqlMetricSplit.getNumBytesIn());
+ LOG.info("inlong-metric-states restore sourceReaderMetrics:{}",
+ sourceReaderMetrics.getSourceMetricData());
+ continue;
+ }
if (split.isSnapshotSplit()) {
MySqlSnapshotSplit snapshotSplit = split.asSnapshotSplit();
if (snapshotSplit.isSnapshotReadFinished()) {
@@ -206,7 +228,7 @@ public class MySqlSourceReader<T>
final String splitId = split.splitId();
if (split.getTableSchemas().isEmpty()) {
try (MySqlConnection jdbc =
-
DebeziumUtils.createMySqlConnection(sourceConfig.getDbzConfiguration())) {
+
DebeziumUtils.createMySqlConnection(sourceConfig.getDbzConfiguration())) {
Map<TableId, TableChanges.TableChange> tableSchemas =
TableDiscoveryUtils.discoverCapturedTableSchemas(sourceConfig, jdbc);
LOG.info("The table schema discovery for binlog split {}
success", splitId);
diff --git
a/inlong-sort/sort-connectors/mysql-cdc/src/main/java/org/apache/inlong/sort/cdc/mysql/source/split/MySqlMetricSplit.java
b/inlong-sort/sort-connectors/mysql-cdc/src/main/java/org/apache/inlong/sort/cdc/mysql/source/split/MySqlMetricSplit.java
new file mode 100644
index 000000000..433d1ce60
--- /dev/null
+++
b/inlong-sort/sort-connectors/mysql-cdc/src/main/java/org/apache/inlong/sort/cdc/mysql/source/split/MySqlMetricSplit.java
@@ -0,0 +1,78 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.inlong.sort.cdc.mysql.source.split;
+
+import io.debezium.relational.TableId;
+import io.debezium.relational.history.TableChanges.TableChange;
+
+import java.util.Collections;
+import java.util.Map;
+import java.util.Objects;
+
+/**
+ * A pseudo split that carries the InLong source metric counters (records and bytes read in)
+ * through the reader's split state. It does not describe any MySQL table or data range.
+ */
+public class MySqlMetricSplit extends MySqlSplit {
+
+    private long numRecordsIn;
+
+    private long numBytesIn;
+
+    /**
+     * Creates a metric split whose counters both start at zero.
+     *
+     * @param splitId the split id
+     */
+    public MySqlMetricSplit(String splitId) {
+        super(splitId);
+    }
+
+    /**
+     * Creates a metric split with the given counters and an empty split id.
+     *
+     * <p>NOTE: bytes come before records here, unlike the getter order.
+     *
+     * @param numBytesIn number of bytes read in; must not be null
+     * @param numRecordsIn number of records read in; must not be null
+     * @throws NullPointerException if either argument is null
+     */
+    public MySqlMetricSplit(Long numBytesIn, Long numRecordsIn) {
+        this("");
+        this.numBytesIn = Objects.requireNonNull(numBytesIn, "numBytesIn");
+        this.numRecordsIn = Objects.requireNonNull(numRecordsIn, "numRecordsIn");
+    }
+
+    /** Returns the number of records read in. */
+    public Long getNumRecordsIn() {
+        return numRecordsIn;
+    }
+
+    /**
+     * Overwrites the number of records read in.
+     *
+     * @throws NullPointerException if {@code numRecordsIn} is null
+     */
+    public void setNumRecordsIn(Long numRecordsIn) {
+        this.numRecordsIn = Objects.requireNonNull(numRecordsIn, "numRecordsIn");
+    }
+
+    /** Returns the number of bytes read in. */
+    public Long getNumBytesIn() {
+        return numBytesIn;
+    }
+
+    /**
+     * Overwrites the number of bytes read in.
+     *
+     * @throws NullPointerException if {@code numBytesIn} is null
+     */
+    public void setNumBytesIn(Long numBytesIn) {
+        this.numBytesIn = Objects.requireNonNull(numBytesIn, "numBytesIn");
+    }
+
+    /**
+     * Adds the given deltas to the counters. Despite its name this accumulates; it does not
+     * overwrite the current values.
+     *
+     * @param count number of records to add
+     * @param byteNum number of bytes to add
+     */
+    public void setMetricData(long count, long byteNum) {
+        numRecordsIn += count;
+        numBytesIn += byteNum;
+    }
+
+    /**
+     * A metric split is not bound to any table, so this is always an empty map. It is never null,
+     * so callers can safely call {@code isEmpty()} on the result.
+     */
+    @Override
+    public Map<TableId, TableChange> getTableSchemas() {
+        return Collections.emptyMap();
+    }
+
+    @Override
+    public String toString() {
+        return "MySqlMetricSplit{"
+                + "numRecordsIn=" + numRecordsIn
+                + ", numBytesIn=" + numBytesIn
+                + '}';
+    }
+}
diff --git
a/inlong-sort/sort-connectors/mysql-cdc/src/main/java/org/apache/inlong/sort/cdc/mysql/source/split/MySqlSplit.java
b/inlong-sort/sort-connectors/mysql-cdc/src/main/java/org/apache/inlong/sort/cdc/mysql/source/split/MySqlSplit.java
index a84f196bd..5f29a3149 100644
---
a/inlong-sort/sort-connectors/mysql-cdc/src/main/java/org/apache/inlong/sort/cdc/mysql/source/split/MySqlSplit.java
+++
b/inlong-sort/sort-connectors/mysql-cdc/src/main/java/org/apache/inlong/sort/cdc/mysql/source/split/MySqlSplit.java
@@ -34,6 +34,14 @@ public abstract class MySqlSplit implements SourceSplit {
this.splitId = splitId;
}
+    /** Checks whether this split is a metric split (exact class match, subclasses excluded). */
+    public final boolean isMetricSplit() {
+        return MySqlMetricSplit.class == getClass();
+    }
+
+    /** Casts this split into a {@link MySqlMetricSplit}; callers should check {@link #isMetricSplit()} first. */
+    public final MySqlMetricSplit asMetricSplit() {
+        final MySqlMetricSplit metricSplit = (MySqlMetricSplit) this;
+        return metricSplit;
+    }
+
/** Checks whether this split is a snapshot split. */
public final boolean isSnapshotSplit() {
return getClass() == MySqlSnapshotSplit.class;
diff --git
a/inlong-sort/sort-connectors/mysql-cdc/src/main/java/org/apache/inlong/sort/cdc/mysql/source/split/MySqlSplitSerializer.java
b/inlong-sort/sort-connectors/mysql-cdc/src/main/java/org/apache/inlong/sort/cdc/mysql/source/split/MySqlSplitSerializer.java
index 18f7161e5..6b7b5de77 100644
---
a/inlong-sort/sort-connectors/mysql-cdc/src/main/java/org/apache/inlong/sort/cdc/mysql/source/split/MySqlSplitSerializer.java
+++
b/inlong-sort/sort-connectors/mysql-cdc/src/main/java/org/apache/inlong/sort/cdc/mysql/source/split/MySqlSplitSerializer.java
@@ -56,6 +56,7 @@ public final class MySqlSplitSerializer implements
SimpleVersionedSerializer<MyS
private static final int SNAPSHOT_SPLIT_FLAG = 1;
private static final int BINLOG_SPLIT_FLAG = 2;
+ private static final int METRIC_SPLIT_FLAG = 3;
private static void writeTableSchemas(
Map<TableId, TableChange> tableSchemas, DataOutputSerializer out)
throws IOException {
@@ -167,7 +168,7 @@ public final class MySqlSplitSerializer implements
SimpleVersionedSerializer<MyS
// serialization
snapshotSplit.serializedFormCache = result;
return result;
- } else {
+ } else if (split.isBinlogSplit()) {
final MySqlBinlogSplit binlogSplit = split.asBinlogSplit();
// optimization: the splits lazily cache their own serialized form
if (binlogSplit.serializedFormCache != null) {
@@ -189,6 +190,15 @@ public final class MySqlSplitSerializer implements
SimpleVersionedSerializer<MyS
// serialization
binlogSplit.serializedFormCache = result;
return result;
+ } else {
+ final MySqlMetricSplit mysqlMetricSplit = split.asMetricSplit();
+ final DataOutputSerializer out = SERIALIZER_CACHE.get();
+ out.writeInt(METRIC_SPLIT_FLAG);
+ out.writeLong(mysqlMetricSplit.getNumBytesIn());
+ out.writeLong(mysqlMetricSplit.getNumRecordsIn());
+ final byte[] result = out.getCopyOfBuffer();
+ out.clear();
+ return result;
}
}
@@ -255,6 +265,10 @@ public final class MySqlSplitSerializer implements
SimpleVersionedSerializer<MyS
tableChangeMap,
totalFinishedSplitSize,
isSuspended);
+ } else if (splitKind == METRIC_SPLIT_FLAG) {
+ long numBytesIn = in.readLong();
+ long numRecordsIn = in.readLong();
+ return new MySqlMetricSplit(numBytesIn, numRecordsIn);
} else {
throw new IOException("Unknown split kind: " + splitKind);
}