This is an automated email from the ASF dual-hosted git repository.

leonard pushed a commit to branch release-3.2
in repository https://gitbox.apache.org/repos/asf/flink-cdc.git

commit 9816848c4b46ed303423411c354b92fcdb10c2a5
Author: yuxiqian <[email protected]>
AuthorDate: Mon Aug 26 14:07:54 2024 +0800

    [hotfix][cdc-runtime] Keep upstream pending requests in order to avoid checkpoint hanging
---
 .../cdc/pipeline/tests/SchemaEvolveE2eITCase.java  |   1 +
 .../tests/SchemaEvolvingTransformE2eITCase.java    |   1 +
 .../runtime/operators/schema/SchemaOperator.java   |  10 +-
 .../coordinator/SchemaRegistryRequestHandler.java  | 148 +++++++++++++--------
 .../schema/event/SchemaChangeRequest.java          |  15 ++-
 .../runtime/partitioning/PrePartitionOperator.java |   7 +
 .../operators/EventOperatorTestHarness.java        |   2 +-
 7 files changed, 125 insertions(+), 59 deletions(-)

diff --git 
a/flink-cdc-e2e-tests/flink-cdc-pipeline-e2e-tests/src/test/java/org/apache/flink/cdc/pipeline/tests/SchemaEvolveE2eITCase.java
 
b/flink-cdc-e2e-tests/flink-cdc-pipeline-e2e-tests/src/test/java/org/apache/flink/cdc/pipeline/tests/SchemaEvolveE2eITCase.java
index 0a8d7c483..64e3e9c57 100644
--- 
a/flink-cdc-e2e-tests/flink-cdc-pipeline-e2e-tests/src/test/java/org/apache/flink/cdc/pipeline/tests/SchemaEvolveE2eITCase.java
+++ 
b/flink-cdc-e2e-tests/flink-cdc-pipeline-e2e-tests/src/test/java/org/apache/flink/cdc/pipeline/tests/SchemaEvolveE2eITCase.java
@@ -338,6 +338,7 @@ public class SchemaEvolveE2eITCase extends 
PipelineTestEnvironment {
             // triggers DropColumnEvent
             stmt.execute("ALTER TABLE members DROP COLUMN biological_sex");
             stmt.execute("INSERT INTO members VALUES (1013, 'Fiona', 16);");
+            stmt.execute("INSERT INTO members VALUES (1014, 'Gem', 17);");
         }
 
         List<String> expectedTmEvents =
diff --git 
a/flink-cdc-e2e-tests/flink-cdc-pipeline-e2e-tests/src/test/java/org/apache/flink/cdc/pipeline/tests/SchemaEvolvingTransformE2eITCase.java
 
b/flink-cdc-e2e-tests/flink-cdc-pipeline-e2e-tests/src/test/java/org/apache/flink/cdc/pipeline/tests/SchemaEvolvingTransformE2eITCase.java
index 8b306d1e7..edcac1bbe 100644
--- 
a/flink-cdc-e2e-tests/flink-cdc-pipeline-e2e-tests/src/test/java/org/apache/flink/cdc/pipeline/tests/SchemaEvolvingTransformE2eITCase.java
+++ 
b/flink-cdc-e2e-tests/flink-cdc-pipeline-e2e-tests/src/test/java/org/apache/flink/cdc/pipeline/tests/SchemaEvolvingTransformE2eITCase.java
@@ -349,6 +349,7 @@ public class SchemaEvolvingTransformE2eITCase extends 
PipelineTestEnvironment {
             // triggers DropColumnEvent
             stmt.execute("ALTER TABLE members DROP COLUMN biological_sex");
             stmt.execute("INSERT INTO members VALUES (1013, 'Fiona', 16);");
+            stmt.execute("INSERT INTO members VALUES (1014, 'Gem', 17);");
         }
 
         List<String> expectedTmEvents =
diff --git 
a/flink-cdc-runtime/src/main/java/org/apache/flink/cdc/runtime/operators/schema/SchemaOperator.java
 
