This is an automated email from the ASF dual-hosted git repository.

leonard pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/flink-cdc.git


The following commit(s) were added to refs/heads/master by this push:
     new 060d2032e [hotfix][cdc-runtime] Fix schema registry hanging in multiple parallelism
060d2032e is described below

commit 060d2032e839c817f9b139e7699685290a68e2dc
Author: yuxiqian <[email protected]>
AuthorDate: Fri Aug 23 13:47:08 2024 +0800

    [hotfix][cdc-runtime] Fix schema registry hanging in multiple parallelism
    
    This closes #3567.
---
 .../schema/coordinator/SchemaRegistry.java         | 14 ++++++--
 .../coordinator/SchemaRegistryRequestHandler.java  | 42 +++++++++++++++++-----
 2 files changed, 45 insertions(+), 11 deletions(-)

diff --git 
a/flink-cdc-runtime/src/main/java/org/apache/flink/cdc/runtime/operators/schema/coordinator/SchemaRegistry.java
 
b/flink-cdc-runtime/src/main/java/org/apache/flink/cdc/runtime/operators/schema/coordinator/SchemaRegistry.java
index 8ea3a1f93..617daed92 100644
--- 
a/flink-cdc-runtime/src/main/java/org/apache/flink/cdc/runtime/operators/schema/coordinator/SchemaRegistry.java
+++ 
b/flink-cdc-runtime/src/main/java/org/apache/flink/cdc/runtime/operators/schema/coordinator/SchemaRegistry.java
@@ -104,6 +104,12 @@ public class SchemaRegistry implements 
OperatorCoordinator, CoordinationRequestH
 
     private SchemaChangeBehavior schemaChangeBehavior;
 
