This is an automated email from the ASF dual-hosted git repository.
zirui pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/inlong.git
The following commit(s) were added to refs/heads/master by this push:
     new 2f1e6d0e36 [INLONG-10575][Sort] Make mysql source support report audit information exactly once (#10576)
2f1e6d0e36 is described below
commit 2f1e6d0e36a2968b062e22738ce0df95b532977a
Author: XiaoYou201 <[email protected]>
AuthorDate: Tue Jul 9 10:17:48 2024 +0800
    [INLONG-10575][Sort] Make mysql source support report audit information exactly once (#10576)
---
.../mysql/RowDataDebeziumDeserializeSchema.java | 35 +-
.../inlong/sort/mysql/source/MySqlSource.java | 5 +-
.../mysql/source/reader/MySqlSourceReader.java | 383 +++++++++++++++++++++
licenses/inlong-sort-connectors/LICENSE | 2 +-
4 files changed, 411 insertions(+), 14 deletions(-)
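
In short, the change replaces SourceMetricData with SourceExactlyMetric in RowDataDebeziumDeserializeSchema and adds a copied MySqlSourceReader that ties audit reporting to the checkpoint lifecycle: the current checkpoint id is recorded when state is snapshotted, and buffered audit data is flushed only after the checkpoint completes, so each record is reported exactly once. A condensed sketch of that wiring (method names taken from the diff below; the real snapshotState also tracks unfinished splits):

    @Override
    public List<MySqlSplit> snapshotState(long checkpointId) {
        if (metricSchema instanceof RowDataDebeziumDeserializeSchema) {
            // tag pending audit data with the checkpoint being taken
            ((RowDataDebeziumDeserializeSchema) metricSchema).updateCurrentCheckpointId(checkpointId);
        }
        return super.snapshotState(checkpointId);
    }

    @Override
    public void notifyCheckpointComplete(long checkpointId) throws Exception {
        super.notifyCheckpointComplete(checkpointId);
        if (metricSchema instanceof RowDataDebeziumDeserializeSchema) {
            RowDataDebeziumDeserializeSchema schema = (RowDataDebeziumDeserializeSchema) metricSchema;
            schema.flushAudit();                       // report audit data once, after the checkpoint succeeds
            schema.updateLastCheckpointId(checkpointId);
        }
    }
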
diff --git a/inlong-sort/sort-flink/sort-flink-v1.15/sort-connectors/mysql-cdc/src/main/java/org/apache/inlong/sort/mysql/RowDataDebeziumDeserializeSchema.java b/inlong-sort/sort-flink/sort-flink-v1.15/sort-connectors/mysql-cdc/src/main/java/org/apache/inlong/sort/mysql/RowDataDebeziumDeserializeSchema.java
index 85e21fe228..0a1f2013f2 100644
--- a/inlong-sort/sort-flink/sort-flink-v1.15/sort-connectors/mysql-cdc/src/main/java/org/apache/inlong/sort/mysql/RowDataDebeziumDeserializeSchema.java
+++ b/inlong-sort/sort-flink/sort-flink-v1.15/sort-connectors/mysql-cdc/src/main/java/org/apache/inlong/sort/mysql/RowDataDebeziumDeserializeSchema.java
@@ -19,7 +19,7 @@ package org.apache.inlong.sort.mysql;
import org.apache.inlong.sort.base.metric.MetricOption;
import org.apache.inlong.sort.base.metric.MetricsCollector;
-import org.apache.inlong.sort.base.metric.SourceMetricData;
+import org.apache.inlong.sort.base.metric.SourceExactlyMetric;
import com.ververica.cdc.debezium.DebeziumDeserializationSchema;
import com.ververica.cdc.debezium.table.AppendMetadataCollector;
@@ -53,8 +53,6 @@ import org.apache.kafka.connect.data.Field;
import org.apache.kafka.connect.data.Schema;
import org.apache.kafka.connect.data.Struct;
import org.apache.kafka.connect.source.SourceRecord;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
import java.io.Serializable;
import java.math.BigDecimal;
@@ -74,8 +72,6 @@ import static org.apache.flink.util.Preconditions.checkNotNull;
*/
public final class RowDataDebeziumDeserializeSchema implements DebeziumDeserializationSchema<RowData> {
-    private final static Logger LOG = LoggerFactory.getLogger(RowDataDebeziumDeserializeSchema.class);
-
private static final long serialVersionUID = 2L;
/** Custom validator to validate the row value. */
@@ -106,7 +102,7 @@ public final class RowDataDebeziumDeserializeSchema implements DebeziumDeseriali
    /** Changelog Mode to use for encoding changes in Flink internal data structure. */
private final DebeziumChangelogMode changelogMode;
- private SourceMetricData sourceMetricData;
+ private SourceExactlyMetric sourceExactlyMetric;
private final MetricOption metricOption;
/** Returns a builder to build {@link RowDataDebeziumDeserializeSchema}. */
@@ -145,7 +141,7 @@ public final class RowDataDebeziumDeserializeSchema implements DebeziumDeseriali
GenericRowData insert = extractAfterRow(value, valueSchema);
validator.validate(insert, RowKind.INSERT);
insert.setRowKind(RowKind.INSERT);
- if (sourceMetricData != null) {
+ if (sourceExactlyMetric != null) {
out = createMetricsCollector(record, out);
}
emit(record, insert, out);
@@ -153,7 +149,7 @@ public final class RowDataDebeziumDeserializeSchema implements DebeziumDeseriali
GenericRowData delete = extractBeforeRow(value, valueSchema);
validator.validate(delete, RowKind.DELETE);
delete.setRowKind(RowKind.DELETE);
- if (sourceMetricData != null) {
+ if (sourceExactlyMetric != null) {
out = createMetricsCollector(record, out);
}
emit(record, delete, out);
@@ -168,7 +164,7 @@ public final class RowDataDebeziumDeserializeSchema implements DebeziumDeseriali
GenericRowData after = extractAfterRow(value, valueSchema);
validator.validate(after, RowKind.UPDATE_AFTER);
after.setRowKind(RowKind.UPDATE_AFTER);
- if (sourceMetricData != null) {
+ if (sourceExactlyMetric != null) {
out = createMetricsCollector(record, out);
}
emit(record, after, out);
@@ -182,7 +178,7 @@ public final class RowDataDebeziumDeserializeSchema implements DebeziumDeseriali
* @return metrics collector
*/
    private Collector<RowData> createMetricsCollector(SourceRecord record, Collector<RowData> out) {
-        MetricsCollector<RowData> collector = new MetricsCollector<>(out, sourceMetricData);
+        MetricsCollector<RowData> collector = new MetricsCollector<>(out, sourceExactlyMetric);
        collector.resetTimestamp((Long) ((Struct) record.value()).get(FieldName.TIMESTAMP));
return collector;
}
@@ -194,7 +190,7 @@ public final class RowDataDebeziumDeserializeSchema implements DebeziumDeseriali
*/
public void initSourceMetricData() {
if (metricOption != null) {
- this.sourceMetricData = new SourceMetricData(metricOption);
+ this.sourceExactlyMetric = new SourceExactlyMetric(metricOption);
}
}
@@ -225,6 +221,23 @@ public final class RowDataDebeziumDeserializeSchema implements DebeziumDeseriali
return resultTypeInfo;
}
+ public void flushAudit() {
+ if (sourceExactlyMetric != null) {
+ sourceExactlyMetric.flushAudit();
+ }
+ }
+
+ public void updateCurrentCheckpointId(long checkpointId) {
+ if (sourceExactlyMetric != null) {
+ sourceExactlyMetric.updateCurrentCheckpointId(checkpointId);
+ }
+ }
+
+ public void updateLastCheckpointId(long checkpointId) {
+ if (sourceExactlyMetric != null) {
+ sourceExactlyMetric.updateLastCheckpointId(checkpointId);
+ }
+ }
    // -------------------------------------------------------------------------------------
    // Builder
    // -------------------------------------------------------------------------------------
diff --git a/inlong-sort/sort-flink/sort-flink-v1.15/sort-connectors/mysql-cdc/src/main/java/org/apache/inlong/sort/mysql/source/MySqlSource.java b/inlong-sort/sort-flink/sort-flink-v1.15/sort-connectors/mysql-cdc/src/main/java/org/apache/inlong/sort/mysql/source/MySqlSource.java
index 801d172372..da1dcbadae 100644
--- a/inlong-sort/sort-flink/sort-flink-v1.15/sort-connectors/mysql-cdc/src/main/java/org/apache/inlong/sort/mysql/source/MySqlSource.java
+++ b/inlong-sort/sort-flink/sort-flink-v1.15/sort-connectors/mysql-cdc/src/main/java/org/apache/inlong/sort/mysql/source/MySqlSource.java
@@ -18,6 +18,7 @@
package org.apache.inlong.sort.mysql.source;
import org.apache.inlong.sort.mysql.RowDataDebeziumDeserializeSchema;
+import org.apache.inlong.sort.mysql.source.reader.MySqlSourceReader;
import com.ververica.cdc.connectors.mysql.MySqlValidator;
import com.ververica.cdc.connectors.mysql.debezium.DebeziumUtils;
@@ -33,7 +34,6 @@ import com.ververica.cdc.connectors.mysql.source.config.MySqlSourceConfigFactory
import com.ververica.cdc.connectors.mysql.source.enumerator.MySqlSourceEnumerator;
import com.ververica.cdc.connectors.mysql.source.metrics.MySqlSourceReaderMetrics;
import com.ververica.cdc.connectors.mysql.source.reader.MySqlRecordEmitter;
-import com.ververica.cdc.connectors.mysql.source.reader.MySqlSourceReader;
import com.ververica.cdc.connectors.mysql.source.reader.MySqlSourceReaderContext;
import com.ververica.cdc.connectors.mysql.source.reader.MySqlSplitReader;
import com.ververica.cdc.connectors.mysql.source.split.MySqlSplit;
@@ -167,7 +167,8 @@ public class MySqlSource<T>
sourceConfig.isIncludeSchemaChanges()),
readerContext.getConfiguration(),
mySqlSourceReaderContext,
- sourceConfig);
+ sourceConfig,
+ deserializationSchema);
}
@Override
diff --git a/inlong-sort/sort-flink/sort-flink-v1.15/sort-connectors/mysql-cdc/src/main/java/org/apache/inlong/sort/mysql/source/reader/MySqlSourceReader.java b/inlong-sort/sort-flink/sort-flink-v1.15/sort-connectors/mysql-cdc/src/main/java/org/apache/inlong/sort/mysql/source/reader/MySqlSourceReader.java
new file mode 100644
index 0000000000..01f34f28b1
--- /dev/null
+++ b/inlong-sort/sort-flink/sort-flink-v1.15/sort-connectors/mysql-cdc/src/main/java/org/apache/inlong/sort/mysql/source/reader/MySqlSourceReader.java
@@ -0,0 +1,383 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.inlong.sort.mysql.source.reader;
+
+import org.apache.inlong.sort.mysql.RowDataDebeziumDeserializeSchema;
+
+import com.ververica.cdc.connectors.mysql.debezium.DebeziumUtils;
+import com.ververica.cdc.connectors.mysql.source.config.MySqlSourceConfig;
+import com.ververica.cdc.connectors.mysql.source.events.BinlogSplitMetaEvent;
+import com.ververica.cdc.connectors.mysql.source.events.BinlogSplitMetaRequestEvent;
+import com.ververica.cdc.connectors.mysql.source.events.FinishedSnapshotSplitsAckEvent;
+import com.ververica.cdc.connectors.mysql.source.events.FinishedSnapshotSplitsReportEvent;
+import com.ververica.cdc.connectors.mysql.source.events.FinishedSnapshotSplitsRequestEvent;
+import com.ververica.cdc.connectors.mysql.source.events.LatestFinishedSplitsSizeEvent;
+import com.ververica.cdc.connectors.mysql.source.events.LatestFinishedSplitsSizeRequestEvent;
+import com.ververica.cdc.connectors.mysql.source.events.SuspendBinlogReaderAckEvent;
+import com.ververica.cdc.connectors.mysql.source.events.SuspendBinlogReaderEvent;
+import com.ververica.cdc.connectors.mysql.source.events.WakeupReaderEvent;
+import com.ververica.cdc.connectors.mysql.source.offset.BinlogOffset;
+import com.ververica.cdc.connectors.mysql.source.reader.MySqlSourceReaderContext;
+import com.ververica.cdc.connectors.mysql.source.reader.MySqlSplitReader;
+import com.ververica.cdc.connectors.mysql.source.split.FinishedSnapshotSplitInfo;
+import com.ververica.cdc.connectors.mysql.source.split.MySqlBinlogSplit;
+import com.ververica.cdc.connectors.mysql.source.split.MySqlBinlogSplitState;
+import com.ververica.cdc.connectors.mysql.source.split.MySqlSnapshotSplit;
+import com.ververica.cdc.connectors.mysql.source.split.MySqlSnapshotSplitState;
+import com.ververica.cdc.connectors.mysql.source.split.MySqlSplit;
+import com.ververica.cdc.connectors.mysql.source.split.MySqlSplitState;
+import com.ververica.cdc.connectors.mysql.source.split.SourceRecords;
+import com.ververica.cdc.connectors.mysql.source.utils.TableDiscoveryUtils;
+import com.ververica.cdc.debezium.DebeziumDeserializationSchema;
+import io.debezium.connector.mysql.MySqlConnection;
+import io.debezium.relational.TableId;
+import io.debezium.relational.history.TableChanges;
+import org.apache.flink.api.connector.source.SourceEvent;
+import org.apache.flink.configuration.Configuration;
+import org.apache.flink.connector.base.source.reader.RecordEmitter;
+import org.apache.flink.connector.base.source.reader.RecordsWithSplitIds;
+import org.apache.flink.connector.base.source.reader.SingleThreadMultiplexSourceReaderBase;
+import org.apache.flink.connector.base.source.reader.fetcher.SingleThreadFetcherManager;
+import org.apache.flink.connector.base.source.reader.synchronization.FutureCompletingBlockingQueue;
+import org.apache.flink.util.FlinkRuntimeException;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.sql.SQLException;
+import java.util.ArrayList;
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.function.Supplier;
+import java.util.stream.Collectors;
+
+import static com.ververica.cdc.connectors.mysql.source.events.WakeupReaderEvent.WakeUpTarget.SNAPSHOT_READER;
+import static com.ververica.cdc.connectors.mysql.source.split.MySqlBinlogSplit.toNormalBinlogSplit;
+import static com.ververica.cdc.connectors.mysql.source.split.MySqlBinlogSplit.toSuspendedBinlogSplit;
+import static com.ververica.cdc.connectors.mysql.source.utils.ChunkUtils.getNextMetaGroupId;
+
+/** The source reader for MySQL source splits.
+ * Copied from {@link com.ververica.cdc.connectors.mysql.source.reader.MySqlSourceReader}.
+ */
+public class MySqlSourceReader<T>
+        extends
+        SingleThreadMultiplexSourceReaderBase<SourceRecords, T, MySqlSplit, MySqlSplitState> {
+
+    private static final Logger LOG = LoggerFactory.getLogger(MySqlSourceReader.class);
+
+ private final MySqlSourceConfig sourceConfig;
+ private final Map<String, MySqlSnapshotSplit> finishedUnackedSplits;
+ private final Map<String, MySqlBinlogSplit> uncompletedBinlogSplits;
+ private final int subtaskId;
+ private final MySqlSourceReaderContext mySqlSourceReaderContext;
+ private MySqlBinlogSplit suspendedBinlogSplit;
+ private final DebeziumDeserializationSchema<T> metricSchema;
+
+ public MySqlSourceReader(
+            FutureCompletingBlockingQueue<RecordsWithSplitIds<SourceRecords>> elementQueue,
+            Supplier<MySqlSplitReader> splitReaderSupplier,
+            RecordEmitter<SourceRecords, T, MySqlSplitState> recordEmitter,
+            Configuration config,
+            MySqlSourceReaderContext context,
+            MySqlSourceConfig sourceConfig, DebeziumDeserializationSchema<T> metricSchema) {
+        super(
+                elementQueue,
+                new SingleThreadFetcherManager<>(elementQueue, splitReaderSupplier::get),
+ recordEmitter,
+ config,
+ context.getSourceReaderContext());
+ this.sourceConfig = sourceConfig;
+ this.finishedUnackedSplits = new HashMap<>();
+ this.uncompletedBinlogSplits = new HashMap<>();
+ this.subtaskId = context.getSourceReaderContext().getIndexOfSubtask();
+ this.mySqlSourceReaderContext = context;
+ this.suspendedBinlogSplit = null;
+ this.metricSchema = metricSchema;
+ }
+
+ @Override
+ public void start() {
+ if (getNumberOfCurrentlyAssignedSplits() == 0) {
+ context.sendSplitRequest();
+ }
+ }
+
+ @Override
+ protected MySqlSplitState initializedState(MySqlSplit split) {
+ if (split.isSnapshotSplit()) {
+ return new MySqlSnapshotSplitState(split.asSnapshotSplit());
+ } else {
+ return new MySqlBinlogSplitState(split.asBinlogSplit());
+ }
+ }
+
+ @Override
+ public List<MySqlSplit> snapshotState(long checkpointId) {
+ if (metricSchema instanceof RowDataDebeziumDeserializeSchema) {
+            ((RowDataDebeziumDeserializeSchema) metricSchema).updateCurrentCheckpointId(checkpointId);
+ }
+ List<MySqlSplit> stateSplits = super.snapshotState(checkpointId);
+
+ // unfinished splits
+ List<MySqlSplit> unfinishedSplits =
+ stateSplits.stream()
+                        .filter(split -> !finishedUnackedSplits.containsKey(split.splitId()))
+ .collect(Collectors.toList());
+
+ // add finished snapshot splits that didn't receive ack yet
+ unfinishedSplits.addAll(finishedUnackedSplits.values());
+
+ // add binlog splits who are uncompleted
+ unfinishedSplits.addAll(uncompletedBinlogSplits.values());
+
+ // add suspended BinlogSplit
+ if (suspendedBinlogSplit != null) {
+ unfinishedSplits.add(suspendedBinlogSplit);
+ }
+
+ logCurrentBinlogOffsets(unfinishedSplits, checkpointId);
+
+ return unfinishedSplits;
+ }
+
+ @Override
+ public void notifyCheckpointComplete(long checkpointId) throws Exception {
+ super.notifyCheckpointComplete(checkpointId);
+ if (metricSchema instanceof RowDataDebeziumDeserializeSchema) {
+            RowDataDebeziumDeserializeSchema schema = (RowDataDebeziumDeserializeSchema) metricSchema;
+ schema.flushAudit();
+ schema.updateLastCheckpointId(checkpointId);
+ }
+ }
+
+ @Override
+    protected void onSplitFinished(Map<String, MySqlSplitState> finishedSplitIds) {
+ boolean requestNextSplit = true;
+ for (MySqlSplitState mySqlSplitState : finishedSplitIds.values()) {
+ MySqlSplit mySqlSplit = mySqlSplitState.toMySqlSplit();
+ if (mySqlSplit.isBinlogSplit()) {
+                LOG.info(
+                        "binlog split reader suspended due to newly added table, offset {}",
+                        mySqlSplitState.asBinlogSplitState().getStartingOffset());
+
+ mySqlSourceReaderContext.resetStopBinlogSplitReader();
+                suspendedBinlogSplit = toSuspendedBinlogSplit(mySqlSplit.asBinlogSplit());
+                context.sendSourceEventToCoordinator(new SuspendBinlogReaderAckEvent());
+                // do not request the next split while the reader is suspended; the suspended reader
+                // will automatically request the next split after it has been woken up
+ requestNextSplit = false;
+ } else {
+                finishedUnackedSplits.put(mySqlSplit.splitId(), mySqlSplit.asSnapshotSplit());
+ }
+ }
+ reportFinishedSnapshotSplitsIfNeed();
+ if (requestNextSplit) {
+ context.sendSplitRequest();
+ }
+ }
+
+ @Override
+ public void addSplits(List<MySqlSplit> splits) {
+ // restore for finishedUnackedSplits
+ List<MySqlSplit> unfinishedSplits = new ArrayList<>();
+ for (MySqlSplit split : splits) {
+ LOG.info("Add Split: " + split);
+ if (split.isSnapshotSplit()) {
+ MySqlSnapshotSplit snapshotSplit = split.asSnapshotSplit();
+ if (snapshotSplit.isSnapshotReadFinished()) {
+                    finishedUnackedSplits.put(snapshotSplit.splitId(), snapshotSplit);
+ } else {
+ unfinishedSplits.add(split);
+ }
+ } else {
+ MySqlBinlogSplit binlogSplit = split.asBinlogSplit();
+ // the binlog split is suspended
+ if (binlogSplit.isSuspended()) {
+ suspendedBinlogSplit = binlogSplit;
+ } else if (!binlogSplit.isCompletedSplit()) {
+                    uncompletedBinlogSplits.put(split.splitId(), split.asBinlogSplit());
+ requestBinlogSplitMetaIfNeeded(split.asBinlogSplit());
+ } else {
+ uncompletedBinlogSplits.remove(split.splitId());
+                    MySqlBinlogSplit mySqlBinlogSplit =
+                            discoverTableSchemasForBinlogSplit(split.asBinlogSplit());
+ unfinishedSplits.add(mySqlBinlogSplit);
+ }
+ }
+ }
+        // notify split enumerator again about the finished unacked snapshot splits
+        reportFinishedSnapshotSplitsIfNeed();
+        // add all un-finished splits (including binlog split) to SourceReaderBase
+ if (!unfinishedSplits.isEmpty()) {
+ super.addSplits(unfinishedSplits);
+ }
+ }
+
+    private MySqlBinlogSplit discoverTableSchemasForBinlogSplit(MySqlBinlogSplit split) {
+ final String splitId = split.splitId();
+ if (split.getTableSchemas().isEmpty()) {
+            try (MySqlConnection jdbc = DebeziumUtils.createMySqlConnection(sourceConfig)) {
+                Map<TableId, TableChanges.TableChange> tableSchemas =
+                        TableDiscoveryUtils.discoverCapturedTableSchemas(sourceConfig, jdbc);
+                LOG.info("The table schema discovery for binlog split {} succeeded", splitId);
+ return MySqlBinlogSplit.fillTableSchemas(split, tableSchemas);
+ } catch (SQLException e) {
+                LOG.error("Failed to obtain table schemas due to {}", e.getMessage());
+ throw new FlinkRuntimeException(e);
+ }
+ } else {
+            LOG.warn(
+                    "The binlog split {} already has table schemas, skip the table schema discovery",
+                    split);
+ return split;
+ }
+ }
+
+ @Override
+ public void handleSourceEvents(SourceEvent sourceEvent) {
+ if (sourceEvent instanceof FinishedSnapshotSplitsAckEvent) {
+            FinishedSnapshotSplitsAckEvent ackEvent = (FinishedSnapshotSplitsAckEvent) sourceEvent;
+            LOG.debug(
+                    "The subtask {} receives ack event for {} from enumerator.",
+ subtaskId,
+ ackEvent.getFinishedSplits());
+ for (String splitId : ackEvent.getFinishedSplits()) {
+ this.finishedUnackedSplits.remove(splitId);
+ }
+ } else if (sourceEvent instanceof FinishedSnapshotSplitsRequestEvent) {
+ // report finished snapshot splits
+            LOG.debug(
+                    "The subtask {} receives request to report finished snapshot splits.",
+ subtaskId);
+ reportFinishedSnapshotSplitsIfNeed();
+ } else if (sourceEvent instanceof BinlogSplitMetaEvent) {
+ LOG.debug(
+ "The subtask {} receives binlog meta with group id {}.",
+ subtaskId,
+ ((BinlogSplitMetaEvent) sourceEvent).getMetaGroupId());
+ fillMetaDataForBinlogSplit((BinlogSplitMetaEvent) sourceEvent);
+ } else if (sourceEvent instanceof SuspendBinlogReaderEvent) {
+ mySqlSourceReaderContext.setStopBinlogSplitReader();
+ } else if (sourceEvent instanceof WakeupReaderEvent) {
+            WakeupReaderEvent wakeupReaderEvent = (WakeupReaderEvent) sourceEvent;
+ if (wakeupReaderEvent.getTarget() == SNAPSHOT_READER) {
+ context.sendSplitRequest();
+ } else {
+ if (suspendedBinlogSplit != null) {
+ context.sendSourceEventToCoordinator(
+ new LatestFinishedSplitsSizeRequestEvent());
+ }
+ }
+ } else if (sourceEvent instanceof LatestFinishedSplitsSizeEvent) {
+ if (suspendedBinlogSplit != null) {
+                final int finishedSplitsSize =
+                        ((LatestFinishedSplitsSizeEvent) sourceEvent).getLatestFinishedSplitsSize();
+                final MySqlBinlogSplit binlogSplit =
+                        toNormalBinlogSplit(suspendedBinlogSplit, finishedSplitsSize);
+ suspendedBinlogSplit = null;
+ this.addSplits(Collections.singletonList(binlogSplit));
+ }
+ } else {
+ super.handleSourceEvents(sourceEvent);
+ }
+ }
+
+ private void reportFinishedSnapshotSplitsIfNeed() {
+ if (!finishedUnackedSplits.isEmpty()) {
+ final Map<String, BinlogOffset> finishedOffsets = new HashMap<>();
+ for (MySqlSnapshotSplit split : finishedUnackedSplits.values()) {
+ finishedOffsets.put(split.splitId(), split.getHighWatermark());
+ }
+ FinishedSnapshotSplitsReportEvent reportEvent =
+ new FinishedSnapshotSplitsReportEvent(finishedOffsets);
+ context.sendSourceEventToCoordinator(reportEvent);
+            LOG.debug(
+                    "The subtask {} reports offsets of finished snapshot splits {}.",
+ subtaskId,
+ finishedOffsets);
+ }
+ }
+
+ private void requestBinlogSplitMetaIfNeeded(MySqlBinlogSplit binlogSplit) {
+ final String splitId = binlogSplit.splitId();
+ if (!binlogSplit.isCompletedSplit()) {
+ final int nextMetaGroupId =
+ getNextMetaGroupId(
+ binlogSplit.getFinishedSnapshotSplitInfos().size(),
+ sourceConfig.getSplitMetaGroupSize());
+ BinlogSplitMetaRequestEvent splitMetaRequestEvent =
+ new BinlogSplitMetaRequestEvent(splitId, nextMetaGroupId);
+ context.sendSourceEventToCoordinator(splitMetaRequestEvent);
+ } else {
+            LOG.info("The meta of binlog split {} has been collected successfully", splitId);
+ this.addSplits(Collections.singletonList(binlogSplit));
+ }
+ }
+
+    private void fillMetaDataForBinlogSplit(BinlogSplitMetaEvent metadataEvent) {
+        MySqlBinlogSplit binlogSplit = uncompletedBinlogSplits.get(metadataEvent.getSplitId());
+ if (binlogSplit != null) {
+ final int receivedMetaGroupId = metadataEvent.getMetaGroupId();
+ final int expectedMetaGroupId =
+ getNextMetaGroupId(
+ binlogSplit.getFinishedSnapshotSplitInfos().size(),
+ sourceConfig.getSplitMetaGroupSize());
+ if (receivedMetaGroupId == expectedMetaGroupId) {
+ List<FinishedSnapshotSplitInfo> metaDataGroup =
+ metadataEvent.getMetaGroup().stream()
+ .map(FinishedSnapshotSplitInfo::deserialize)
+ .collect(Collectors.toList());
+ uncompletedBinlogSplits.put(
+ binlogSplit.splitId(),
+                        MySqlBinlogSplit.appendFinishedSplitInfos(binlogSplit, metaDataGroup));
+
+                LOG.info("Fill meta data of group {} to binlog split", metaDataGroup.size());
+ } else {
+                LOG.warn(
+                        "Received out of order binlog meta event for split {}, the received meta group id is {}, but expected is {}, ignore it",
+ metadataEvent.getSplitId(),
+ receivedMetaGroupId,
+ expectedMetaGroupId);
+ }
+            requestBinlogSplitMetaIfNeeded(uncompletedBinlogSplits.get(binlogSplit.splitId()));
+ } else {
+            LOG.warn(
+                    "Received binlog meta event for split {}, but the uncompleted split map does not contain it",
+ metadataEvent.getSplitId());
+ }
+ }
+
+    private void logCurrentBinlogOffsets(List<MySqlSplit> splits, long checkpointId) {
+ if (!LOG.isInfoEnabled()) {
+ return;
+ }
+ for (MySqlSplit split : splits) {
+ if (!split.isBinlogSplit()) {
+ return;
+ }
+ BinlogOffset offset = split.asBinlogSplit().getStartingOffset();
+            LOG.info("Binlog offset on checkpoint {}: {}", checkpointId, offset);
+ }
+ }
+
+ @Override
+    protected MySqlSplit toSplitType(String splitId, MySqlSplitState splitState) {
+ return splitState.toMySqlSplit();
+ }
+}
diff --git a/licenses/inlong-sort-connectors/LICENSE b/licenses/inlong-sort-connectors/LICENSE
index e4c4590fd4..b7c89b9245 100644
--- a/licenses/inlong-sort-connectors/LICENSE
+++ b/licenses/inlong-sort-connectors/LICENSE
@@ -920,7 +920,7 @@ License : https://github.com/apache/hudi/blob/master/LICENSE
inlong-sort/sort-flink/sort-flink-v1.15/sort-connectors/mysql-cdc/src/main/java/org/apache/inlong/sort/mysql/RowDataDebeziumDeserializeSchema.java
inlong-sort/sort-flink/sort-flink-v1.15/sort-connectors/mysql-cdc/src/main/java/org/apache/inlong/sort/mysql/source/MySqlSourceBuilder.java
inlong-sort/sort-flink/sort-flink-v1.15/sort-connectors/mysql-cdc/src/main/java/org/apache/inlong/sort/mysql/source/MySqlSource.java
- inlong-sort/sort-flink/sort-flink-v1.15/sort-connectors/mysql-cdc/src/main/java/org/apache/inlong/sort/mysql/table/MySqlReadableMetadata.java
+ inlong-sort/sort-flink/sort-flink-v1.15/sort-connectors/mysql-cdc/src/main/java/org/apache/inlong/sort/mysql/source/reader/MySqlSourceReader.java
Source : com.ververica:flink-connector-mysql-cdc:2.3.0 (Please note that the software have been modified.)
License : https://github.com/ververica/flink-cdc-connectors/blob/master/LICENSE