b/flink-cdc-runtime/src/main/java/org/apache/flink/cdc/runtime/operators/schema/SchemaOperator.java
index 1b4f50e89..a1bdd7885 100644
--- 
a/flink-cdc-runtime/src/main/java/org/apache/flink/cdc/runtime/operators/schema/SchemaOperator.java
+++ 
b/flink-cdc-runtime/src/main/java/org/apache/flink/cdc/runtime/operators/schema/SchemaOperator.java
@@ -50,6 +50,7 @@ import 
org.apache.flink.cdc.runtime.typeutils.BinaryRecordDataGenerator;
 import org.apache.flink.runtime.jobgraph.tasks.TaskOperatorEventGateway;
 import org.apache.flink.runtime.operators.coordination.CoordinationRequest;
 import org.apache.flink.runtime.operators.coordination.CoordinationResponse;
+import org.apache.flink.runtime.state.StateSnapshotContext;
 import org.apache.flink.streaming.api.graph.StreamConfig;
 import org.apache.flink.streaming.api.operators.AbstractStreamOperator;
 import org.apache.flink.streaming.api.operators.ChainingStrategy;
@@ -440,7 +441,8 @@ public class SchemaOperator extends 
AbstractStreamOperator<Event>
         long schemaEvolveTimeOutMillis = System.currentTimeMillis() + 
rpcTimeOutInMillis;
         while (true) {
             SchemaChangeResponse response =
-                    sendRequestToCoordinator(new SchemaChangeRequest(tableId, 
schemaChangeEvent));
+                    sendRequestToCoordinator(
+                            new SchemaChangeRequest(tableId, 
schemaChangeEvent, subTaskId));
             if (response.isRegistryBusy()) {
                 if (System.currentTimeMillis() < schemaEvolveTimeOutMillis) {
                     LOG.info(
@@ -609,4 +611,10 @@ public class SchemaOperator extends 
AbstractStreamOperator<Event>
             }
         }
     }
+
+    @Override
+    public void snapshotState(StateSnapshotContext context) throws Exception {
+        // Needless to do anything, since AbstractStreamOperator#snapshotState 
and #processElement
+        // is guaranteed not to be mixed together.
+    }
 }
diff --git 
a/flink-cdc-runtime/src/main/java/org/apache/flink/cdc/runtime/operators/schema/coordinator/SchemaRegistryRequestHandler.java
 
b/flink-cdc-runtime/src/main/java/org/apache/flink/cdc/runtime/operators/schema/coordinator/SchemaRegistryRequestHandler.java
index 9262a6a90..de5bde4e6 100644
--- 
a/flink-cdc-runtime/src/main/java/org/apache/flink/cdc/runtime/operators/schema/coordinator/SchemaRegistryRequestHandler.java
+++ 
b/flink-cdc-runtime/src/main/java/org/apache/flink/cdc/runtime/operators/schema/coordinator/SchemaRegistryRequestHandler.java
@@ -54,7 +54,6 @@ import java.util.concurrent.CompletableFuture;
 import java.util.concurrent.ConcurrentHashMap;
 import java.util.concurrent.ExecutorService;
 import java.util.concurrent.Executors;
-import java.util.concurrent.atomic.AtomicReference;
 import java.util.stream.Collectors;
 import java.util.stream.Stream;
 
