This is an automated email from the ASF dual-hosted git repository.

leonard pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/flink-cdc.git


The following commit(s) were added to refs/heads/master by this push:
     new 0df63e26e [hotfix][cdc-runtime] Keep upstream pending requests in 
order to avoid checkpoint hanging & state inconsistency in timestamp startup 
mode
0df63e26e is described below

commit 0df63e26e84e7443576a0887cd27613bcf73e830
Author: yuxiqian <[email protected]>
AuthorDate: Mon Aug 26 23:51:28 2024 +0800

    [hotfix][cdc-runtime] Keep upstream pending requests in order to avoid 
checkpoint hanging & state inconsistency in timestamp startup mode
    
    This closes  #3576.
---
 .../flink/cdc/pipeline/tests/MysqlE2eITCase.java   |  58 +++++++
 .../runtime/operators/schema/SchemaOperator.java   |  19 ++-
 .../coordinator/SchemaRegistryRequestHandler.java  | 175 +++++++++++++--------
 .../schema/event/SchemaChangeRequest.java          |  15 +-
 .../runtime/partitioning/PrePartitionOperator.java |   7 +
 .../operators/EventOperatorTestHarness.java        |   2 +-
 6 files changed, 203 insertions(+), 73 deletions(-)

diff --git 
a/flink-cdc-e2e-tests/flink-cdc-pipeline-e2e-tests/src/test/java/org/apache/flink/cdc/pipeline/tests/MysqlE2eITCase.java
 
b/flink-cdc-e2e-tests/flink-cdc-pipeline-e2e-tests/src/test/java/org/apache/flink/cdc/pipeline/tests/MysqlE2eITCase.java
index 29fe5db9d..ccdbbbad7 100644
--- 
a/flink-cdc-e2e-tests/flink-cdc-pipeline-e2e-tests/src/test/java/org/apache/flink/cdc/pipeline/tests/MysqlE2eITCase.java
+++ 
b/flink-cdc-e2e-tests/flink-cdc-pipeline-e2e-tests/src/test/java/org/apache/flink/cdc/pipeline/tests/MysqlE2eITCase.java
@@ -332,6 +332,64 @@ public class MysqlE2eITCase extends 
PipelineTestEnvironment {
                 "DropTableEvent{tableId=%s.products}");
     }
 
