This is an automated email from the ASF dual-hosted git repository.
yiguolei pushed a commit to branch branch-4.1
in repository https://gitbox.apache.org/repos/asf/doris.git
The following commit(s) were added to refs/heads/branch-4.1 by this push:
new f4d620881b6 branch-4.1: [fix](streaming-job) Avoid NPE on cross-table
DML during snapshot chunk read #63435 (#63503)
f4d620881b6 is described below
commit f4d620881b67c37f3b22cdf82be24363ef783fd8
Author: github-actions[bot]
<41898282+github-actions[bot]@users.noreply.github.com>
AuthorDate: Fri May 22 22:02:10 2026 +0800
branch-4.1: [fix](streaming-job) Avoid NPE on cross-table DML during
snapshot chunk read #63435 (#63503)
Cherry-picked from #63435
Co-authored-by: wudi <[email protected]>
---
.../external/IncrementalSourceScanFetcher.java | 284 +++++++++++++++++++++
1 file changed, 284 insertions(+)
diff --git
a/fs_brokers/cdc_client/src/main/java/org/apache/flink/cdc/connectors/base/source/reader/external/IncrementalSourceScanFetcher.java
b/fs_brokers/cdc_client/src/main/java/org/apache/flink/cdc/connectors/base/source/reader/external/IncrementalSourceScanFetcher.java
new file mode 100644
index 00000000000..a2c51e4e0cf
--- /dev/null
+++
b/fs_brokers/cdc_client/src/main/java/org/apache/flink/cdc/connectors/base/source/reader/external/IncrementalSourceScanFetcher.java
@@ -0,0 +1,284 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.cdc.connectors.base.source.reader.external;
+
+import org.apache.flink.cdc.common.annotation.VisibleForTesting;
+import org.apache.flink.cdc.connectors.base.source.meta.split.SnapshotSplit;
+import org.apache.flink.cdc.connectors.base.source.meta.split.SourceRecords;
+import org.apache.flink.cdc.connectors.base.source.meta.split.SourceSplitBase;
+import org.apache.flink.util.FlinkRuntimeException;
+
+import
org.apache.flink.shaded.guava31.com.google.common.util.concurrent.ThreadFactoryBuilder;
+
+import io.debezium.connector.base.ChangeEventQueue;
+import io.debezium.pipeline.DataChangeEvent;
+import io.debezium.relational.TableId;
+import org.apache.kafka.connect.data.Struct;
+import org.apache.kafka.connect.source.SourceRecord;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import javax.annotation.Nullable;
+
+import java.util.ArrayList;
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.Iterator;
+import java.util.List;
+import java.util.Map;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.Executors;
+import java.util.concurrent.ThreadFactory;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.atomic.AtomicBoolean;
+
+import static
org.apache.flink.cdc.connectors.base.source.meta.wartermark.WatermarkEvent.isEndWatermarkEvent;
+import static
org.apache.flink.cdc.connectors.base.source.meta.wartermark.WatermarkEvent.isHighWatermarkEvent;
+import static
org.apache.flink.cdc.connectors.base.source.meta.wartermark.WatermarkEvent.isLowWatermarkEvent;
+import static org.apache.flink.util.Preconditions.checkState;
+
+/**
+ * Copy from
https://github.com/apache/flink-cdc/blob/release-3.6.0/flink-cdc-connect/flink-cdc-source-connectors/flink-cdc-base/src/main/java/org/apache/flink/cdc/connectors/base/source/reader/external/IncrementalSourceScanFetcher.java
+ * Fix with FLINK-39633, change to method isChangeRecordInChunkRange
+ */
+public class IncrementalSourceScanFetcher implements Fetcher<SourceRecords,
SourceSplitBase> {
+
+ private static final Logger LOG =
LoggerFactory.getLogger(IncrementalSourceScanFetcher.class);
+
+ public AtomicBoolean hasNextElement;
+ public AtomicBoolean reachEnd;
+
+ private final FetchTask.Context taskContext;
+ private final ExecutorService executorService;
+ private volatile ChangeEventQueue<DataChangeEvent> queue;
+ private volatile Throwable readException;
+
+ // task to read snapshot for current split
+ private FetchTask<SourceSplitBase> snapshotSplitReadTask;
+ private SnapshotSplit currentSnapshotSplit;
+
+ private static final long READER_CLOSE_TIMEOUT_SECONDS = 30L;
+
+ public IncrementalSourceScanFetcher(FetchTask.Context taskContext, int
subtaskId) {
+ this.taskContext = taskContext;
+ ThreadFactory threadFactory =
+ new ThreadFactoryBuilder()
+ .setNameFormat("debezium-snapshot-reader-" + subtaskId)
+ .setUncaughtExceptionHandler(
+ (thread, throwable) ->
setReadException(throwable))
+ .build();
+ this.executorService =
Executors.newSingleThreadExecutor(threadFactory);
+ this.hasNextElement = new AtomicBoolean(false);
+ this.reachEnd = new AtomicBoolean(false);
+ }
+
+ @Override
+ public void submitTask(FetchTask<SourceSplitBase> fetchTask) {
+ this.snapshotSplitReadTask = fetchTask;
+ this.currentSnapshotSplit = fetchTask.getSplit().asSnapshotSplit();
+ taskContext.configure(currentSnapshotSplit);
+ this.queue = taskContext.getQueue();
+ this.hasNextElement.set(true);
+ this.reachEnd.set(false);
+
+ executorService.execute(
+ () -> {
+ try {
+ snapshotSplitReadTask.execute(taskContext);
+ } catch (Exception e) {
+ setReadException(e);
+ }
+ });
+ }
+
+ @Override
+ public boolean isFinished() {
+ return currentSnapshotSplit == null
+ || (!snapshotSplitReadTask.isRunning() &&
!hasNextElement.get() && reachEnd.get());
+ }
+
+ @Nullable
+ @Override
+ public Iterator<SourceRecords> pollSplitRecords() throws
InterruptedException {
+ checkReadException();
+
+ if (hasNextElement.get()) {
+ if (taskContext.getSourceConfig().isSkipSnapshotBackfill()) {
+ return pollWithoutBuffer();
+ } else {
+ return pollWithBuffer();
+ }
+ }
+ // the data has been polled, no more data
+ reachEnd.compareAndSet(false, true);
+ return null;
+ }
+
+ public Iterator<SourceRecords> pollWithoutBuffer() throws
InterruptedException {
+ checkReadException();
+ List<DataChangeEvent> batch = queue.poll();
+ final List<SourceRecord> records = new ArrayList<>();
+ for (DataChangeEvent event : batch) {
+ if (isEndWatermarkEvent(event.getRecord())) {
+ hasNextElement.set(false);
+ break;
+ }
+ records.add(event.getRecord());
+ }
+
+ return Collections.singletonList(new
SourceRecords(records)).iterator();
+ }
+
+ public Iterator<SourceRecords> pollWithBuffer() throws
InterruptedException {
+ // eg:
+ // data input: [low watermark event][snapshot events][high watermark
event][change
+ // events][end watermark event]
+ // data output: [low watermark event][normalized events][high
watermark event]
+ boolean reachChangeLogStart = false;
+ boolean reachChangeLogEnd = false;
+ SourceRecord lowWatermark = null;
+ SourceRecord highWatermark = null;
+ Map<Struct, SourceRecord> outputBuffer = new HashMap<>();
+ while (!reachChangeLogEnd) {
+ checkReadException();
+ List<DataChangeEvent> batch = queue.poll();
+ for (DataChangeEvent event : batch) {
+ SourceRecord record = event.getRecord();
+ if (lowWatermark == null) {
+ lowWatermark = record;
+ assertLowWatermark(lowWatermark);
+ continue;
+ }
+
+ if (highWatermark == null && isHighWatermarkEvent(record)) {
+ highWatermark = record;
+ // snapshot events capture end and begin to capture stream
events
+ reachChangeLogStart = true;
+ continue;
+ }
+
+ if (reachChangeLogStart && isEndWatermarkEvent(record)) {
+ // capture to end watermark events, stop the loop
+ reachChangeLogEnd = true;
+ break;
+ }
+
+ if (!reachChangeLogStart) {
+ outputBuffer.put((Struct) record.key(), record);
+ } else {
+ if (isChangeRecordInChunkRange(record)) {
+ // rewrite overlapping snapshot records through the
record key
+ taskContext.rewriteOutputBuffer(outputBuffer, record);
+ }
+ }
+ }
+ }
+ // snapshot split return its data once
+ hasNextElement.set(false);
+
+ final List<SourceRecord> normalizedRecords = new ArrayList<>();
+ normalizedRecords.add(lowWatermark);
+
normalizedRecords.addAll(taskContext.formatMessageTimestamp(outputBuffer.values()));
+ normalizedRecords.add(highWatermark);
+
+ final List<SourceRecords> sourceRecordsSet = new ArrayList<>();
+ sourceRecordsSet.add(new SourceRecords(normalizedRecords));
+ return sourceRecordsSet.iterator();
+ }
+
+ private void checkReadException() {
+ if (readException != null) {
+ throw new FlinkRuntimeException(
+ String.format(
+ "Read split %s error due to %s.",
+ currentSnapshotSplit, readException.getMessage()),
+ readException);
+ }
+ }
+
+ private void setReadException(Throwable throwable) {
+ LOG.error(
+ String.format(
+ "Execute snapshot read task for snapshot split %s
fail",
+ currentSnapshotSplit),
+ throwable);
+ if (readException == null) {
+ readException = throwable;
+ } else {
+ readException.addSuppressed(throwable);
+ }
+ }
+
+ @Override
+ public void close() {
+ try {
+ if (taskContext != null) {
+ taskContext.close();
+ }
+
+ if (snapshotSplitReadTask != null) {
+ snapshotSplitReadTask.close();
+ }
+
+ if (executorService != null) {
+ executorService.shutdown();
+ if (!executorService.awaitTermination(
+ READER_CLOSE_TIMEOUT_SECONDS, TimeUnit.SECONDS)) {
+ LOG.warn(
+ "Failed to close the scan fetcher in {} seconds.",
+ READER_CLOSE_TIMEOUT_SECONDS);
+ }
+ }
+ } catch (Exception e) {
+ LOG.error("Close scan fetcher error", e);
+ }
+ }
+
+ @VisibleForTesting
+ public ExecutorService getExecutorService() {
+ return executorService;
+ }
+
+ private void assertLowWatermark(SourceRecord lowWatermark) {
+ checkState(
+ isLowWatermarkEvent(lowWatermark),
+ String.format(
+ "The first record should be low watermark signal
event, but actual is %s",
+ lowWatermark));
+ }
+
+ @VisibleForTesting
+ boolean isChangeRecordInChunkRange(SourceRecord record) {
+ if (!taskContext.isDataChangeRecord(record)) {
+ return false;
+ }
+ // Skip records of other captured tables; their schema may not be
loaded yet
+ // and their PKs do not align with this chunk's bounds.
+ TableId recordTableId = taskContext.getTableId(record);
+ if (recordTableId == null ||
!recordTableId.equals(currentSnapshotSplit.getTableId())) {
+ return false;
+ }
+ return taskContext.isRecordBetween(
+ record, currentSnapshotSplit.getSplitStart(),
currentSnapshotSplit.getSplitEnd());
+ }
+
+ @VisibleForTesting
+ void setCurrentSnapshotSplit(SnapshotSplit currentSnapshotSplit) {
+ this.currentSnapshotSplit = currentSnapshotSplit;
+ }
+}
\ No newline at end of file
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]