@@ -77,7 +76,10 @@ public class SchemaRegistryRequestHandler implements 
Closeable {
     /**
      * Atomic flag indicating if current RequestHandler could accept more 
schema changes for now.
      */
-    private final AtomicReference<RequestStatus> schemaChangeStatus;
+    private volatile RequestStatus schemaChangeStatus;
+
+    private final List<Integer> pendingSubTaskIds;
+    private final Object schemaChangeRequestLock;
 
     private volatile Throwable currentChangeException;
     private volatile List<SchemaChangeEvent> currentDerivedSchemaChangeEvents;
@@ -109,7 +111,10 @@ public class SchemaRegistryRequestHandler implements 
Closeable {
         this.currentDerivedSchemaChangeEvents = new ArrayList<>();
         this.currentFinishedSchemaChanges = new ArrayList<>();
         this.currentIgnoredSchemaChanges = new ArrayList<>();
-        this.schemaChangeStatus = new AtomicReference<>(RequestStatus.IDLE);
+
+        this.schemaChangeStatus = RequestStatus.IDLE;
+        this.pendingSubTaskIds = new ArrayList<>();
+        this.schemaChangeRequestLock = new Object();
     }
 
     /**
@@ -119,54 +124,86 @@ public class SchemaRegistryRequestHandler implements 
Closeable {
      */
     public CompletableFuture<CoordinationResponse> handleSchemaChangeRequest(
             SchemaChangeRequest request) {
-        if (schemaChangeStatus.compareAndSet(RequestStatus.IDLE, 
RequestStatus.WAITING_FOR_FLUSH)) {
-            LOG.info(
-                    "Received schema change event request {} from table {}. 
SchemaChangeStatus switched from IDLE to WAITING_FOR_FLUSH, other requests will 
be blocked.",
-                    request.getSchemaChangeEvent(),
-                    request.getTableId().toString());
-            SchemaChangeEvent event = request.getSchemaChangeEvent();
-
-            // If this schema change event has been requested by another 
subTask, ignore it.
-            if (schemaManager.isOriginalSchemaChangeEventRedundant(event)) {
-                LOG.info("Event {} has been addressed before, ignoring it.", 
event);
-                clearCurrentSchemaChangeRequest();
-                Preconditions.checkState(
-                        schemaChangeStatus.compareAndSet(
-                                RequestStatus.WAITING_FOR_FLUSH, 
RequestStatus.IDLE),
-                        "Illegal schemaChangeStatus state: should still in 
WAITING_FOR_FLUSH state if event was duplicated, not "
-                                + schemaChangeStatus.get());
+
+        // We use requester subTask ID as the pending ticket, because there 
will be at most 1 schema
+        // change requests simultaneously from each subTask
+        int requestSubTaskId = request.getSubTaskId();
+
+        synchronized (schemaChangeRequestLock) {
+            // Make sure we handle the first request in the pending list to 
avoid out-of-order
+            // waiting and blocks checkpointing mechanism.
+            if (schemaChangeStatus == RequestStatus.IDLE) {
+                if (pendingSubTaskIds.isEmpty()) {
+                    LOG.info(
+                            "Received schema change event request {} from 
table {} from subTask {}. Pending list is empty, handling this.",
+                            request.getSchemaChangeEvent(),
+                            request.getTableId().toString(),
+                            requestSubTaskId);
+                } else if (pendingSubTaskIds.get(0) == requestSubTaskId) {
+                    LOG.info(
+                            "Received schema change event request {} from 
table {} from subTask {}. It is on the first of the pending list, handling 
this.",
+                            request.getSchemaChangeEvent(),
+                            request.getTableId().toString(),
+                            requestSubTaskId);
+                    pendingSubTaskIds.remove(0);
+                } else {
+                    LOG.info(
+                            "Received schema change event request {} from 
table {} from subTask {}. It is not the first of the pending list ({}).",
+                            request.getSchemaChangeEvent(),
+                            request.getTableId().toString(),
+                            requestSubTaskId,
+                            pendingSubTaskIds);
+                    if (!pendingSubTaskIds.contains(requestSubTaskId)) {
+                        pendingSubTaskIds.add(requestSubTaskId);
+                    }
+                    return 
CompletableFuture.completedFuture(wrap(SchemaChangeResponse.busy()));
+                }
+
+                SchemaChangeEvent event = request.getSchemaChangeEvent();
+
+                // If this schema change event has been requested by another 
subTask, ignore it.
+                if (schemaManager.isOriginalSchemaChangeEventRedundant(event)) 
{
+                    LOG.info("Event {} has been addressed before, ignoring 
it.", event);
+                    clearCurrentSchemaChangeRequest();
+                    LOG.info(
+                            "SchemaChangeStatus switched from 
WAITING_FOR_FLUSH to IDLE for request {} due to duplicated request.",
+                            request);
+                    return CompletableFuture.completedFuture(
+                            wrap(SchemaChangeResponse.duplicate()));
+                }
+                schemaManager.applyOriginalSchemaChange(event);
+                List<SchemaChangeEvent> derivedSchemaChangeEvents =
+                        
calculateDerivedSchemaChangeEvents(request.getSchemaChangeEvent());
+
+                // If this schema change event is filtered out by LENIENT mode 
or merging table
+                // route strategies, ignore it.
+                if (derivedSchemaChangeEvents.isEmpty()) {
+                    LOG.info("Event {} is omitted from sending to downstream, 
ignoring it.", event);
+                    clearCurrentSchemaChangeRequest();
+                    LOG.info(
+                            "SchemaChangeStatus switched from 
WAITING_FOR_FLUSH to IDLE for request {} due to ignored request.",
+                            request);
+                    return 
CompletableFuture.completedFuture(wrap(SchemaChangeResponse.ignored()));
+                }
+
                 LOG.info(
-                        "SchemaChangeStatus switched from WAITING_FOR_FLUSH to 
IDLE for request {} due to duplicated request.",
-                        request);
-                return 
CompletableFuture.completedFuture(wrap(SchemaChangeResponse.duplicate()));
-            }
-            schemaManager.applyOriginalSchemaChange(event);
-            List<SchemaChangeEvent> derivedSchemaChangeEvents =
-                    
calculateDerivedSchemaChangeEvents(request.getSchemaChangeEvent());
-
-            // If this schema change event is filtered out by LENIENT mode or 
merging table route
-            // strategies, ignore it.
-            if (derivedSchemaChangeEvents.isEmpty()) {
-                LOG.info("Event {} is omitted from sending to downstream, 
ignoring it.", event);
-                clearCurrentSchemaChangeRequest();
-                Preconditions.checkState(
-                        schemaChangeStatus.compareAndSet(
-                                RequestStatus.WAITING_FOR_FLUSH, 
RequestStatus.IDLE),
-                        "Illegal schemaChangeStatus state: should still in 
WAITING_FOR_FLUSH state if event was ignored, not "
-                                + schemaChangeStatus.get());
+                        "SchemaChangeStatus switched from IDLE to 
WAITING_FOR_FLUSH, other requests will be blocked.");
+                // This request has been accepted.
+                schemaChangeStatus = RequestStatus.WAITING_FOR_FLUSH;
+                currentDerivedSchemaChangeEvents = new 
ArrayList<>(derivedSchemaChangeEvents);
+                return CompletableFuture.completedFuture(
+                        
wrap(SchemaChangeResponse.accepted(derivedSchemaChangeEvents)));
+            } else {
                 LOG.info(
-                        "SchemaChangeStatus switched from WAITING_FOR_FLUSH to 
IDLE for request {} due to ignored request.",
-                        request);
-                return 
CompletableFuture.completedFuture(wrap(SchemaChangeResponse.ignored()));
+                        "Schema Registry is busy processing a schema change 
request, could not handle request {} for now. Added {} to pending list ({}).",
+                        request,
+                        requestSubTaskId,
+                        pendingSubTaskIds);
+                if (!pendingSubTaskIds.contains(requestSubTaskId)) {
+                    pendingSubTaskIds.add(requestSubTaskId);
+                }
+                return 
CompletableFuture.completedFuture(wrap(SchemaChangeResponse.busy()));
             }
-            currentDerivedSchemaChangeEvents = new 
ArrayList<>(derivedSchemaChangeEvents);
-            return CompletableFuture.completedFuture(
-                    
wrap(SchemaChangeResponse.accepted(derivedSchemaChangeEvents)));
-        } else {
-            LOG.info(
-                    "Schema Registry is busy processing a schema change 
request, could not handle request {} for now.",
-                    request);
-            return 
CompletableFuture.completedFuture(wrap(SchemaChangeResponse.busy()));
         }
     }
 
@@ -213,9 +250,10 @@ public class SchemaRegistryRequestHandler implements 
Closeable {
             }
         }
         Preconditions.checkState(
-                schemaChangeStatus.compareAndSet(RequestStatus.APPLYING, 
RequestStatus.FINISHED),
+                schemaChangeStatus == RequestStatus.APPLYING,
                 "Illegal schemaChangeStatus state: should be APPLYING before 
applySchemaChange finishes, not "
-                        + schemaChangeStatus.get());
+                        + schemaChangeStatus);
+        schemaChangeStatus = RequestStatus.FINISHED;
         LOG.info(
                 "SchemaChangeStatus switched from APPLYING to FINISHED for 
request {}.",
                 currentDerivedSchemaChangeEvents);
@@ -248,10 +286,11 @@ public class SchemaRegistryRequestHandler implements 
Closeable {
         }
         if (flushedSinkWriters.equals(activeSinkWriters)) {
             Preconditions.checkState(
-                    schemaChangeStatus.compareAndSet(
-                            RequestStatus.WAITING_FOR_FLUSH, 
RequestStatus.APPLYING),
+                    schemaChangeStatus == RequestStatus.WAITING_FOR_FLUSH,
                     "Illegal schemaChangeStatus state: should be 
WAITING_FOR_FLUSH before collecting enough FlushEvents, not "
                             + schemaChangeStatus);
+
+            schemaChangeStatus = RequestStatus.APPLYING;
             LOG.info(
                     "All sink subtask have flushed for table {}. Start to 
apply schema change.",
                     tableId.toString());
@@ -262,9 +301,10 @@ public class SchemaRegistryRequestHandler implements 
Closeable {
 
     public CompletableFuture<CoordinationResponse> getSchemaChangeResult() {
         Preconditions.checkState(
-                !schemaChangeStatus.get().equals(RequestStatus.IDLE),
+                schemaChangeStatus != RequestStatus.IDLE,
                 "Illegal schemaChangeStatus: should not be IDLE before getting 
schema change request results.");
-        if (schemaChangeStatus.compareAndSet(RequestStatus.FINISHED, 
RequestStatus.IDLE)) {
+        if (schemaChangeStatus == RequestStatus.FINISHED) {
+            schemaChangeStatus = RequestStatus.IDLE;
             LOG.info(
                     "SchemaChangeStatus switched from FINISHED to IDLE for 
request {}",
                     currentDerivedSchemaChangeEvents);
diff --git 
a/flink-cdc-runtime/src/main/java/org/apache/flink/cdc/runtime/operators/schema/event/SchemaChangeRequest.java
 
b/flink-cdc-runtime/src/main/java/org/apache/flink/cdc/runtime/operators/schema/event/SchemaChangeRequest.java
index fda6b02f1..fbc5e9c40 100644
--- 
a/flink-cdc-runtime/src/main/java/org/apache/flink/cdc/runtime/operators/schema/event/SchemaChangeRequest.java
+++ 
b/flink-cdc-runtime/src/main/java/org/apache/flink/cdc/runtime/operators/schema/event/SchemaChangeRequest.java
@@ -36,10 +36,14 @@ public class SchemaChangeRequest implements 
CoordinationRequest {
     private final TableId tableId;
     /** The schema changes. */
     private final SchemaChangeEvent schemaChangeEvent;
+    /** The ID of subTask that initiated the request. */
+    private final int subTaskId;
 
-    public SchemaChangeRequest(TableId tableId, SchemaChangeEvent 
schemaChangeEvent) {
+    public SchemaChangeRequest(
+            TableId tableId, SchemaChangeEvent schemaChangeEvent, int 
subTaskId) {
         this.tableId = tableId;
         this.schemaChangeEvent = schemaChangeEvent;
+        this.subTaskId = subTaskId;
     }
 
     public TableId getTableId() {
@@ -50,6 +54,10 @@ public class SchemaChangeRequest implements 
CoordinationRequest {
         return schemaChangeEvent;
     }
 
+    public int getSubTaskId() {
+        return subTaskId;
+    }
+
     @Override
     public boolean equals(Object o) {
         if (this == o) {
@@ -60,11 +68,12 @@ public class SchemaChangeRequest implements 
CoordinationRequest {
         }
         SchemaChangeRequest that = (SchemaChangeRequest) o;
         return Objects.equals(tableId, that.tableId)
-                && Objects.equals(schemaChangeEvent, that.schemaChangeEvent);
+                && Objects.equals(schemaChangeEvent, that.schemaChangeEvent)
+                && subTaskId == that.subTaskId;
     }
 
     @Override
     public int hashCode() {
-        return Objects.hash(tableId, schemaChangeEvent);
+        return Objects.hash(tableId, schemaChangeEvent, subTaskId);
     }
 }
diff --git 
a/flink-cdc-runtime/src/main/java/org/apache/flink/cdc/runtime/partitioning/PrePartitionOperator.java
 
b/flink-cdc-runtime/src/main/java/org/apache/flink/cdc/runtime/partitioning/PrePartitionOperator.java
index 1171ad07b..938e6950d 100644
--- 
a/flink-cdc-runtime/src/main/java/org/apache/flink/cdc/runtime/partitioning/PrePartitionOperator.java
+++ 
b/flink-cdc-runtime/src/main/java/org/apache/flink/cdc/runtime/partitioning/PrePartitionOperator.java
@@ -31,6 +31,7 @@ import 
org.apache.flink.cdc.runtime.operators.sink.SchemaEvolutionClient;
 import org.apache.flink.cdc.runtime.serializer.event.EventSerializer;
 import org.apache.flink.runtime.jobgraph.OperatorID;
 import org.apache.flink.runtime.jobgraph.tasks.TaskOperatorEventGateway;
+import org.apache.flink.runtime.state.StateSnapshotContext;
 import org.apache.flink.streaming.api.operators.AbstractStreamOperator;
 import org.apache.flink.streaming.api.operators.ChainingStrategy;
 import org.apache.flink.streaming.api.operators.OneInputStreamOperator;
@@ -147,4 +148,10 @@ public class PrePartitionOperator extends 
AbstractStreamOperator<PartitioningEve
                             }
                         });
     }
+
+    @Override
+    public void snapshotState(StateSnapshotContext context) throws Exception {
+        // Needless to do anything, since AbstractStreamOperator#snapshotState 
and #processElement
+        // is guaranteed not to be mixed together.
+    }
 }
diff --git 
a/flink-cdc-runtime/src/test/java/org/apache/flink/cdc/runtime/testutils/operators/EventOperatorTestHarness.java
 
b/flink-cdc-runtime/src/test/java/org/apache/flink/cdc/runtime/testutils/operators/EventOperatorTestHarness.java
index 60952b424..a469a3a91 100644
--- 
a/flink-cdc-runtime/src/test/java/org/apache/flink/cdc/runtime/testutils/operators/EventOperatorTestHarness.java
+++ 
b/flink-cdc-runtime/src/test/java/org/apache/flink/cdc/runtime/testutils/operators/EventOperatorTestHarness.java
@@ -164,7 +164,7 @@ public class EventOperatorTestHarness<OP extends 
AbstractStreamOperator<E>, E ex
 
     public void registerTableSchema(TableId tableId, Schema schema) {
         schemaRegistry.handleCoordinationRequest(
-                new SchemaChangeRequest(tableId, new CreateTableEvent(tableId, 
schema)));
+                new SchemaChangeRequest(tableId, new CreateTableEvent(tableId, 
schema), 0));
         schemaRegistry.handleApplyEvolvedSchemaChangeRequest(new 
CreateTableEvent(tableId, schema));
     }
 

Reply via email to