+    @Test
+    public void testDroppingTable() throws Exception {
+        Thread.sleep(5000);
+        LOG.info("Sleep 5 seconds to distinguish initial DDL events with 
dropping table events...");
+        long ddlTimestamp = System.currentTimeMillis();
+        Thread.sleep(5000);
+        LOG.info("Going to drop tables after timestamp {}", ddlTimestamp);
+
+        try (Connection connection = 
mysqlInventoryDatabase.getJdbcConnection();
+                Statement statement = connection.createStatement()) {
+            statement.execute("DROP TABLE products;");
+        }
+
+        String pipelineJob =
+                String.format(
+                        "source:\n"
+                                + "  type: mysql\n"
+                                + "  hostname: %s\n"
+                                + "  port: 3306\n"
+                                + "  username: %s\n"
+                                + "  password: %s\n"
+                                + "  tables: %s.\\.*\n"
+                                + "  server-id: 5400-5404\n"
+                                + "  server-time-zone: UTC\n"
+                                + "  scan.startup.mode: timestamp\n"
+                                + "  scan.startup.timestamp-millis: %d\n"
+                                + "  scan.binlog.newly-added-table.enabled: 
true\n"
+                                + "\n"
+                                + "sink:\n"
+                                + "  type: values\n"
+                                + "\n"
+                                + "pipeline:\n"
+                                + "  parallelism: %d\n"
+                                + "  schema.change.behavior: evolve",
+                        INTER_CONTAINER_MYSQL_ALIAS,
+                        MYSQL_TEST_USER,
+                        MYSQL_TEST_PASSWORD,
+                        mysqlInventoryDatabase.getDatabaseName(),
+                        ddlTimestamp,
+                        parallelism);
+        Path mysqlCdcJar = 
TestUtils.getResource("mysql-cdc-pipeline-connector.jar");
+        Path valuesCdcJar = 
TestUtils.getResource("values-cdc-pipeline-connector.jar");
+        Path mysqlDriverJar = TestUtils.getResource("mysql-driver.jar");
+        submitPipelineJob(pipelineJob, mysqlCdcJar, valuesCdcJar, 
mysqlDriverJar);
+        waitUntilJobRunning(Duration.ofSeconds(30));
+        LOG.info("Pipeline job is running");
+        waitUntilSpecificEvent(
+                String.format(
+                        "Table %s.products received SchemaChangeEvent 
DropTableEvent{tableId=%s.products} and start to be blocked.",
+                        mysqlInventoryDatabase.getDatabaseName(),
+                        mysqlInventoryDatabase.getDatabaseName()));
+
+        waitUntilSpecificEvent(
+                String.format(
+                        "Schema change event 
DropTableEvent{tableId=%s.products} has been handled in another subTask 
already.",
+                        mysqlInventoryDatabase.getDatabaseName()));
+    }
+
     private void validateResult(String... expectedEvents) throws Exception {
         String dbName = mysqlInventoryDatabase.getDatabaseName();
         for (String event : expectedEvents) {
diff --git 
a/flink-cdc-runtime/src/main/java/org/apache/flink/cdc/runtime/operators/schema/SchemaOperator.java
 
b/flink-cdc-runtime/src/main/java/org/apache/flink/cdc/runtime/operators/schema/SchemaOperator.java
index 1b4f50e89..58f524a4b 100644
--- 
a/flink-cdc-runtime/src/main/java/org/apache/flink/cdc/runtime/operators/schema/SchemaOperator.java
+++ 
b/flink-cdc-runtime/src/main/java/org/apache/flink/cdc/runtime/operators/schema/SchemaOperator.java
@@ -23,6 +23,7 @@ import 
org.apache.flink.cdc.common.annotation.VisibleForTesting;
 import org.apache.flink.cdc.common.data.RecordData;
 import org.apache.flink.cdc.common.data.StringData;
 import org.apache.flink.cdc.common.event.DataChangeEvent;
+import org.apache.flink.cdc.common.event.DropTableEvent;
 import org.apache.flink.cdc.common.event.Event;
 import org.apache.flink.cdc.common.event.FlushEvent;
 import org.apache.flink.cdc.common.event.SchemaChangeEvent;
@@ -50,6 +51,7 @@ import 
org.apache.flink.cdc.runtime.typeutils.BinaryRecordDataGenerator;
 import org.apache.flink.runtime.jobgraph.tasks.TaskOperatorEventGateway;
 import org.apache.flink.runtime.operators.coordination.CoordinationRequest;
 import org.apache.flink.runtime.operators.coordination.CoordinationResponse;
+import org.apache.flink.runtime.state.StateSnapshotContext;
 import org.apache.flink.streaming.api.graph.StreamConfig;
 import org.apache.flink.streaming.api.operators.AbstractStreamOperator;
 import org.apache.flink.streaming.api.operators.ChainingStrategy;
@@ -242,7 +244,13 @@ public class SchemaOperator extends 
AbstractStreamOperator<Event>
                 tableId,
                 event);
         handleSchemaChangeEvent(tableId, event);
-        // Update caches
+
+        if (event instanceof DropTableEvent) {
+            // Update caches unless event is a Drop table event. In that case, 
no schema will be
+            // available / necessary.
+            return;
+        }
+
         originalSchema.put(tableId, getLatestOriginalSchema(tableId));
         schemaDivergesMap.put(tableId, checkSchemaDiverges(tableId));
 
@@ -440,7 +448,8 @@ public class SchemaOperator extends 
AbstractStreamOperator<Event>
         long schemaEvolveTimeOutMillis = System.currentTimeMillis() + 
rpcTimeOutInMillis;
         while (true) {
             SchemaChangeResponse response =
-                    sendRequestToCoordinator(new SchemaChangeRequest(tableId, 
schemaChangeEvent));
+                    sendRequestToCoordinator(
+                            new SchemaChangeRequest(tableId, 
schemaChangeEvent, subTaskId));
             if (response.isRegistryBusy()) {
                 if (System.currentTimeMillis() < schemaEvolveTimeOutMillis) {
                     LOG.info(
@@ -609,4 +618,10 @@ public class SchemaOperator extends 
AbstractStreamOperator<Event>
             }
         }
     }
+
+    @Override
+    public void snapshotState(StateSnapshotContext context) throws Exception {
+        // Needless to do anything, since AbstractStreamOperator#snapshotState 
and #processElement
+        // is guaranteed not to be mixed together.
+    }
 }
diff --git 
a/flink-cdc-runtime/src/main/java/org/apache/flink/cdc/runtime/operators/schema/coordinator/SchemaRegistryRequestHandler.java
 
b/flink-cdc-runtime/src/main/java/org/apache/flink/cdc/runtime/operators/schema/coordinator/SchemaRegistryRequestHandler.java
index ae765bae2..1310fb6be 100644
--- 
a/flink-cdc-runtime/src/main/java/org/apache/flink/cdc/runtime/operators/schema/coordinator/SchemaRegistryRequestHandler.java
+++ 
b/flink-cdc-runtime/src/main/java/org/apache/flink/cdc/runtime/operators/schema/coordinator/SchemaRegistryRequestHandler.java
@@ -55,7 +55,6 @@ import java.util.concurrent.CompletableFuture;
 import java.util.concurrent.ConcurrentHashMap;
 import java.util.concurrent.ExecutorService;
 import java.util.concurrent.Executors;
-import java.util.concurrent.atomic.AtomicReference;
 import java.util.stream.Collectors;
 import java.util.stream.Stream;
 
@@ -78,7 +77,10 @@ public class SchemaRegistryRequestHandler implements 
Closeable {
     /**
      * Atomic flag indicating if current RequestHandler could accept more 
schema changes for now.
      */
-    private final AtomicReference<RequestStatus> schemaChangeStatus;
+    private volatile RequestStatus schemaChangeStatus;
+
+    private final List<Integer> pendingSubTaskIds;
+    private final Object schemaChangeRequestLock;
 
     private volatile Throwable currentChangeException;
     private volatile List<SchemaChangeEvent> currentDerivedSchemaChangeEvents;
@@ -110,7 +112,10 @@ public class SchemaRegistryRequestHandler implements 
Closeable {
         this.currentDerivedSchemaChangeEvents = new ArrayList<>();
         this.currentFinishedSchemaChanges = new ArrayList<>();
         this.currentIgnoredSchemaChanges = new ArrayList<>();
-        this.schemaChangeStatus = new AtomicReference<>(RequestStatus.IDLE);
+
+        this.schemaChangeStatus = RequestStatus.IDLE;
+        this.pendingSubTaskIds = new ArrayList<>();
+        this.schemaChangeRequestLock = new Object();
     }
 
     /**
@@ -120,67 +125,100 @@ public class SchemaRegistryRequestHandler implements 
Closeable {
      */
     public CompletableFuture<CoordinationResponse> handleSchemaChangeRequest(
             SchemaChangeRequest request) {
-        if (schemaChangeStatus.compareAndSet(RequestStatus.IDLE, 
RequestStatus.WAITING_FOR_FLUSH)) {
-            LOG.info(
-                    "Received schema change event request {} from table {}. 
SchemaChangeStatus switched from IDLE to WAITING_FOR_FLUSH, other requests will 
be blocked.",
-                    request.getSchemaChangeEvent(),
-                    request.getTableId().toString());
-            SchemaChangeEvent event = request.getSchemaChangeEvent();
-
-            // If this schema change event has been requested by another 
subTask, ignore it.
-            if (schemaManager.isOriginalSchemaChangeEventRedundant(event)) {
-                LOG.info("Event {} has been addressed before, ignoring it.", 
event);
-                clearCurrentSchemaChangeRequest();
-                Preconditions.checkState(
-                        schemaChangeStatus.compareAndSet(
-                                RequestStatus.WAITING_FOR_FLUSH, 
RequestStatus.IDLE),
-                        "Illegal schemaChangeStatus state: should still in 
WAITING_FOR_FLUSH state if event was duplicated, not "
-                                + schemaChangeStatus.get());
+
+        // We use requester subTask ID as the pending ticket, because there 
will be at most 1 schema
+        // change requests simultaneously from each subTask
+        int requestSubTaskId = request.getSubTaskId();
+
+        synchronized (schemaChangeRequestLock) {
+            // Make sure we handle the first request in the pending list to 
avoid out-of-order
+            // waiting and blocks checkpointing mechanism.
+            if (schemaChangeStatus == RequestStatus.IDLE) {
+                if (pendingSubTaskIds.isEmpty()) {
+                    LOG.info(
+                            "Received schema change event request {} from 
table {} from subTask {}. Pending list is empty, handling this.",
+                            request.getSchemaChangeEvent(),
+                            request.getTableId().toString(),
+                            requestSubTaskId);
+                } else if (pendingSubTaskIds.get(0) == requestSubTaskId) {
+                    LOG.info(
+                            "Received schema change event request {} from 
table {} from subTask {}. It is on the first of the pending list, handling 
this.",
+                            request.getSchemaChangeEvent(),
+                            request.getTableId().toString(),
+                            requestSubTaskId);
+                    pendingSubTaskIds.remove(0);
+                } else {
+                    LOG.info(
+                            "Received schema change event request {} from 
table {} from subTask {}. It is not the first of the pending list ({}).",
+                            request.getSchemaChangeEvent(),
+                            request.getTableId().toString(),
+                            requestSubTaskId,
+                            pendingSubTaskIds);
+                    if (!pendingSubTaskIds.contains(requestSubTaskId)) {
+                        pendingSubTaskIds.add(requestSubTaskId);
+                    }
+                    return 
CompletableFuture.completedFuture(wrap(SchemaChangeResponse.busy()));
+                }
+
+                SchemaChangeEvent event = request.getSchemaChangeEvent();
+
+                // If this schema change event has been requested by another 
subTask, ignore it.
+                if (schemaManager.isOriginalSchemaChangeEventRedundant(event)) 
{
+                    LOG.info("Event {} has been addressed before, ignoring 
it.", event);
+                    clearCurrentSchemaChangeRequest();
+                    LOG.info(
+                            "SchemaChangeStatus switched from 
WAITING_FOR_FLUSH to IDLE for request {} due to duplicated request.",
+                            request);
+                    return CompletableFuture.completedFuture(
+                            wrap(SchemaChangeResponse.duplicate()));
+                }
+                schemaManager.applyOriginalSchemaChange(event);
+                List<SchemaChangeEvent> derivedSchemaChangeEvents =
+                        
calculateDerivedSchemaChangeEvents(request.getSchemaChangeEvent());
+
+                // If this schema change event is filtered out by LENIENT mode 
or merging table
+                // route strategies, ignore it.
+                if (derivedSchemaChangeEvents.isEmpty()) {
+                    LOG.info("Event {} is omitted from sending to downstream, 
ignoring it.", event);
+                    clearCurrentSchemaChangeRequest();
+                    LOG.info(
+                            "SchemaChangeStatus switched from 
WAITING_FOR_FLUSH to IDLE for request {} due to ignored request.",
+                            request);
+                    return 
CompletableFuture.completedFuture(wrap(SchemaChangeResponse.ignored()));
+                }
+
                 LOG.info(
-                        "SchemaChangeStatus switched from WAITING_FOR_FLUSH to 
IDLE for request {} due to duplicated request.",
-                        request);
-                return 
CompletableFuture.completedFuture(wrap(SchemaChangeResponse.duplicate()));
-            }
-            schemaManager.applyOriginalSchemaChange(event);
-            List<SchemaChangeEvent> derivedSchemaChangeEvents =
-                    
calculateDerivedSchemaChangeEvents(request.getSchemaChangeEvent());
-
-            // If this schema change event is filtered out by LENIENT mode or 
merging table route
-            // strategies, ignore it.
-            if (derivedSchemaChangeEvents.isEmpty()) {
-                LOG.info("Event {} is omitted from sending to downstream, 
ignoring it.", event);
-                clearCurrentSchemaChangeRequest();
-                Preconditions.checkState(
-                        schemaChangeStatus.compareAndSet(
-                                RequestStatus.WAITING_FOR_FLUSH, 
RequestStatus.IDLE),
-                        "Illegal schemaChangeStatus state: should still in 
WAITING_FOR_FLUSH state if event was ignored, not "
-                                + schemaChangeStatus.get());
+                        "SchemaChangeStatus switched from IDLE to 
WAITING_FOR_FLUSH, other requests will be blocked.");
+                // This request has been accepted.
+                schemaChangeStatus = RequestStatus.WAITING_FOR_FLUSH;
+
+                // Backfill pre-schema info for sink applying
+                derivedSchemaChangeEvents.forEach(
+                        e -> {
+                            if (e instanceof SchemaChangeEventWithPreSchema) {
+                                SchemaChangeEventWithPreSchema pe =
+                                        (SchemaChangeEventWithPreSchema) e;
+                                if (!pe.hasPreSchema()) {
+                                    schemaManager
+                                            
.getLatestEvolvedSchema(pe.tableId())
+                                            .ifPresent(pe::fillPreSchema);
+                                }
+                            }
+                        });
+                currentDerivedSchemaChangeEvents = new 
ArrayList<>(derivedSchemaChangeEvents);
+                return CompletableFuture.completedFuture(
+                        
wrap(SchemaChangeResponse.accepted(derivedSchemaChangeEvents)));
+            } else {
                 LOG.info(
-                        "SchemaChangeStatus switched from WAITING_FOR_FLUSH to 
IDLE for request {} due to ignored request.",
-                        request);
-                return 
CompletableFuture.completedFuture(wrap(SchemaChangeResponse.ignored()));
+                        "Schema Registry is busy processing a schema change 
request, could not handle request {} for now. Added {} to pending list ({}).",
+                        request,
+                        requestSubTaskId,
+                        pendingSubTaskIds);
+                if (!pendingSubTaskIds.contains(requestSubTaskId)) {
+                    pendingSubTaskIds.add(requestSubTaskId);
+                }
+                return 
CompletableFuture.completedFuture(wrap(SchemaChangeResponse.busy()));
             }
-
-            // Backfill pre-schema info for sink applying
-            derivedSchemaChangeEvents.forEach(
-                    e -> {
-                        if (e instanceof SchemaChangeEventWithPreSchema) {
-                            SchemaChangeEventWithPreSchema pe = 
(SchemaChangeEventWithPreSchema) e;
-                            if (!pe.hasPreSchema()) {
-                                schemaManager
-                                        .getLatestEvolvedSchema(pe.tableId())
-                                        .ifPresent(pe::fillPreSchema);
-                            }
-                        }
-                    });
-            currentDerivedSchemaChangeEvents = new 
ArrayList<>(derivedSchemaChangeEvents);
-            return CompletableFuture.completedFuture(
-                    
wrap(SchemaChangeResponse.accepted(derivedSchemaChangeEvents)));
-        } else {
-            LOG.info(
-                    "Schema Registry is busy processing a schema change 
request, could not handle request {} for now.",
-                    request);
-            return 
CompletableFuture.completedFuture(wrap(SchemaChangeResponse.busy()));
         }
     }
 
@@ -227,9 +265,10 @@ public class SchemaRegistryRequestHandler implements 
Closeable {
             }
         }
         Preconditions.checkState(
-                schemaChangeStatus.compareAndSet(RequestStatus.APPLYING, 
RequestStatus.FINISHED),
+                schemaChangeStatus == RequestStatus.APPLYING,
                 "Illegal schemaChangeStatus state: should be APPLYING before 
applySchemaChange finishes, not "
-                        + schemaChangeStatus.get());
+                        + schemaChangeStatus);
+        schemaChangeStatus = RequestStatus.FINISHED;
         LOG.info(
                 "SchemaChangeStatus switched from APPLYING to FINISHED for 
request {}.",
                 currentDerivedSchemaChangeEvents);
@@ -262,10 +301,11 @@ public class SchemaRegistryRequestHandler implements 
Closeable {
         }
         if (flushedSinkWriters.equals(activeSinkWriters)) {
             Preconditions.checkState(
-                    schemaChangeStatus.compareAndSet(
-                            RequestStatus.WAITING_FOR_FLUSH, 
RequestStatus.APPLYING),
+                    schemaChangeStatus == RequestStatus.WAITING_FOR_FLUSH,
                     "Illegal schemaChangeStatus state: should be 
WAITING_FOR_FLUSH before collecting enough FlushEvents, not "
                             + schemaChangeStatus);
+
+            schemaChangeStatus = RequestStatus.APPLYING;
             LOG.info(
                     "All sink subtask have flushed for table {}. Start to 
apply schema change.",
                     tableId.toString());
@@ -276,9 +316,10 @@ public class SchemaRegistryRequestHandler implements 
Closeable {
 
     public CompletableFuture<CoordinationResponse> getSchemaChangeResult() {
         Preconditions.checkState(
-                !schemaChangeStatus.get().equals(RequestStatus.IDLE),
+                schemaChangeStatus != RequestStatus.IDLE,
                 "Illegal schemaChangeStatus: should not be IDLE before getting 
schema change request results.");
-        if (schemaChangeStatus.compareAndSet(RequestStatus.FINISHED, 
RequestStatus.IDLE)) {
+        if (schemaChangeStatus == RequestStatus.FINISHED) {
+            schemaChangeStatus = RequestStatus.IDLE;
             LOG.info(
                     "SchemaChangeStatus switched from FINISHED to IDLE for 
request {}",
                     currentDerivedSchemaChangeEvents);
diff --git 
a/flink-cdc-runtime/src/main/java/org/apache/flink/cdc/runtime/operators/schema/event/SchemaChangeRequest.java
 
b/flink-cdc-runtime/src/main/java/org/apache/flink/cdc/runtime/operators/schema/event/SchemaChangeRequest.java
index fda6b02f1..fbc5e9c40 100644
--- 
a/flink-cdc-runtime/src/main/java/org/apache/flink/cdc/runtime/operators/schema/event/SchemaChangeRequest.java
+++ 
b/flink-cdc-runtime/src/main/java/org/apache/flink/cdc/runtime/operators/schema/event/SchemaChangeRequest.java
@@ -36,10 +36,14 @@ public class SchemaChangeRequest implements 
CoordinationRequest {
     private final TableId tableId;
     /** The schema changes. */
     private final SchemaChangeEvent schemaChangeEvent;
+    /** The ID of subTask that initiated the request. */
+    private final int subTaskId;
 
-    public SchemaChangeRequest(TableId tableId, SchemaChangeEvent 
schemaChangeEvent) {
+    public SchemaChangeRequest(
+            TableId tableId, SchemaChangeEvent schemaChangeEvent, int 
subTaskId) {
         this.tableId = tableId;
         this.schemaChangeEvent = schemaChangeEvent;
+        this.subTaskId = subTaskId;
     }
 
     public TableId getTableId() {
@@ -50,6 +54,10 @@ public class SchemaChangeRequest implements 
CoordinationRequest {
         return schemaChangeEvent;
     }
 
+    public int getSubTaskId() {
+        return subTaskId;
+    }
+
     @Override
     public boolean equals(Object o) {
         if (this == o) {
@@ -60,11 +68,12 @@ public class SchemaChangeRequest implements 
CoordinationRequest {
         }
         SchemaChangeRequest that = (SchemaChangeRequest) o;
         return Objects.equals(tableId, that.tableId)
-                && Objects.equals(schemaChangeEvent, that.schemaChangeEvent);
+                && Objects.equals(schemaChangeEvent, that.schemaChangeEvent)
+                && subTaskId == that.subTaskId;
     }
 
     @Override
     public int hashCode() {
-        return Objects.hash(tableId, schemaChangeEvent);
+        return Objects.hash(tableId, schemaChangeEvent, subTaskId);
     }
 }
diff --git 
a/flink-cdc-runtime/src/main/java/org/apache/flink/cdc/runtime/partitioning/PrePartitionOperator.java
 
b/flink-cdc-runtime/src/main/java/org/apache/flink/cdc/runtime/partitioning/PrePartitionOperator.java
index 1171ad07b..938e6950d 100644
--- 
a/flink-cdc-runtime/src/main/java/org/apache/flink/cdc/runtime/partitioning/PrePartitionOperator.java
+++ 
b/flink-cdc-runtime/src/main/java/org/apache/flink/cdc/runtime/partitioning/PrePartitionOperator.java
@@ -31,6 +31,7 @@ import 
org.apache.flink.cdc.runtime.operators.sink.SchemaEvolutionClient;
 import org.apache.flink.cdc.runtime.serializer.event.EventSerializer;
 import org.apache.flink.runtime.jobgraph.OperatorID;
 import org.apache.flink.runtime.jobgraph.tasks.TaskOperatorEventGateway;
+import org.apache.flink.runtime.state.StateSnapshotContext;
 import org.apache.flink.streaming.api.operators.AbstractStreamOperator;
 import org.apache.flink.streaming.api.operators.ChainingStrategy;
 import org.apache.flink.streaming.api.operators.OneInputStreamOperator;
@@ -147,4 +148,10 @@ public class PrePartitionOperator extends 
AbstractStreamOperator<PartitioningEve
                             }
                         });
     }
+
+    @Override
+    public void snapshotState(StateSnapshotContext context) throws Exception {
+        // Needless to do anything, since AbstractStreamOperator#snapshotState 
and #processElement
+        // is guaranteed not to be mixed together.
+    }
 }
diff --git 
a/flink-cdc-runtime/src/test/java/org/apache/flink/cdc/runtime/testutils/operators/EventOperatorTestHarness.java
 
b/flink-cdc-runtime/src/test/java/org/apache/flink/cdc/runtime/testutils/operators/EventOperatorTestHarness.java
index 60952b424..a469a3a91 100644
--- 
a/flink-cdc-runtime/src/test/java/org/apache/flink/cdc/runtime/testutils/operators/EventOperatorTestHarness.java
+++ 
b/flink-cdc-runtime/src/test/java/org/apache/flink/cdc/runtime/testutils/operators/EventOperatorTestHarness.java
@@ -164,7 +164,7 @@ public class EventOperatorTestHarness<OP extends 
AbstractStreamOperator<E>, E ex
 
     public void registerTableSchema(TableId tableId, Schema schema) {
         schemaRegistry.handleCoordinationRequest(
-                new SchemaChangeRequest(tableId, new CreateTableEvent(tableId, 
schema)));
+                new SchemaChangeRequest(tableId, new CreateTableEvent(tableId, 
schema), 0));
         schemaRegistry.handleApplyEvolvedSchemaChangeRequest(new 
CreateTableEvent(tableId, schema));
     }
 

Reply via email to