+    /**
+     * Current parallelism. Use this to verify if Schema Registry has 
collected enough flush success
+     * events from sink operators.
+     */
+    private int currentParallelism;
+
     public SchemaRegistry(
             String operatorName,
             OperatorCoordinator.Context context,
@@ -135,7 +141,9 @@ public class SchemaRegistry implements OperatorCoordinator, 
CoordinationRequestH
     public void start() throws Exception {
         LOG.info("Starting SchemaRegistry for {}.", operatorName);
         this.failedReasons.clear();
-        LOG.info("Started SchemaRegistry for {}.", operatorName);
+        this.currentParallelism = context.currentParallelism();
+        LOG.info(
+                "Started SchemaRegistry for {}. Parallelism: {}", 
operatorName, currentParallelism);
     }
 
     @Override
@@ -155,7 +163,9 @@ public class SchemaRegistry implements OperatorCoordinator, 
CoordinationRequestH
                         flushSuccessEvent.getSubtask(),
                         flushSuccessEvent.getTableId().toString());
                 requestHandler.flushSuccess(
-                        flushSuccessEvent.getTableId(), 
flushSuccessEvent.getSubtask());
+                        flushSuccessEvent.getTableId(),
+                        flushSuccessEvent.getSubtask(),
+                        currentParallelism);
             } else if (event instanceof SinkWriterRegisterEvent) {
                 requestHandler.registerSinkWriter(((SinkWriterRegisterEvent) 
event).getSubtask());
             } else {
diff --git 
a/flink-cdc-runtime/src/main/java/org/apache/flink/cdc/runtime/operators/schema/coordinator/SchemaRegistryRequestHandler.java
 
b/flink-cdc-runtime/src/main/java/org/apache/flink/cdc/runtime/operators/schema/coordinator/SchemaRegistryRequestHandler.java
index 99019a6b4..444fb41d2 100644
--- 
a/flink-cdc-runtime/src/main/java/org/apache/flink/cdc/runtime/operators/schema/coordinator/SchemaRegistryRequestHandler.java
+++ 
b/flink-cdc-runtime/src/main/java/org/apache/flink/cdc/runtime/operators/schema/coordinator/SchemaRegistryRequestHandler.java
@@ -48,11 +48,11 @@ import java.io.IOException;
 import java.util.ArrayList;
 import java.util.Collections;
 import java.util.HashMap;
-import java.util.HashSet;
 import java.util.List;
 import java.util.Map;
 import java.util.Set;
 import java.util.concurrent.CompletableFuture;
+import java.util.concurrent.ConcurrentHashMap;
 import java.util.concurrent.ExecutorService;
 import java.util.concurrent.Executors;
 import java.util.concurrent.atomic.AtomicReference;
@@ -103,8 +103,8 @@ public class SchemaRegistryRequestHandler implements 
Closeable {
         this.schemaDerivation = schemaDerivation;
         this.schemaChangeBehavior = schemaChangeBehavior;
 
-        this.activeSinkWriters = new HashSet<>();
-        this.flushedSinkWriters = new HashSet<>();
+        this.activeSinkWriters = ConcurrentHashMap.newKeySet();
+        this.flushedSinkWriters = ConcurrentHashMap.newKeySet();
         this.schemaChangeThreadPool = Executors.newSingleThreadExecutor();
 
         this.currentDerivedSchemaChangeEvents = new ArrayList<>();
@@ -122,7 +122,7 @@ public class SchemaRegistryRequestHandler implements 
Closeable {
             SchemaChangeRequest request) {
         if (schemaChangeStatus.compareAndSet(RequestStatus.IDLE, 
RequestStatus.WAITING_FOR_FLUSH)) {
             LOG.info(
-                    "Received schema change event request {} from table {}. 
Start to buffer requests for others.",
+                    "Received schema change event request {} from table {}. 
SchemaChangeStatus switched from IDLE to WAITING_FOR_FLUSH, other requests will 
be blocked.",
                     request.getSchemaChangeEvent(),
                     request.getTableId().toString());
             SchemaChangeEvent event = request.getSchemaChangeEvent();
@@ -134,7 +134,11 @@ public class SchemaRegistryRequestHandler implements 
Closeable {
                 Preconditions.checkState(
                         schemaChangeStatus.compareAndSet(
                                 RequestStatus.WAITING_FOR_FLUSH, 
RequestStatus.IDLE),
-                        "Illegal schemaChangeStatus state: should still in 
WAITING_FOR_FLUSH state if event was duplicated.");
+                        "Illegal schemaChangeStatus state: should still in 
WAITING_FOR_FLUSH state if event was duplicated, not "
+                                + schemaChangeStatus.get());
+                LOG.info(
+                        "SchemaChangeStatus switched from WAITING_FOR_FLUSH to 
IDLE for request {} due to duplicated request.",
+                        request);
                 return 
CompletableFuture.completedFuture(wrap(SchemaChangeResponse.duplicate()));
             }
             schemaManager.applyOriginalSchemaChange(event);
@@ -149,7 +153,11 @@ public class SchemaRegistryRequestHandler implements 
Closeable {
                 Preconditions.checkState(
                         schemaChangeStatus.compareAndSet(
                                 RequestStatus.WAITING_FOR_FLUSH, 
RequestStatus.IDLE),
-                        "Illegal schemaChangeStatus state: should still in 
WAITING_FOR_FLUSH state if event was ignored.");
+                        "Illegal schemaChangeStatus state: should still in 
WAITING_FOR_FLUSH state if event was ignored, not "
+                                + schemaChangeStatus.get());
+                LOG.info(
+                        "SchemaChangeStatus switched from WAITING_FOR_FLUSH to 
IDLE for request {} due to ignored request.",
+                        request);
                 return 
CompletableFuture.completedFuture(wrap(SchemaChangeResponse.ignored()));
             }
 
@@ -220,7 +228,11 @@ public class SchemaRegistryRequestHandler implements 
Closeable {
         }
         Preconditions.checkState(
                 schemaChangeStatus.compareAndSet(RequestStatus.APPLYING, 
RequestStatus.FINISHED),
-                "Illegal schemaChangeStatus state: should be APPLYING before 
applySchemaChange finishes");
+                "Illegal schemaChangeStatus state: should be APPLYING before 
applySchemaChange finishes, not "
+                        + schemaChangeStatus.get());
+        LOG.info(
+                "SchemaChangeStatus switched from APPLYING to FINISHED for 
request {}.",
+                currentDerivedSchemaChangeEvents);
     }
 
     /**
@@ -239,13 +251,21 @@ public class SchemaRegistryRequestHandler implements 
Closeable {
      * @param tableId the subtask in SchemaOperator and table that the 
FlushEvent is about
      * @param sinkSubtask the sink subtask succeed flushing
      */
-    public void flushSuccess(TableId tableId, int sinkSubtask) {
+    public void flushSuccess(TableId tableId, int sinkSubtask, int 
parallelism) {
         flushedSinkWriters.add(sinkSubtask);
+        if (activeSinkWriters.size() < parallelism) {
+            LOG.info(
+                    "Not all active sink writers have been registered. Current 
{}, expected {}.",
+                    activeSinkWriters.size(),
+                    parallelism);
+            return;
+        }
         if (flushedSinkWriters.equals(activeSinkWriters)) {
             Preconditions.checkState(
                     schemaChangeStatus.compareAndSet(
                             RequestStatus.WAITING_FOR_FLUSH, 
RequestStatus.APPLYING),
-                    "Illegal schemaChangeStatus state: should be 
WAITING_FOR_FLUSH before collecting enough FlushEvents");
+                    "Illegal schemaChangeStatus state: should be 
WAITING_FOR_FLUSH before collecting enough FlushEvents, not "
+                            + schemaChangeStatus);
             LOG.info(
                     "All sink subtask have flushed for table {}. Start to 
apply schema change.",
                     tableId.toString());
@@ -259,6 +279,10 @@ public class SchemaRegistryRequestHandler implements 
Closeable {
                 !schemaChangeStatus.get().equals(RequestStatus.IDLE),
                 "Illegal schemaChangeStatus: should not be IDLE before getting 
schema change request results.");
         if (schemaChangeStatus.compareAndSet(RequestStatus.FINISHED, 
RequestStatus.IDLE)) {
+            LOG.info(
+                    "SchemaChangeStatus switched from FINISHED to IDLE for 
request {}",
+                    currentDerivedSchemaChangeEvents);
+
             // This request has been finished, return it and prepare for the 
next request
             List<SchemaChangeEvent> finishedEvents = 
clearCurrentSchemaChangeRequest();
             return CompletableFuture.supplyAsync(

Reply via email to