This is an automated email from the ASF dual-hosted git repository.
fanjia pushed a commit to branch dev
in repository https://gitbox.apache.org/repos/asf/seatunnel.git
The following commit(s) were added to refs/heads/dev by this push:
new fe33422161 [Improve][CDC] Optimize split state memory allocation in
increment phase (#6554)
fe33422161 is described below
commit fe33422161b4ad7f210b1f3d9eb3de49c4657cc3
Author: hailin0 <[email protected]>
AuthorDate: Tue Apr 2 16:09:18 2024 +0800
[Improve][CDC] Optimize split state memory allocation in increment phase
(#6554)
---
pom.xml | 14 +
.../base/relational/JdbcSourceEventDispatcher.java | 18 +
.../source/enumerator/HybridSplitAssigner.java | 22 ++
.../enumerator/IncrementalSourceEnumerator.java | 12 +
.../enumerator/IncrementalSplitAssigner.java | 23 ++
.../source/enumerator/SnapshotSplitAssigner.java | 28 ++
.../source/event/CompletedSnapshotPhaseEvent.java | 34 ++
.../source/reader/IncrementalSourceReader.java | 15 +-
.../reader/IncrementalSourceRecordEmitter.java | 28 ++
.../external/IncrementalSourceStreamFetcher.java | 15 +-
.../source/split/state/IncrementalSplitState.java | 41 +++
.../cdc/base/utils/SourceRecordUtils.java | 7 +
.../source/enumerator/HybridSplitAssignerTest.java | 132 ++++++++
.../IncrementalSourceStreamFetcherTest.java | 367 +++++++++++++++++++++
.../split/state/IncrementalSplitStateTest.java | 169 ++++++++++
.../LogMinerStreamingChangeEventSource.java | 1 +
.../SqlServerStreamingChangeEventSource.java | 1 +
.../common/source/reader/SourceReaderBase.java | 7 +-
18 files changed, 921 insertions(+), 13 deletions(-)
diff --git a/pom.xml b/pom.xml
index 4d4a3731ae..0f59747f72 100644
--- a/pom.xml
+++ b/pom.xml
@@ -117,6 +117,7 @@
<jcommander.version>1.81</jcommander.version>
<junit4.version>4.13.2</junit4.version>
<junit5.version>5.9.0</junit5.version>
+ <mockito.version>4.11.0</mockito.version>
<config.version>1.3.3</config.version>
<maven-shade-plugin.version>3.3.0</maven-shade-plugin.version>
<maven-helper-plugin.version>3.2.0</maven-helper-plugin.version>
@@ -357,6 +358,13 @@
<version>${junit4.version}</version>
</dependency>
+ <dependency>
+ <groupId>org.mockito</groupId>
+ <artifactId>mockito-junit-jupiter</artifactId>
+ <version>${mockito.version}</version>
+ <scope>test</scope>
+ </dependency>
+
<dependency>
<groupId>com.fasterxml.jackson.core</groupId>
<artifactId>jackson-annotations</artifactId>
@@ -521,6 +529,12 @@
<scope>test</scope>
</dependency>
+ <dependency>
+ <groupId>org.mockito</groupId>
+ <artifactId>mockito-junit-jupiter</artifactId>
+ <scope>test</scope>
+ </dependency>
+
</dependencies>
<build>
diff --git
a/seatunnel-connectors-v2/connector-cdc/connector-cdc-base/src/main/java/org/apache/seatunnel/connectors/cdc/base/relational/JdbcSourceEventDispatcher.java
b/seatunnel-connectors-v2/connector-cdc/connector-cdc-base/src/main/java/org/apache/seatunnel/connectors/cdc/base/relational/JdbcSourceEventDispatcher.java
index 23dfabd9fa..90cc8126f4 100644
---
a/seatunnel-connectors-v2/connector-cdc/connector-cdc-base/src/main/java/org/apache/seatunnel/connectors/cdc/base/relational/JdbcSourceEventDispatcher.java
+++
b/seatunnel-connectors-v2/connector-cdc/connector-cdc-base/src/main/java/org/apache/seatunnel/connectors/cdc/base/relational/JdbcSourceEventDispatcher.java
@@ -25,7 +25,9 @@ import
org.apache.seatunnel.connectors.cdc.base.source.split.wartermark.Watermar
import org.apache.kafka.connect.source.SourceRecord;
import io.debezium.config.CommonConnectorConfig;
+import io.debezium.config.Configuration;
import io.debezium.connector.base.ChangeEventQueue;
+import io.debezium.heartbeat.Heartbeat;
import io.debezium.pipeline.DataChangeEvent;
import io.debezium.pipeline.EventDispatcher;
import io.debezium.pipeline.source.spi.EventMetadataProvider;
@@ -37,6 +39,8 @@ import io.debezium.schema.DatabaseSchema;
import io.debezium.schema.TopicSelector;
import io.debezium.util.SchemaNameAdjuster;
+import java.time.Duration;
+import java.time.temporal.ChronoUnit;
import java.util.Map;
/**
@@ -71,6 +75,10 @@ public class JdbcSourceEventDispatcher extends
EventDispatcher<TableId> {
filter,
changeEventCreator,
metadataProvider,
+ Heartbeat.create(
+ getHeartbeatInterval(connectorConfig),
+ topicSelector.getHeartbeatTopic(),
+ connectorConfig.getLogicalName()),
schemaNameAdjuster);
this.queue = queue;
this.topic = topicSelector.getPrimaryTopic();
@@ -92,4 +100,14 @@ public class JdbcSourceEventDispatcher extends
EventDispatcher<TableId> {
sourcePartition, topic, sourceSplit.splitId(),
watermarkKind, watermark);
queue.enqueue(new DataChangeEvent(sourceRecord));
}
+
+ private static Duration getHeartbeatInterval(CommonConnectorConfig
connectorConfig) {
+ Configuration configuration = connectorConfig.getConfig();
+ Duration heartbeatInterval =
+ configuration.getDuration(Heartbeat.HEARTBEAT_INTERVAL,
ChronoUnit.MILLIS);
+ if (heartbeatInterval.isZero()) {
+ return Duration.ofMillis(5000);
+ }
+ return heartbeatInterval;
+ }
}
diff --git
a/seatunnel-connectors-v2/connector-cdc/connector-cdc-base/src/main/java/org/apache/seatunnel/connectors/cdc/base/source/enumerator/HybridSplitAssigner.java
b/seatunnel-connectors-v2/connector-cdc/connector-cdc-base/src/main/java/org/apache/seatunnel/connectors/cdc/base/source/enumerator/HybridSplitAssigner.java
index 9070e2fb88..d6b0bdb96c 100644
---
a/seatunnel-connectors-v2/connector-cdc/connector-cdc-base/src/main/java/org/apache/seatunnel/connectors/cdc/base/source/enumerator/HybridSplitAssigner.java
+++
b/seatunnel-connectors-v2/connector-cdc/connector-cdc-base/src/main/java/org/apache/seatunnel/connectors/cdc/base/source/enumerator/HybridSplitAssigner.java
@@ -17,6 +17,8 @@
package org.apache.seatunnel.connectors.cdc.base.source.enumerator;
+import
org.apache.seatunnel.shade.com.google.common.annotations.VisibleForTesting;
+
import org.apache.seatunnel.connectors.cdc.base.config.SourceConfig;
import org.apache.seatunnel.connectors.cdc.base.dialect.DataSourceDialect;
import
org.apache.seatunnel.connectors.cdc.base.source.enumerator.state.HybridPendingSplitsState;
@@ -31,9 +33,11 @@ import org.slf4j.LoggerFactory;
import io.debezium.relational.TableId;
import java.util.ArrayList;
+import java.util.Arrays;
import java.util.Collection;
import java.util.List;
import java.util.Optional;
+import java.util.function.Predicate;
/** Assigner for Hybrid split which contains snapshot splits and incremental
splits. */
public class HybridSplitAssigner<C extends SourceConfig> implements
SplitAssigner {
@@ -146,4 +150,22 @@ public class HybridSplitAssigner<C extends SourceConfig>
implements SplitAssigne
snapshotSplitAssigner.notifyCheckpointComplete(checkpointId);
incrementalSplitAssigner.notifyCheckpointComplete(checkpointId);
}
+
+ @VisibleForTesting
+ IncrementalSplitAssigner<C> getIncrementalSplitAssigner() {
+ return incrementalSplitAssigner;
+ }
+
+ @VisibleForTesting
+ SnapshotSplitAssigner<C> getSnapshotSplitAssigner() {
+ return snapshotSplitAssigner;
+ }
+
+ public boolean completedSnapshotPhase(List<TableId> tableIds) {
+ return Arrays.asList(
+ snapshotSplitAssigner.completedSnapshotPhase(tableIds),
+
incrementalSplitAssigner.completedSnapshotPhase(tableIds))
+ .stream()
+ .allMatch(Predicate.isEqual(true));
+ }
}
diff --git
a/seatunnel-connectors-v2/connector-cdc/connector-cdc-base/src/main/java/org/apache/seatunnel/connectors/cdc/base/source/enumerator/IncrementalSourceEnumerator.java
b/seatunnel-connectors-v2/connector-cdc/connector-cdc-base/src/main/java/org/apache/seatunnel/connectors/cdc/base/source/enumerator/IncrementalSourceEnumerator.java
index 86f7ac42de..b17b910e5d 100644
---
a/seatunnel-connectors-v2/connector-cdc/connector-cdc-base/src/main/java/org/apache/seatunnel/connectors/cdc/base/source/enumerator/IncrementalSourceEnumerator.java
+++
b/seatunnel-connectors-v2/connector-cdc/connector-cdc-base/src/main/java/org/apache/seatunnel/connectors/cdc/base/source/enumerator/IncrementalSourceEnumerator.java
@@ -20,6 +20,7 @@ package
org.apache.seatunnel.connectors.cdc.base.source.enumerator;
import org.apache.seatunnel.api.source.SourceEvent;
import org.apache.seatunnel.api.source.SourceSplitEnumerator;
import
org.apache.seatunnel.connectors.cdc.base.source.enumerator.state.PendingSplitsState;
+import
org.apache.seatunnel.connectors.cdc.base.source.event.CompletedSnapshotPhaseEvent;
import
org.apache.seatunnel.connectors.cdc.base.source.event.CompletedSnapshotSplitsAckEvent;
import
org.apache.seatunnel.connectors.cdc.base.source.event.CompletedSnapshotSplitsReportEvent;
import
org.apache.seatunnel.connectors.cdc.base.source.event.SnapshotSplitWatermark;
@@ -120,6 +121,17 @@ public class IncrementalSourceEnumerator
.map(SnapshotSplitWatermark::getSplitId)
.collect(Collectors.toList()));
context.sendEventToSourceReader(subtaskId, ackEvent);
+ } else if (sourceEvent instanceof CompletedSnapshotPhaseEvent) {
+ LOG.debug(
+ "The enumerator receives completed snapshot phase event {}
from subtask {}.",
+ sourceEvent,
+ subtaskId);
+ CompletedSnapshotPhaseEvent event = (CompletedSnapshotPhaseEvent)
sourceEvent;
+ if (splitAssigner instanceof HybridSplitAssigner) {
+ ((HybridSplitAssigner)
splitAssigner).completedSnapshotPhase(event.getTableIds());
+ LOG.info(
+ "Clean the
SnapshotSplitAssigner#assignedSplits/splitCompletedOffsets to empty.");
+ }
}
}
diff --git
a/seatunnel-connectors-v2/connector-cdc/connector-cdc-base/src/main/java/org/apache/seatunnel/connectors/cdc/base/source/enumerator/IncrementalSplitAssigner.java
b/seatunnel-connectors-v2/connector-cdc/connector-cdc-base/src/main/java/org/apache/seatunnel/connectors/cdc/base/source/enumerator/IncrementalSplitAssigner.java
index fe8204f6cd..7b45ee1ef6 100644
---
a/seatunnel-connectors-v2/connector-cdc/connector-cdc-base/src/main/java/org/apache/seatunnel/connectors/cdc/base/source/enumerator/IncrementalSplitAssigner.java
+++
b/seatunnel-connectors-v2/connector-cdc/connector-cdc-base/src/main/java/org/apache/seatunnel/connectors/cdc/base/source/enumerator/IncrementalSplitAssigner.java
@@ -17,6 +17,8 @@
package org.apache.seatunnel.connectors.cdc.base.source.enumerator;
+import
org.apache.seatunnel.shade.com.google.common.annotations.VisibleForTesting;
+
import org.apache.seatunnel.api.table.type.SeaTunnelDataType;
import org.apache.seatunnel.connectors.cdc.base.config.SourceConfig;
import
org.apache.seatunnel.connectors.cdc.base.source.enumerator.state.IncrementalPhaseState;
@@ -45,6 +47,8 @@ import java.util.Optional;
import java.util.Set;
import java.util.stream.Collectors;
+import static
org.apache.seatunnel.shade.com.google.common.base.Preconditions.checkArgument;
+
/** Assigner for incremental split. */
public class IncrementalSplitAssigner<C extends SourceConfig> implements
SplitAssigner {
@@ -255,4 +259,23 @@ public class IncrementalSplitAssigner<C extends
SourceConfig> implements SplitAs
completedSnapshotSplitInfos,
checkpointDataType);
}
+
+ @VisibleForTesting
+ void setSplitAssigned(boolean assigned) {
+ this.splitAssigned = assigned;
+ }
+
+ public boolean completedSnapshotPhase(List<TableId> tableIds) {
+ checkArgument(splitAssigned && noMoreSplits());
+
+ for (String splitKey : new
ArrayList<>(context.getAssignedSnapshotSplit().keySet())) {
+ SnapshotSplit assignedSplit =
context.getAssignedSnapshotSplit().get(splitKey);
+ if (tableIds.contains(assignedSplit.getTableId())) {
+ context.getAssignedSnapshotSplit().remove(splitKey);
+
context.getSplitCompletedOffsets().remove(assignedSplit.splitId());
+ }
+ }
+ return context.getAssignedSnapshotSplit().isEmpty()
+ && context.getSplitCompletedOffsets().isEmpty();
+ }
}
diff --git
a/seatunnel-connectors-v2/connector-cdc/connector-cdc-base/src/main/java/org/apache/seatunnel/connectors/cdc/base/source/enumerator/SnapshotSplitAssigner.java
b/seatunnel-connectors-v2/connector-cdc/connector-cdc-base/src/main/java/org/apache/seatunnel/connectors/cdc/base/source/enumerator/SnapshotSplitAssigner.java
index 443343947c..c16dd81102 100644
---
a/seatunnel-connectors-v2/connector-cdc/connector-cdc-base/src/main/java/org/apache/seatunnel/connectors/cdc/base/source/enumerator/SnapshotSplitAssigner.java
+++
b/seatunnel-connectors-v2/connector-cdc/connector-cdc-base/src/main/java/org/apache/seatunnel/connectors/cdc/base/source/enumerator/SnapshotSplitAssigner.java
@@ -17,6 +17,8 @@
package org.apache.seatunnel.connectors.cdc.base.source.enumerator;
+import
org.apache.seatunnel.shade.com.google.common.annotations.VisibleForTesting;
+
import org.apache.seatunnel.connectors.cdc.base.config.SourceConfig;
import org.apache.seatunnel.connectors.cdc.base.dialect.DataSourceDialect;
import
org.apache.seatunnel.connectors.cdc.base.source.enumerator.splitter.ChunkSplitter;
@@ -45,6 +47,8 @@ import java.util.concurrent.ConcurrentLinkedDeque;
import java.util.concurrent.ConcurrentLinkedQueue;
import java.util.stream.Collectors;
+import static
org.apache.seatunnel.shade.com.google.common.base.Preconditions.checkArgument;
+
/** Assigner for snapshot split. */
public class SnapshotSplitAssigner<C extends SourceConfig> implements
SplitAssigner {
private static final Logger LOG =
LoggerFactory.getLogger(SnapshotSplitAssigner.class);
@@ -278,4 +282,28 @@ public class SnapshotSplitAssigner<C extends SourceConfig>
implements SplitAssig
private boolean allSplitsCompleted() {
return noMoreSplits() && assignedSplits.size() ==
splitCompletedOffsets.size();
}
+
+ @VisibleForTesting
+ Map<String, SnapshotSplit> getAssignedSplits() {
+ return assignedSplits;
+ }
+
+ @VisibleForTesting
+ Map<String, SnapshotSplitWatermark> getSplitCompletedOffsets() {
+ return splitCompletedOffsets;
+ }
+
+ public boolean completedSnapshotPhase(List<TableId> tableIds) {
+ checkArgument(isCompleted() && allSplitsCompleted());
+
+ for (String splitKey : new ArrayList<>(assignedSplits.keySet())) {
+ SnapshotSplit assignedSplit = assignedSplits.get(splitKey);
+ if (tableIds.contains(assignedSplit.getTableId())) {
+ assignedSplits.remove(splitKey);
+ splitCompletedOffsets.remove(assignedSplit.splitId());
+ }
+ }
+
+ return assignedSplits.isEmpty() && splitCompletedOffsets.isEmpty();
+ }
}
diff --git
a/seatunnel-connectors-v2/connector-cdc/connector-cdc-base/src/main/java/org/apache/seatunnel/connectors/cdc/base/source/event/CompletedSnapshotPhaseEvent.java
b/seatunnel-connectors-v2/connector-cdc/connector-cdc-base/src/main/java/org/apache/seatunnel/connectors/cdc/base/source/event/CompletedSnapshotPhaseEvent.java
new file mode 100644
index 0000000000..623bf1c1a9
--- /dev/null
+++
b/seatunnel-connectors-v2/connector-cdc/connector-cdc-base/src/main/java/org/apache/seatunnel/connectors/cdc/base/source/event/CompletedSnapshotPhaseEvent.java
@@ -0,0 +1,34 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.seatunnel.connectors.cdc.base.source.event;
+
+import org.apache.seatunnel.api.source.SourceEvent;
+
+import io.debezium.relational.TableId;
+import lombok.AllArgsConstructor;
+import lombok.Data;
+
+import java.util.List;
+
+@Data
+@AllArgsConstructor
+public class CompletedSnapshotPhaseEvent implements SourceEvent {
+ private static final long serialVersionUID = 1L;
+
+ private List<TableId> tableIds;
+}
diff --git
a/seatunnel-connectors-v2/connector-cdc/connector-cdc-base/src/main/java/org/apache/seatunnel/connectors/cdc/base/source/reader/IncrementalSourceReader.java
b/seatunnel-connectors-v2/connector-cdc/connector-cdc-base/src/main/java/org/apache/seatunnel/connectors/cdc/base/source/reader/IncrementalSourceReader.java
index 7f9d890197..829f68763d 100644
---
a/seatunnel-connectors-v2/connector-cdc/connector-cdc-base/src/main/java/org/apache/seatunnel/connectors/cdc/base/source/reader/IncrementalSourceReader.java
+++
b/seatunnel-connectors-v2/connector-cdc/connector-cdc-base/src/main/java/org/apache/seatunnel/connectors/cdc/base/source/reader/IncrementalSourceReader.java
@@ -22,6 +22,7 @@ import org.apache.seatunnel.api.source.SourceReader;
import org.apache.seatunnel.api.table.type.SeaTunnelDataType;
import org.apache.seatunnel.connectors.cdc.base.config.SourceConfig;
import org.apache.seatunnel.connectors.cdc.base.dialect.DataSourceDialect;
+import
org.apache.seatunnel.connectors.cdc.base.source.event.CompletedSnapshotPhaseEvent;
import
org.apache.seatunnel.connectors.cdc.base.source.event.CompletedSnapshotSplitsReportEvent;
import
org.apache.seatunnel.connectors.cdc.base.source.event.SnapshotSplitWatermark;
import org.apache.seatunnel.connectors.cdc.base.source.split.IncrementalSplit;
@@ -207,7 +208,19 @@ public class IncrementalSourceReader<T, C extends
SourceConfig>
debeziumDeserializationSchema.restoreCheckpointProducedType(
incrementalSplit.getCheckpointDataType());
}
- return new IncrementalSplitState(split.asIncrementalSplit());
+ IncrementalSplitState splitState = new
IncrementalSplitState(incrementalSplit);
+ if (splitState.autoEnterPureIncrementPhaseIfAllowed()) {
+ log.info(
+ "The incremental split[{}] startup position {} is
equal the maxSnapshotSplitsHighWatermark {}, auto enter pure increment phase.",
+ incrementalSplit.splitId(),
+ splitState.getStartupOffset(),
+ splitState.getMaxSnapshotSplitsHighWatermark());
+ log.info("Clean the
IncrementalSplit#completedSnapshotSplitInfos to empty.");
+ CompletedSnapshotPhaseEvent event =
+ new
CompletedSnapshotPhaseEvent(splitState.getTableIds());
+ context.sendSourceEventToEnumerator(event);
+ }
+ return splitState;
}
}
diff --git
a/seatunnel-connectors-v2/connector-cdc/connector-cdc-base/src/main/java/org/apache/seatunnel/connectors/cdc/base/source/reader/IncrementalSourceRecordEmitter.java
b/seatunnel-connectors-v2/connector-cdc/connector-cdc-base/src/main/java/org/apache/seatunnel/connectors/cdc/base/source/reader/IncrementalSourceRecordEmitter.java
index 65474a0d9f..a98a9d0959 100644
---
a/seatunnel-connectors-v2/connector-cdc/connector-cdc-base/src/main/java/org/apache/seatunnel/connectors/cdc/base/source/reader/IncrementalSourceRecordEmitter.java
+++
b/seatunnel-connectors-v2/connector-cdc/connector-cdc-base/src/main/java/org/apache/seatunnel/connectors/cdc/base/source/reader/IncrementalSourceRecordEmitter.java
@@ -22,9 +22,11 @@ import org.apache.seatunnel.api.event.EventListener;
import org.apache.seatunnel.api.source.Collector;
import org.apache.seatunnel.api.source.SourceReader;
import org.apache.seatunnel.api.table.event.SchemaChangeEvent;
+import
org.apache.seatunnel.connectors.cdc.base.source.event.CompletedSnapshotPhaseEvent;
import org.apache.seatunnel.connectors.cdc.base.source.offset.Offset;
import org.apache.seatunnel.connectors.cdc.base.source.offset.OffsetFactory;
import org.apache.seatunnel.connectors.cdc.base.source.split.SourceRecords;
+import
org.apache.seatunnel.connectors.cdc.base.source.split.state.IncrementalSplitState;
import
org.apache.seatunnel.connectors.cdc.base.source.split.state.SourceSplitStateBase;
import
org.apache.seatunnel.connectors.cdc.debezium.DebeziumDeserializationSchema;
import
org.apache.seatunnel.connectors.seatunnel.common.source.reader.RecordEmitter;
@@ -65,6 +67,7 @@ public class IncrementalSourceRecordEmitter<T>
protected final OffsetFactory offsetFactory;
+ protected final SourceReader.Context context;
protected final Counter recordFetchDelay;
protected final Counter recordEmitDelay;
protected final EventListener eventListener;
@@ -76,6 +79,7 @@ public class IncrementalSourceRecordEmitter<T>
this.debeziumDeserializationSchema = debeziumDeserializationSchema;
this.outputCollector = new OutputCollector<>();
this.offsetFactory = offsetFactory;
+ this.context = context;
this.recordFetchDelay =
context.getMetricsContext().counter(CDC_RECORD_FETCH_DELAY);
this.recordEmitDelay =
context.getMetricsContext().counter(CDC_RECORD_EMIT_DELAY);
this.eventListener = context.getEventListener();
@@ -90,6 +94,7 @@ public class IncrementalSourceRecordEmitter<T>
SourceRecord next = elementIterator.next();
reportMetrics(next);
processElement(next, collector, splitState);
+ markEnterPureIncrementPhase(next, splitState);
}
}
@@ -138,6 +143,29 @@ public class IncrementalSourceRecordEmitter<T>
}
}
+ private void markEnterPureIncrementPhase(
+ SourceRecord element, SourceSplitStateBase splitState) {
+ if (splitState.isIncrementalSplitState()) {
+ IncrementalSplitState incrementalSplitState =
splitState.asIncrementalSplitState();
+ if (incrementalSplitState.isEnterPureIncrementPhase()) {
+ return;
+ }
+ Offset position = getOffsetPosition(element);
+ if
(incrementalSplitState.markEnterPureIncrementPhaseIfNeed(position)) {
+ log.info(
+ "The current record position {} is after the
maxSnapshotSplitsHighWatermark {}, "
+ + "mark enter pure increment phase.",
+ position,
+
incrementalSplitState.getMaxSnapshotSplitsHighWatermark());
+ log.info("Clean the
IncrementalSplit#completedSnapshotSplitInfos to empty.");
+
+ CompletedSnapshotPhaseEvent completedSnapshotPhaseEvent =
+ new
CompletedSnapshotPhaseEvent(incrementalSplitState.getTableIds());
+
context.sendSourceEventToEnumerator(completedSnapshotPhaseEvent);
+ }
+ }
+ }
+
private Offset getWatermark(SourceRecord watermarkEvent) {
return getOffsetPosition(watermarkEvent.sourceOffset());
}
diff --git
a/seatunnel-connectors-v2/connector-cdc/connector-cdc-base/src/main/java/org/apache/seatunnel/connectors/cdc/base/source/reader/external/IncrementalSourceStreamFetcher.java
b/seatunnel-connectors-v2/connector-cdc/connector-cdc-base/src/main/java/org/apache/seatunnel/connectors/cdc/base/source/reader/external/IncrementalSourceStreamFetcher.java
index e34970054b..4cad739ac6 100644
---
a/seatunnel-connectors-v2/connector-cdc/connector-cdc-base/src/main/java/org/apache/seatunnel/connectors/cdc/base/source/reader/external/IncrementalSourceStreamFetcher.java
+++
b/seatunnel-connectors-v2/connector-cdc/connector-cdc-base/src/main/java/org/apache/seatunnel/connectors/cdc/base/source/reader/external/IncrementalSourceStreamFetcher.java
@@ -170,7 +170,7 @@ public class IncrementalSourceStreamFetcher implements
Fetcher<SourceRecords, So
* <p>After event batch: [checkpoint-before] [SchemaChangeEvent-1,
SchemaChangeEvent-2,
* checkpoint-after] [a, b, c, d, e]
*/
- private Iterator<SourceRecords>
splitSchemaChangeStream(List<DataChangeEvent> batchEvents) {
+ Iterator<SourceRecords> splitSchemaChangeStream(List<DataChangeEvent>
batchEvents) {
List<SourceRecords> sourceRecordsSet = new ArrayList<>();
List<SourceRecord> sourceRecordList = new ArrayList<>();
@@ -181,11 +181,6 @@ public class IncrementalSourceStreamFetcher implements
Fetcher<SourceRecords, So
if (!shouldEmit(currentRecord)) {
continue;
}
- if (!SourceRecordUtils.isDataChangeRecord(currentRecord)
- && !SourceRecordUtils.isSchemaChangeEvent(currentRecord)) {
- sourceRecordList.add(currentRecord);
- continue;
- }
if (SourceRecordUtils.isSchemaChangeEvent(currentRecord)) {
if (!schemaChangeResolver.support(currentRecord)) {
@@ -208,9 +203,11 @@ public class IncrementalSourceStreamFetcher implements
Fetcher<SourceRecords, So
sourceRecordList = new ArrayList<>();
sourceRecordList.add(currentRecord);
}
- } else if (SourceRecordUtils.isDataChangeRecord(currentRecord)) {
+ } else if (SourceRecordUtils.isDataChangeRecord(currentRecord)
+ || SourceRecordUtils.isHeartbeatRecord(currentRecord)) {
if (previousRecord == null
- ||
SourceRecordUtils.isDataChangeRecord(previousRecord)) {
+ || SourceRecordUtils.isDataChangeRecord(previousRecord)
+ ||
SourceRecordUtils.isHeartbeatRecord(previousRecord)) {
sourceRecordList.add(currentRecord);
} else {
sourceRecordList.add(
@@ -274,7 +271,7 @@ public class IncrementalSourceStreamFetcher implements
Fetcher<SourceRecords, So
}
/** Returns the record should emit or not. */
- private boolean shouldEmit(SourceRecord sourceRecord) {
+ boolean shouldEmit(SourceRecord sourceRecord) {
if (taskContext.isDataChangeRecord(sourceRecord)) {
Offset position = taskContext.getStreamOffset(sourceRecord);
TableId tableId = getTableId(sourceRecord);
diff --git
a/seatunnel-connectors-v2/connector-cdc/connector-cdc-base/src/main/java/org/apache/seatunnel/connectors/cdc/base/source/split/state/IncrementalSplitState.java
b/seatunnel-connectors-v2/connector-cdc/connector-cdc-base/src/main/java/org/apache/seatunnel/connectors/cdc/base/source/split/state/IncrementalSplitState.java
index 4157569766..c04026bf1e 100644
---
a/seatunnel-connectors-v2/connector-cdc/connector-cdc-base/src/main/java/org/apache/seatunnel/connectors/cdc/base/source/split/state/IncrementalSplitState.java
+++
b/seatunnel-connectors-v2/connector-cdc/connector-cdc-base/src/main/java/org/apache/seatunnel/connectors/cdc/base/source/split/state/IncrementalSplitState.java
@@ -24,6 +24,7 @@ import io.debezium.relational.TableId;
import lombok.Getter;
import lombok.Setter;
+import java.util.Comparator;
import java.util.List;
/** The state of split to describe the change log of table(s). */
@@ -39,11 +40,27 @@ public class IncrementalSplitState extends
SourceSplitStateBase {
/** Obtained by configuration, may not end */
private Offset stopOffset;
+ private Offset maxSnapshotSplitsHighWatermark;
+ private volatile boolean enterPureIncrementPhase;
+
public IncrementalSplitState(IncrementalSplit split) {
super(split);
this.tableIds = split.getTableIds();
this.startupOffset = split.getStartupOffset();
this.stopOffset = split.getStopOffset();
+
+ if (split.getCompletedSnapshotSplitInfos().isEmpty()) {
+ this.maxSnapshotSplitsHighWatermark = null;
+ this.enterPureIncrementPhase = true;
+ } else {
+ this.maxSnapshotSplitsHighWatermark =
+ split.getCompletedSnapshotSplitInfos().stream()
+ .filter(e -> e.getWatermark() != null)
+ .max(Comparator.comparing(o ->
o.getWatermark().getHighWatermark()))
+ .map(e -> e.getWatermark().getHighWatermark())
+ .get();
+ this.enterPureIncrementPhase = false;
+ }
}
@Override
@@ -56,4 +73,28 @@ public class IncrementalSplitState extends
SourceSplitStateBase {
getStopOffset(),
incrementalSplit.getCompletedSnapshotSplitInfos());
}
+
+ public synchronized boolean markEnterPureIncrementPhaseIfNeed(Offset
currentRecordPosition) {
+ if (enterPureIncrementPhase) {
+ return false;
+ }
+
+ if (currentRecordPosition.isAtOrAfter(maxSnapshotSplitsHighWatermark))
{
+
split.asIncrementalSplit().getCompletedSnapshotSplitInfos().clear();
+ this.enterPureIncrementPhase = true;
+ return true;
+ }
+
+ return false;
+ }
+
+ public synchronized boolean autoEnterPureIncrementPhaseIfAllowed() {
+ if (!enterPureIncrementPhase
+ && maxSnapshotSplitsHighWatermark.compareTo(startupOffset) ==
0) {
+
split.asIncrementalSplit().getCompletedSnapshotSplitInfos().clear();
+ enterPureIncrementPhase = true;
+ return true;
+ }
+ return false;
+ }
}
diff --git
a/seatunnel-connectors-v2/connector-cdc/connector-cdc-base/src/main/java/org/apache/seatunnel/connectors/cdc/base/utils/SourceRecordUtils.java
b/seatunnel-connectors-v2/connector-cdc/connector-cdc-base/src/main/java/org/apache/seatunnel/connectors/cdc/base/utils/SourceRecordUtils.java
index abbdb5b76a..e06213b06d 100644
---
a/seatunnel-connectors-v2/connector-cdc/connector-cdc-base/src/main/java/org/apache/seatunnel/connectors/cdc/base/utils/SourceRecordUtils.java
+++
b/seatunnel-connectors-v2/connector-cdc/connector-cdc-base/src/main/java/org/apache/seatunnel/connectors/cdc/base/utils/SourceRecordUtils.java
@@ -47,6 +47,8 @@ public class SourceRecordUtils {
public static final String SCHEMA_CHANGE_EVENT_KEY_NAME =
"io.debezium.connector.mysql.SchemaChangeKey";
+ public static final String HEARTBEAT_VALUE_SCHEMA_KEY_NAME =
+ "io.debezium.connector.common.Heartbeat";
private static final DocumentReader DOCUMENT_READER =
DocumentReader.defaultReader();
/** Converts a {@link ResultSet} row to an array of Objects. */
@@ -105,6 +107,11 @@ public class SourceRecordUtils {
&& value.getString(Envelope.FieldName.OPERATION) != null;
}
+ public static boolean isHeartbeatRecord(SourceRecord record) {
+ Schema valueSchema = record.valueSchema();
+ return valueSchema != null &&
valueSchema.name().equals(HEARTBEAT_VALUE_SCHEMA_KEY_NAME);
+ }
+
public static TableId getTableId(SourceRecord dataRecord) {
Struct value = (Struct) dataRecord.value();
Struct source = value.getStruct(Envelope.FieldName.SOURCE);
diff --git
a/seatunnel-connectors-v2/connector-cdc/connector-cdc-base/src/test/java/org/apache/seatunnel/connectors/cdc/base/source/enumerator/HybridSplitAssignerTest.java
b/seatunnel-connectors-v2/connector-cdc/connector-cdc-base/src/test/java/org/apache/seatunnel/connectors/cdc/base/source/enumerator/HybridSplitAssignerTest.java
new file mode 100644
index 0000000000..2c931eb9e4
--- /dev/null
+++
b/seatunnel-connectors-v2/connector-cdc/connector-cdc-base/src/test/java/org/apache/seatunnel/connectors/cdc/base/source/enumerator/HybridSplitAssignerTest.java
@@ -0,0 +1,132 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.seatunnel.connectors.cdc.base.source.enumerator;
+
+import
org.apache.seatunnel.connectors.cdc.base.source.enumerator.state.HybridPendingSplitsState;
+import
org.apache.seatunnel.connectors.cdc.base.source.enumerator.state.SnapshotPhaseState;
+import
org.apache.seatunnel.connectors.cdc.base.source.event.SnapshotSplitWatermark;
+import org.apache.seatunnel.connectors.cdc.base.source.split.SnapshotSplit;
+
+import org.junit.jupiter.api.Assertions;
+import org.junit.jupiter.api.Test;
+
+import io.debezium.relational.TableId;
+
+import java.util.AbstractMap;
+import java.util.Arrays;
+import java.util.Collections;
+import java.util.Map;
+import java.util.stream.Collectors;
+import java.util.stream.Stream;
+
public class HybridSplitAssignerTest {

    /**
     * Verifies snapshot-phase state cleanup through
     * {@code HybridSplitAssigner#completedSnapshotPhase}.
     *
     * <p>Setup: two tables ({@code db1.table1}, {@code db1.table2}), each with two assigned
     * snapshot splits and matching completed-offset watermarks, restored from a checkpoint whose
     * snapshot phase is already finished. Completing only one table must NOT release the
     * snapshot-phase bookkeeping; completing the last remaining table must release the assigned
     * splits and completed offsets both in the snapshot assigner and in the shared assigner
     * context (this is the memory optimization for the increment phase).
     */
    @Test
    public void testCompletedSnapshotPhase() {
        Map<String, SnapshotSplit> assignedSplits = createAssignedSplits();
        Map<String, SnapshotSplitWatermark> splitCompletedOffsets = createSplitCompletedOffsets();
        // Snapshot phase state restored from checkpoint: no remaining/pending splits,
        // assigner flagged as finished.
        SnapshotPhaseState snapshotPhaseState =
                new SnapshotPhaseState(
                        Collections.emptyList(),
                        Collections.emptyList(),
                        assignedSplits,
                        splitCompletedOffsets,
                        true,
                        Collections.emptyList(),
                        false,
                        false);
        HybridPendingSplitsState checkpointState =
                new HybridPendingSplitsState(snapshotPhaseState, null);
        // The context shares the SAME maps as the checkpoint state, so the test can observe
        // that cleanup propagates to both places.
        SplitAssigner.Context context =
                new SplitAssigner.Context<>(
                        null,
                        Collections.emptySet(),
                        checkpointState.getSnapshotPhaseState().getAssignedSplits(),
                        checkpointState.getSnapshotPhaseState().getSplitCompletedOffsets());
        HybridSplitAssigner splitAssigner =
                new HybridSplitAssigner<>(context, 1, 1, checkpointState, null, null);
        // Mark the incremental split as assigned so the assigner is in the increment phase,
        // where releasing snapshot-phase state is permitted.
        splitAssigner.getIncrementalSplitAssigner().setSplitAssigned(true);

        // Only table1 reported complete: table2 state is still needed, so nothing is released.
        Assertions.assertFalse(
                splitAssigner.completedSnapshotPhase(Arrays.asList(TableId.parse("db1.table1"))));
        Assertions.assertFalse(
                splitAssigner.getSnapshotSplitAssigner().getAssignedSplits().isEmpty());
        Assertions.assertFalse(
                splitAssigner.getSnapshotSplitAssigner().getSplitCompletedOffsets().isEmpty());
        Assertions.assertFalse(context.getAssignedSnapshotSplit().isEmpty());
        Assertions.assertFalse(context.getSplitCompletedOffsets().isEmpty());

        // Last table reported complete: all snapshot-phase bookkeeping must now be freed.
        Assertions.assertTrue(
                splitAssigner.completedSnapshotPhase(Arrays.asList(TableId.parse("db1.table2"))));
        Assertions.assertTrue(
                splitAssigner.getSnapshotSplitAssigner().getAssignedSplits().isEmpty());
        Assertions.assertTrue(
                splitAssigner.getSnapshotSplitAssigner().getSplitCompletedOffsets().isEmpty());
        Assertions.assertTrue(context.getAssignedSnapshotSplit().isEmpty());
        Assertions.assertTrue(context.getSplitCompletedOffsets().isEmpty());
    }

    /** Two assigned snapshot splits per table for db1.table1 and db1.table2. */
    private static Map<String, SnapshotSplit> createAssignedSplits() {
        return Stream.of(
                        new AbstractMap.SimpleEntry<>(
                                "db1.table1.1",
                                new SnapshotSplit(
                                        "db1.table1.1",
                                        TableId.parse("db1.table1"),
                                        null,
                                        null,
                                        null)),
                        new AbstractMap.SimpleEntry<>(
                                "db1.table1.2",
                                new SnapshotSplit(
                                        "db1.table1.2",
                                        TableId.parse("db1.table1"),
                                        null,
                                        null,
                                        null)),
                        new AbstractMap.SimpleEntry<>(
                                "db1.table2.1",
                                new SnapshotSplit(
                                        "db1.table2.1",
                                        TableId.parse("db1.table2"),
                                        null,
                                        null,
                                        null)),
                        new AbstractMap.SimpleEntry<>(
                                "db1.table2.2",
                                new SnapshotSplit(
                                        "db1.table2.2",
                                        TableId.parse("db1.table2"),
                                        null,
                                        null,
                                        null)))
                .collect(Collectors.toMap(Map.Entry::getKey, Map.Entry::getValue));
    }

    /** A completed-offset watermark for each assigned split id (offsets irrelevant here). */
    private static Map<String, SnapshotSplitWatermark> createSplitCompletedOffsets() {
        return Stream.of(
                        new AbstractMap.SimpleEntry<>(
                                "db1.table1.1", new SnapshotSplitWatermark(null, null, null)),
                        new AbstractMap.SimpleEntry<>(
                                "db1.table1.2", new SnapshotSplitWatermark(null, null, null)),
                        new AbstractMap.SimpleEntry<>(
                                "db1.table2.1", new SnapshotSplitWatermark(null, null, null)),
                        new AbstractMap.SimpleEntry<>(
                                "db1.table2.2", new SnapshotSplitWatermark(null, null, null)))
                .collect(Collectors.toMap(Map.Entry::getKey, Map.Entry::getValue));
    }
}
diff --git
a/seatunnel-connectors-v2/connector-cdc/connector-cdc-base/src/test/java/org/apache/seatunnel/connectors/cdc/base/source/reader/external/IncrementalSourceStreamFetcherTest.java
b/seatunnel-connectors-v2/connector-cdc/connector-cdc-base/src/test/java/org/apache/seatunnel/connectors/cdc/base/source/reader/external/IncrementalSourceStreamFetcherTest.java
new file mode 100644
index 0000000000..64ac4f4a0c
--- /dev/null
+++
b/seatunnel-connectors-v2/connector-cdc/connector-cdc-base/src/test/java/org/apache/seatunnel/connectors/cdc/base/source/reader/external/IncrementalSourceStreamFetcherTest.java
@@ -0,0 +1,367 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.seatunnel.connectors.cdc.base.source.reader.external;
+
+import org.apache.seatunnel.connectors.cdc.base.schema.SchemaChangeResolver;
+import org.apache.seatunnel.connectors.cdc.base.source.split.SourceRecords;
+import
org.apache.seatunnel.connectors.cdc.base.source.split.wartermark.WatermarkEvent;
+import org.apache.seatunnel.connectors.cdc.base.utils.SourceRecordUtils;
+
+import org.apache.kafka.connect.data.Schema;
+import org.apache.kafka.connect.data.SchemaBuilder;
+import org.apache.kafka.connect.data.Struct;
+import org.apache.kafka.connect.source.SourceRecord;
+
+import org.junit.jupiter.api.Assertions;
+import org.junit.jupiter.api.Test;
+
+import io.debezium.data.Envelope;
+import io.debezium.heartbeat.Heartbeat;
+import io.debezium.pipeline.DataChangeEvent;
+
+import java.time.Duration;
+import java.util.ArrayList;
+import java.util.Collections;
+import java.util.Iterator;
+import java.util.List;
+import java.util.concurrent.atomic.AtomicReference;
+
+import static org.mockito.ArgumentMatchers.any;
+import static org.mockito.Mockito.doReturn;
+import static org.mockito.Mockito.mock;
+import static org.mockito.Mockito.spy;
+import static org.mockito.Mockito.when;
+
public class IncrementalSourceStreamFetcherTest {

    /**
     * Verifies {@code IncrementalSourceStreamFetcher#splitSchemaChangeStream}: consecutive
     * schema-change events must be isolated into their own batch bracketed by a
     * schema-change-before watermark (appended to the preceding batch) and a schema-change-after
     * watermark (appended after the schema-change events), while data-change and heartbeat
     * records flow through in ordinary batches in their original order.
     */
    @Test
    public void testSplitSchemaChangeStream() throws Exception {
        IncrementalSourceStreamFetcher fetcher = createFetcher();

        // Case 1: data-only input -> one batch, no watermarks injected.
        List<DataChangeEvent> inputEvents = new ArrayList<>();
        List<SourceRecords> records = new ArrayList<>();
        inputEvents.add(new DataChangeEvent(createDataEvent()));
        inputEvents.add(new DataChangeEvent(createDataEvent()));
        Iterator<SourceRecords> outputEvents = fetcher.splitSchemaChangeStream(inputEvents);
        outputEvents.forEachRemaining(records::add);

        Assertions.assertEquals(1, records.size());
        Assertions.assertEquals(2, records.get(0).getSourceRecordList().size());
        Assertions.assertTrue(
                SourceRecordUtils.isDataChangeRecord(records.get(0).getSourceRecordList().get(0)));
        Assertions.assertTrue(
                SourceRecordUtils.isDataChangeRecord(records.get(0).getSourceRecordList().get(1)));

        // Case 2: schema-change-only input -> [beforeWM] then [sc, sc, afterWM].
        inputEvents = new ArrayList<>();
        records = new ArrayList<>();
        inputEvents.add(new DataChangeEvent(createSchemaChangeEvent()));
        inputEvents.add(new DataChangeEvent(createSchemaChangeEvent()));
        outputEvents = fetcher.splitSchemaChangeStream(inputEvents);
        outputEvents.forEachRemaining(records::add);

        Assertions.assertEquals(2, records.size());
        Assertions.assertEquals(1, records.get(0).getSourceRecordList().size());
        Assertions.assertTrue(
                WatermarkEvent.isSchemaChangeBeforeWatermarkEvent(
                        records.get(0).getSourceRecordList().get(0)));
        Assertions.assertEquals(3, records.get(1).getSourceRecordList().size());
        Assertions.assertTrue(
                SourceRecordUtils.isSchemaChangeEvent(records.get(1).getSourceRecordList().get(0)));
        Assertions.assertTrue(
                SourceRecordUtils.isSchemaChangeEvent(records.get(1).getSourceRecordList().get(1)));
        Assertions.assertTrue(
                WatermarkEvent.isSchemaChangeAfterWatermarkEvent(
                        records.get(1).getSourceRecordList().get(2)));

        // Case 3: data then schema changes -> [d, d, beforeWM] then [sc, sc, afterWM].
        inputEvents = new ArrayList<>();
        records = new ArrayList<>();
        inputEvents.add(new DataChangeEvent(createDataEvent()));
        inputEvents.add(new DataChangeEvent(createDataEvent()));
        inputEvents.add(new DataChangeEvent(createSchemaChangeEvent()));
        inputEvents.add(new DataChangeEvent(createSchemaChangeEvent()));
        outputEvents = fetcher.splitSchemaChangeStream(inputEvents);
        outputEvents.forEachRemaining(records::add);

        Assertions.assertEquals(2, records.size());
        Assertions.assertEquals(3, records.get(0).getSourceRecordList().size());
        Assertions.assertEquals(3, records.get(1).getSourceRecordList().size());
        Assertions.assertTrue(
                SourceRecordUtils.isDataChangeRecord(records.get(0).getSourceRecordList().get(0)));
        Assertions.assertTrue(
                SourceRecordUtils.isDataChangeRecord(records.get(0).getSourceRecordList().get(1)));
        Assertions.assertTrue(
                WatermarkEvent.isSchemaChangeBeforeWatermarkEvent(
                        records.get(0).getSourceRecordList().get(2)));
        Assertions.assertTrue(
                SourceRecordUtils.isSchemaChangeEvent(records.get(1).getSourceRecordList().get(0)));
        Assertions.assertTrue(
                SourceRecordUtils.isSchemaChangeEvent(records.get(1).getSourceRecordList().get(1)));
        Assertions.assertTrue(
                WatermarkEvent.isSchemaChangeAfterWatermarkEvent(
                        records.get(1).getSourceRecordList().get(2)));

        // Case 4: schema changes then data -> [beforeWM], [sc, sc, afterWM], [d, d].
        inputEvents = new ArrayList<>();
        records = new ArrayList<>();
        inputEvents.add(new DataChangeEvent(createSchemaChangeEvent()));
        inputEvents.add(new DataChangeEvent(createSchemaChangeEvent()));
        inputEvents.add(new DataChangeEvent(createDataEvent()));
        inputEvents.add(new DataChangeEvent(createDataEvent()));
        outputEvents = fetcher.splitSchemaChangeStream(inputEvents);
        outputEvents.forEachRemaining(records::add);

        Assertions.assertEquals(3, records.size());
        Assertions.assertEquals(1, records.get(0).getSourceRecordList().size());
        Assertions.assertEquals(3, records.get(1).getSourceRecordList().size());
        Assertions.assertEquals(2, records.get(2).getSourceRecordList().size());
        Assertions.assertTrue(
                WatermarkEvent.isSchemaChangeBeforeWatermarkEvent(
                        records.get(0).getSourceRecordList().get(0)));
        Assertions.assertTrue(
                SourceRecordUtils.isSchemaChangeEvent(records.get(1).getSourceRecordList().get(0)));
        Assertions.assertTrue(
                SourceRecordUtils.isSchemaChangeEvent(records.get(1).getSourceRecordList().get(1)));
        Assertions.assertTrue(
                WatermarkEvent.isSchemaChangeAfterWatermarkEvent(
                        records.get(1).getSourceRecordList().get(2)));
        Assertions.assertTrue(
                SourceRecordUtils.isDataChangeRecord(records.get(2).getSourceRecordList().get(0)));
        Assertions.assertTrue(
                SourceRecordUtils.isDataChangeRecord(records.get(2).getSourceRecordList().get(1)));

        // Case 5: schema changes between data -> [d, beforeWM], [sc, sc, afterWM], [d].
        inputEvents = new ArrayList<>();
        records = new ArrayList<>();
        inputEvents.add(new DataChangeEvent(createDataEvent()));
        inputEvents.add(new DataChangeEvent(createSchemaChangeEvent()));
        inputEvents.add(new DataChangeEvent(createSchemaChangeEvent()));
        inputEvents.add(new DataChangeEvent(createDataEvent()));
        outputEvents = fetcher.splitSchemaChangeStream(inputEvents);
        outputEvents.forEachRemaining(records::add);

        Assertions.assertEquals(3, records.size());
        Assertions.assertEquals(2, records.get(0).getSourceRecordList().size());
        Assertions.assertEquals(3, records.get(1).getSourceRecordList().size());
        Assertions.assertEquals(1, records.get(2).getSourceRecordList().size());
        Assertions.assertTrue(
                SourceRecordUtils.isDataChangeRecord(records.get(0).getSourceRecordList().get(0)));
        Assertions.assertTrue(
                WatermarkEvent.isSchemaChangeBeforeWatermarkEvent(
                        records.get(0).getSourceRecordList().get(1)));
        Assertions.assertTrue(
                SourceRecordUtils.isSchemaChangeEvent(records.get(1).getSourceRecordList().get(0)));
        Assertions.assertTrue(
                SourceRecordUtils.isSchemaChangeEvent(records.get(1).getSourceRecordList().get(1)));
        Assertions.assertTrue(
                WatermarkEvent.isSchemaChangeAfterWatermarkEvent(
                        records.get(1).getSourceRecordList().get(2)));
        Assertions.assertTrue(
                SourceRecordUtils.isDataChangeRecord(records.get(2).getSourceRecordList().get(0)));

        // Case 6: alternating data/schema-change -> each single schema change is bracketed:
        // [d, beforeWM], [sc, afterWM], [d, beforeWM], [sc, afterWM].
        inputEvents = new ArrayList<>();
        records = new ArrayList<>();
        inputEvents.add(new DataChangeEvent(createDataEvent()));
        inputEvents.add(new DataChangeEvent(createSchemaChangeEvent()));
        inputEvents.add(new DataChangeEvent(createDataEvent()));
        inputEvents.add(new DataChangeEvent(createSchemaChangeEvent()));
        outputEvents = fetcher.splitSchemaChangeStream(inputEvents);
        outputEvents.forEachRemaining(records::add);

        Assertions.assertEquals(4, records.size());
        Assertions.assertEquals(2, records.get(0).getSourceRecordList().size());
        Assertions.assertEquals(2, records.get(1).getSourceRecordList().size());
        Assertions.assertEquals(2, records.get(2).getSourceRecordList().size());
        Assertions.assertEquals(2, records.get(3).getSourceRecordList().size());
        Assertions.assertTrue(
                SourceRecordUtils.isDataChangeRecord(records.get(0).getSourceRecordList().get(0)));
        Assertions.assertTrue(
                WatermarkEvent.isSchemaChangeBeforeWatermarkEvent(
                        records.get(0).getSourceRecordList().get(1)));
        Assertions.assertTrue(
                SourceRecordUtils.isSchemaChangeEvent(records.get(1).getSourceRecordList().get(0)));
        Assertions.assertTrue(
                WatermarkEvent.isSchemaChangeAfterWatermarkEvent(
                        records.get(1).getSourceRecordList().get(1)));
        Assertions.assertTrue(
                SourceRecordUtils.isDataChangeRecord(records.get(2).getSourceRecordList().get(0)));
        Assertions.assertTrue(
                WatermarkEvent.isSchemaChangeBeforeWatermarkEvent(
                        records.get(2).getSourceRecordList().get(1)));
        Assertions.assertTrue(
                SourceRecordUtils.isSchemaChangeEvent(records.get(3).getSourceRecordList().get(0)));
        Assertions.assertTrue(
                WatermarkEvent.isSchemaChangeAfterWatermarkEvent(
                        records.get(3).getSourceRecordList().get(1)));

        // Case 7: mixed stream with heartbeats interleaved; heartbeats stay in the ordinary
        // batches (they are never grouped with schema-change events).
        inputEvents = new ArrayList<>();
        records = new ArrayList<>();
        inputEvents.add(new DataChangeEvent(createHeartbeatEvent()));
        inputEvents.add(new DataChangeEvent(createDataEvent()));
        inputEvents.add(new DataChangeEvent(createSchemaChangeEvent()));
        inputEvents.add(new DataChangeEvent(createHeartbeatEvent()));
        inputEvents.add(new DataChangeEvent(createSchemaChangeEvent()));
        inputEvents.add(new DataChangeEvent(createDataEvent()));
        inputEvents.add(new DataChangeEvent(createDataEvent()));
        inputEvents.add(new DataChangeEvent(createSchemaChangeEvent()));
        inputEvents.add(new DataChangeEvent(createHeartbeatEvent()));
        inputEvents.add(new DataChangeEvent(createDataEvent()));
        inputEvents.add(new DataChangeEvent(createHeartbeatEvent()));
        inputEvents.add(new DataChangeEvent(createSchemaChangeEvent()));
        inputEvents.add(new DataChangeEvent(createSchemaChangeEvent()));
        inputEvents.add(new DataChangeEvent(createHeartbeatEvent()));
        inputEvents.add(new DataChangeEvent(createDataEvent()));
        inputEvents.add(new DataChangeEvent(createSchemaChangeEvent()));
        inputEvents.add(new DataChangeEvent(createDataEvent()));
        inputEvents.add(new DataChangeEvent(createHeartbeatEvent()));
        outputEvents = fetcher.splitSchemaChangeStream(inputEvents);
        outputEvents.forEachRemaining(records::add);

        Assertions.assertEquals(11, records.size());
        // Batch 0: [heartbeat, data, beforeWM]
        Assertions.assertEquals(3, records.get(0).getSourceRecordList().size());
        Assertions.assertTrue(
                SourceRecordUtils.isHeartbeatRecord(records.get(0).getSourceRecordList().get(0)));
        Assertions.assertTrue(
                SourceRecordUtils.isDataChangeRecord(records.get(0).getSourceRecordList().get(1)));
        Assertions.assertTrue(
                WatermarkEvent.isSchemaChangeBeforeWatermarkEvent(
                        records.get(0).getSourceRecordList().get(2)));
        // Batch 1: [sc, afterWM]
        Assertions.assertEquals(2, records.get(1).getSourceRecordList().size());
        Assertions.assertTrue(
                SourceRecordUtils.isSchemaChangeEvent(records.get(1).getSourceRecordList().get(0)));
        Assertions.assertTrue(
                WatermarkEvent.isSchemaChangeAfterWatermarkEvent(
                        records.get(1).getSourceRecordList().get(1)));
        // Batch 2: [heartbeat, beforeWM]
        Assertions.assertEquals(2, records.get(2).getSourceRecordList().size());
        Assertions.assertTrue(
                SourceRecordUtils.isHeartbeatRecord(records.get(2).getSourceRecordList().get(0)));
        Assertions.assertTrue(
                WatermarkEvent.isSchemaChangeBeforeWatermarkEvent(
                        records.get(2).getSourceRecordList().get(1)));
        // Batch 3: [sc, afterWM]
        Assertions.assertEquals(2, records.get(3).getSourceRecordList().size());
        Assertions.assertTrue(
                SourceRecordUtils.isSchemaChangeEvent(records.get(3).getSourceRecordList().get(0)));
        Assertions.assertTrue(
                WatermarkEvent.isSchemaChangeAfterWatermarkEvent(
                        records.get(3).getSourceRecordList().get(1)));
        // Batch 4: [data, data, beforeWM]
        Assertions.assertEquals(3, records.get(4).getSourceRecordList().size());
        Assertions.assertTrue(
                SourceRecordUtils.isDataChangeRecord(records.get(4).getSourceRecordList().get(0)));
        Assertions.assertTrue(
                SourceRecordUtils.isDataChangeRecord(records.get(4).getSourceRecordList().get(1)));
        Assertions.assertTrue(
                WatermarkEvent.isSchemaChangeBeforeWatermarkEvent(
                        records.get(4).getSourceRecordList().get(2)));
        // Batch 5: [sc, afterWM]
        Assertions.assertEquals(2, records.get(5).getSourceRecordList().size());
        Assertions.assertTrue(
                SourceRecordUtils.isSchemaChangeEvent(records.get(5).getSourceRecordList().get(0)));
        Assertions.assertTrue(
                WatermarkEvent.isSchemaChangeAfterWatermarkEvent(
                        records.get(5).getSourceRecordList().get(1)));
        // Batch 6: [heartbeat, data, heartbeat, beforeWM]
        Assertions.assertEquals(4, records.get(6).getSourceRecordList().size());
        Assertions.assertTrue(
                SourceRecordUtils.isHeartbeatRecord(records.get(6).getSourceRecordList().get(0)));
        Assertions.assertTrue(
                SourceRecordUtils.isDataChangeRecord(records.get(6).getSourceRecordList().get(1)));
        Assertions.assertTrue(
                SourceRecordUtils.isHeartbeatRecord(records.get(6).getSourceRecordList().get(2)));
        Assertions.assertTrue(
                WatermarkEvent.isSchemaChangeBeforeWatermarkEvent(
                        records.get(6).getSourceRecordList().get(3)));
        // Batch 7: [sc, sc, afterWM]
        Assertions.assertEquals(3, records.get(7).getSourceRecordList().size());
        Assertions.assertTrue(
                SourceRecordUtils.isSchemaChangeEvent(records.get(7).getSourceRecordList().get(0)));
        Assertions.assertTrue(
                SourceRecordUtils.isSchemaChangeEvent(records.get(7).getSourceRecordList().get(1)));
        Assertions.assertTrue(
                WatermarkEvent.isSchemaChangeAfterWatermarkEvent(
                        records.get(7).getSourceRecordList().get(2)));
        // Batch 8: [heartbeat, data, beforeWM]
        Assertions.assertEquals(3, records.get(8).getSourceRecordList().size());
        Assertions.assertTrue(
                SourceRecordUtils.isHeartbeatRecord(records.get(8).getSourceRecordList().get(0)));
        Assertions.assertTrue(
                SourceRecordUtils.isDataChangeRecord(records.get(8).getSourceRecordList().get(1)));
        Assertions.assertTrue(
                WatermarkEvent.isSchemaChangeBeforeWatermarkEvent(
                        records.get(8).getSourceRecordList().get(2)));
        // Batch 9: [sc, afterWM]
        Assertions.assertEquals(2, records.get(9).getSourceRecordList().size());
        Assertions.assertTrue(
                SourceRecordUtils.isSchemaChangeEvent(records.get(9).getSourceRecordList().get(0)));
        Assertions.assertTrue(
                WatermarkEvent.isSchemaChangeAfterWatermarkEvent(
                        records.get(9).getSourceRecordList().get(1)));
        // Batch 10: trailing [data, heartbeat], no watermark needed.
        Assertions.assertEquals(2, records.get(10).getSourceRecordList().size());
        Assertions.assertTrue(
                SourceRecordUtils.isDataChangeRecord(records.get(10).getSourceRecordList().get(0)));
        Assertions.assertTrue(
                SourceRecordUtils.isHeartbeatRecord(records.get(10).getSourceRecordList().get(1)));
    }

    /** Builds a record whose key schema name marks it as a Debezium schema-change event. */
    static SourceRecord createSchemaChangeEvent() {
        Schema keySchema =
                SchemaBuilder.struct().name(SourceRecordUtils.SCHEMA_CHANGE_EVENT_KEY_NAME).build();
        SourceRecord record =
                new SourceRecord(
                        Collections.emptyMap(),
                        Collections.emptyMap(),
                        null,
                        keySchema,
                        null,
                        null,
                        null);
        // Sanity-check the fixture against the classifier the fetcher relies on.
        Assertions.assertTrue(SourceRecordUtils.isSchemaChangeEvent(record));
        return record;
    }

    /** Builds a minimal data-change record carrying a create ("c") operation in its value. */
    static SourceRecord createDataEvent() {
        Schema valueSchema =
                SchemaBuilder.struct()
                        .field(Envelope.FieldName.OPERATION, Schema.STRING_SCHEMA)
                        .build();
        Struct value = new Struct(valueSchema);
        value.put(valueSchema.field(Envelope.FieldName.OPERATION), "c");
        SourceRecord record =
                new SourceRecord(
                        Collections.emptyMap(),
                        Collections.emptyMap(),
                        null,
                        null,
                        null,
                        valueSchema,
                        value);
        // Sanity-check the fixture against the classifier the fetcher relies on.
        Assertions.assertTrue(SourceRecordUtils.isDataChangeRecord(record));
        return record;
    }

    /** Forces a Debezium heartbeat and captures the emitted heartbeat record. */
    static SourceRecord createHeartbeatEvent() throws InterruptedException {
        Heartbeat heartbeat = Heartbeat.create(Duration.ofNanos(1), "test", "test");
        AtomicReference<SourceRecord> eventRef = new AtomicReference<>();
        heartbeat.forcedBeat(
                Collections.singletonMap("heartbeat", "heartbeat"),
                Collections.singletonMap("heartbeat", "heartbeat"),
                sourceRecord -> eventRef.set(sourceRecord));
        return eventRef.get();
    }

    /**
     * Creates a fetcher whose schema-change resolver supports every event and whose
     * {@code shouldEmit} check is stubbed to always pass, so the test exercises only the
     * stream-splitting logic.
     */
    static IncrementalSourceStreamFetcher createFetcher() {
        SchemaChangeResolver schemaChangeResolver = mock(SchemaChangeResolver.class);
        when(schemaChangeResolver.support(any())).thenReturn(true);
        IncrementalSourceStreamFetcher fetcher =
                new IncrementalSourceStreamFetcher(null, 0, schemaChangeResolver);
        IncrementalSourceStreamFetcher spy = spy(fetcher);
        doReturn(true).when(spy).shouldEmit(any());
        return spy;
    }
}
diff --git
a/seatunnel-connectors-v2/connector-cdc/connector-cdc-base/src/test/java/org/apache/seatunnel/connectors/cdc/base/source/split/state/IncrementalSplitStateTest.java
b/seatunnel-connectors-v2/connector-cdc/connector-cdc-base/src/test/java/org/apache/seatunnel/connectors/cdc/base/source/split/state/IncrementalSplitStateTest.java
new file mode 100644
index 0000000000..4a0b40852a
--- /dev/null
+++
b/seatunnel-connectors-v2/connector-cdc/connector-cdc-base/src/test/java/org/apache/seatunnel/connectors/cdc/base/source/split/state/IncrementalSplitStateTest.java
@@ -0,0 +1,169 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.seatunnel.connectors.cdc.base.source.split.state;
+
+import
org.apache.seatunnel.connectors.cdc.base.source.event.SnapshotSplitWatermark;
+import org.apache.seatunnel.connectors.cdc.base.source.offset.Offset;
+import
org.apache.seatunnel.connectors.cdc.base.source.split.CompletedSnapshotSplitInfo;
+import org.apache.seatunnel.connectors.cdc.base.source.split.IncrementalSplit;
+
+import org.junit.jupiter.api.Assertions;
+import org.junit.jupiter.api.Test;
+
+import io.debezium.relational.TableId;
+import lombok.AllArgsConstructor;
+import lombok.ToString;
+
+import java.util.Arrays;
+import java.util.Collections;
+import java.util.List;
+import java.util.stream.Collectors;
+import java.util.stream.Stream;
+
+public class IncrementalSplitStateTest {
+
+ @Test
+ public void testMarkEnterPureIncrementPhaseIfNeed() {
+ Offset startupOffset = new TestOffset(100);
+ List<CompletedSnapshotSplitInfo> snapshotSplits =
Collections.emptyList();
+ IncrementalSplit split = createIncrementalSplit(startupOffset,
snapshotSplits);
+ IncrementalSplitState splitState = new IncrementalSplitState(split);
+ Assertions.assertNull(splitState.getMaxSnapshotSplitsHighWatermark());
+ Assertions.assertTrue(splitState.isEnterPureIncrementPhase());
+
Assertions.assertFalse(splitState.markEnterPureIncrementPhaseIfNeed(null));
+
+ startupOffset = new TestOffset(100);
+ snapshotSplits =
+ Stream.of(
+ createCompletedSnapshotSplitInfo(
+ "test1", new TestOffset(100), new
TestOffset(100)),
+ createCompletedSnapshotSplitInfo(
+ "test2", new TestOffset(100), new
TestOffset(100)))
+ .collect(Collectors.toList());
+ split = createIncrementalSplit(startupOffset, snapshotSplits);
+ splitState = new IncrementalSplitState(split);
+ Assertions.assertEquals(startupOffset,
splitState.getMaxSnapshotSplitsHighWatermark());
+ Assertions.assertFalse(splitState.isEnterPureIncrementPhase());
+
Assertions.assertFalse(splitState.markEnterPureIncrementPhaseIfNeed(new
TestOffset(99)));
+ Assertions.assertFalse(splitState.isEnterPureIncrementPhase());
+ Assertions.assertFalse(snapshotSplits.isEmpty());
+ Assertions.assertTrue(splitState.markEnterPureIncrementPhaseIfNeed(new
TestOffset(100)));
+ Assertions.assertTrue(snapshotSplits.isEmpty());
+
Assertions.assertFalse(splitState.markEnterPureIncrementPhaseIfNeed(new
TestOffset(100)));
+
Assertions.assertFalse(splitState.markEnterPureIncrementPhaseIfNeed(new
TestOffset(101)));
+
+ startupOffset = new TestOffset(100);
+ snapshotSplits =
+ Stream.of(
+ createCompletedSnapshotSplitInfo(
+ "test1", new TestOffset(1), new
TestOffset(50)),
+ createCompletedSnapshotSplitInfo(
+ "test2", new TestOffset(50), new
TestOffset(200)))
+ .collect(Collectors.toList());
+ split = createIncrementalSplit(startupOffset, snapshotSplits);
+ splitState = new IncrementalSplitState(split);
+ Assertions.assertEquals(
+ new TestOffset(200),
splitState.getMaxSnapshotSplitsHighWatermark());
+ Assertions.assertFalse(splitState.isEnterPureIncrementPhase());
+ Assertions.assertTrue(splitState.markEnterPureIncrementPhaseIfNeed(new
TestOffset(201)));
+ Assertions.assertTrue(splitState.isEnterPureIncrementPhase());
+ Assertions.assertTrue(snapshotSplits.isEmpty());
+
Assertions.assertFalse(splitState.markEnterPureIncrementPhaseIfNeed(new
TestOffset(200)));
+ Assertions.assertTrue(splitState.isEnterPureIncrementPhase());
+
Assertions.assertFalse(splitState.markEnterPureIncrementPhaseIfNeed(new
TestOffset(201)));
+
Assertions.assertFalse(splitState.markEnterPureIncrementPhaseIfNeed(new
TestOffset(202)));
+ }
+
+ @Test
+ public void testAutoEnterPureIncrementPhaseIfAllowed() {
+ Offset startupOffset = new TestOffset(100);
+ List<CompletedSnapshotSplitInfo> snapshotSplits =
Collections.emptyList();
+ IncrementalSplit split = createIncrementalSplit(startupOffset,
snapshotSplits);
+ IncrementalSplitState splitState = new IncrementalSplitState(split);
+ Assertions.assertTrue(splitState.isEnterPureIncrementPhase());
+
Assertions.assertFalse(splitState.autoEnterPureIncrementPhaseIfAllowed());
+
+ startupOffset = new TestOffset(100);
+ snapshotSplits =
+ Stream.of(
+ createCompletedSnapshotSplitInfo(
+ "test1", new TestOffset(100), new
TestOffset(100)),
+ createCompletedSnapshotSplitInfo(
+ "test2", new TestOffset(100), new
TestOffset(100)))
+ .collect(Collectors.toList());
+ split = createIncrementalSplit(startupOffset, snapshotSplits);
+ splitState = new IncrementalSplitState(split);
+
+ Assertions.assertFalse(splitState.isEnterPureIncrementPhase());
+
Assertions.assertTrue(splitState.autoEnterPureIncrementPhaseIfAllowed());
+ Assertions.assertTrue(splitState.isEnterPureIncrementPhase());
+
Assertions.assertFalse(splitState.autoEnterPureIncrementPhaseIfAllowed());
+ Assertions.assertTrue(splitState.isEnterPureIncrementPhase());
+
+ startupOffset = new TestOffset(100);
+ snapshotSplits =
+ Stream.of(
+ createCompletedSnapshotSplitInfo(
+ "test1", new TestOffset(100), new
TestOffset(100)),
+ createCompletedSnapshotSplitInfo(
+ "test2", new TestOffset(100), new
TestOffset(101)))
+ .collect(Collectors.toList());
+ split = createIncrementalSplit(startupOffset, snapshotSplits);
+ splitState = new IncrementalSplitState(split);
+ Assertions.assertFalse(splitState.isEnterPureIncrementPhase());
+
Assertions.assertFalse(splitState.autoEnterPureIncrementPhaseIfAllowed());
+ }
+
+ private static IncrementalSplit createIncrementalSplit(
+ Offset startupOffset, List<CompletedSnapshotSplitInfo>
snapshotSplits) {
+ return new IncrementalSplit(
+ "test",
+ Arrays.asList(new TableId("db", "schema", "table")),
+ startupOffset,
+ null,
+ snapshotSplits,
+ null);
+ }
+
+ private static CompletedSnapshotSplitInfo createCompletedSnapshotSplitInfo(
+ String splitId, Offset lowWatermark, Offset highWatermark) {
+ return new CompletedSnapshotSplitInfo(
+ splitId,
+ new TableId("db", "schema", "table"),
+ null,
+ null,
+ null,
+ new SnapshotSplitWatermark(null, lowWatermark, highWatermark));
+ }
+
+ @ToString
+ @AllArgsConstructor
+ static class TestOffset extends Offset {
+ private int offset;
+
+ @Override
+ public int compareTo(Offset o) {
+ return Integer.compare(offset, ((TestOffset) o).offset);
+ }
+
+ @Override
+ public boolean equals(Object o) {
+ return o instanceof TestOffset && offset == ((TestOffset)
o).offset;
+ }
+ }
+}
diff --git
a/seatunnel-connectors-v2/connector-cdc/connector-cdc-oracle/src/main/java/io/debezium/connector/oracle/logminer/LogMinerStreamingChangeEventSource.java
b/seatunnel-connectors-v2/connector-cdc/connector-cdc-oracle/src/main/java/io/debezium/connector/oracle/logminer/LogMinerStreamingChangeEventSource.java
index 4ce4d17a02..0e9e936865 100644
---
a/seatunnel-connectors-v2/connector-cdc/connector-cdc-oracle/src/main/java/io/debezium/connector/oracle/logminer/LogMinerStreamingChangeEventSource.java
+++
b/seatunnel-connectors-v2/connector-cdc/connector-cdc-oracle/src/main/java/io/debezium/connector/oracle/logminer/LogMinerStreamingChangeEventSource.java
@@ -241,6 +241,7 @@ public class LogMinerStreamingChangeEventSource
// log before proceeding.
if (archiveLogOnlyMode && startScn.equals(endScn))
{
pauseBetweenMiningSessions();
+
dispatcher.dispatchHeartbeatEvent(offsetContext);
continue;
}
diff --git
a/seatunnel-connectors-v2/connector-cdc/connector-cdc-sqlserver/src/main/java/io/debezium/connector/sqlserver/SqlServerStreamingChangeEventSource.java
b/seatunnel-connectors-v2/connector-cdc/connector-cdc-sqlserver/src/main/java/io/debezium/connector/sqlserver/SqlServerStreamingChangeEventSource.java
index 3053598c66..05eba0d96f 100644
---
a/seatunnel-connectors-v2/connector-cdc/connector-cdc-sqlserver/src/main/java/io/debezium/connector/sqlserver/SqlServerStreamingChangeEventSource.java
+++
b/seatunnel-connectors-v2/connector-cdc/connector-cdc-sqlserver/src/main/java/io/debezium/connector/sqlserver/SqlServerStreamingChangeEventSource.java
@@ -181,6 +181,7 @@ public class SqlServerStreamingChangeEventSource
&& shouldIncreaseFromLsn) {
LOGGER.debug("No change in the database");
metronome.pause();
+ dispatcher.dispatchHeartbeatEvent(offsetContext);
continue;
}
diff --git
a/seatunnel-connectors-v2/connector-common/src/main/java/org/apache/seatunnel/connectors/seatunnel/common/source/reader/SourceReaderBase.java
b/seatunnel-connectors-v2/connector-common/src/main/java/org/apache/seatunnel/connectors/seatunnel/common/source/reader/SourceReaderBase.java
index 668ed78dca..29dd2ff6f5 100644
---
a/seatunnel-connectors-v2/connector-common/src/main/java/org/apache/seatunnel/connectors/seatunnel/common/source/reader/SourceReaderBase.java
+++
b/seatunnel-connectors-v2/connector-common/src/main/java/org/apache/seatunnel/connectors/seatunnel/common/source/reader/SourceReaderBase.java
@@ -26,6 +26,7 @@ import org.apache.seatunnel.common.utils.SeaTunnelException;
import
org.apache.seatunnel.connectors.seatunnel.common.source.reader.fetcher.SplitFetcherManager;
import
org.apache.seatunnel.connectors.seatunnel.common.source.reader.splitreader.SplitReader;
+import lombok.Getter;
import lombok.RequiredArgsConstructor;
import lombok.extern.slf4j.Slf4j;
@@ -62,7 +63,7 @@ public abstract class SourceReaderBase<E, T, SplitT extends
SourceSplit, SplitSt
protected final SourceReader.Context context;
private RecordsWithSplitIds<E> currentFetch;
- private SplitContext<T, SplitStateT> currentSplitContext;
+ protected SplitContext<T, SplitStateT> currentSplitContext;
private Collector<T> currentSplitOutput;
private boolean noMoreSplitsAssignment;
@@ -234,9 +235,9 @@ public abstract class SourceReaderBase<E, T, SplitT extends
SourceSplit, SplitSt
protected abstract SplitT toSplitType(String splitId, SplitStateT
splitState);
@RequiredArgsConstructor
- private static final class SplitContext<T, SplitStateT> {
+ protected static final class SplitContext<T, SplitStateT> {
final String splitId;
- final SplitStateT state;
+ @Getter final SplitStateT state;
Collector<T> splitOutput;
Collector<T> getOrCreateSplitOutput(Collector<T> output) {