This is an automated email from the ASF dual-hosted git repository.

leonard pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/flink-cdc.git


The following commit(s) were added to refs/heads/master by this push:
     new a876af258 [minor][cdc-runtime] Run schema coordinator logic asynchronously to avoid blocking the main thread
a876af258 is described below

commit a876af2589ef61bfb13bc2c01a258d5eeaa7afac
Author: yuxiqian <[email protected]>
AuthorDate: Wed Aug 28 00:38:02 2024 +0800

    [minor][cdc-runtime] Run schema coordinator logic asynchronously to avoid blocking the main thread
    
    This closes #3557
---
 .../tests/SchemaRegistryMigrationMock.java         |   2 +
 .../schema/coordinator/SchemaRegistry.java         | 232 ++++++++++++++-------
 .../schema/coordinator/SchemaRegistryProvider.java |  54 ++++-
 .../coordinator/SchemaRegistryRequestHandler.java  |  40 ++--
 .../runtime/operators/schema/SchemaEvolveTest.java |  14 +-
 .../operators/EventOperatorTestHarness.java        |  32 ++-
 .../MockedOperatorCoordinatorContext.java          |  44 ++++
 7 files changed, 314 insertions(+), 104 deletions(-)

diff --git 
a/flink-cdc-migration-tests/flink-cdc-release-snapshot/src/main/java/org/apache/flink/cdc/migration/tests/SchemaRegistryMigrationMock.java
 
b/flink-cdc-migration-tests/flink-cdc-release-snapshot/src/main/java/org/apache/flink/cdc/migration/tests/SchemaRegistryMigrationMock.java
index bb19223e6..34cd1ea03 100644
--- 
a/flink-cdc-migration-tests/flink-cdc-release-snapshot/src/main/java/org/apache/flink/cdc/migration/tests/SchemaRegistryMigrationMock.java
+++ 
b/flink-cdc-migration-tests/flink-cdc-release-snapshot/src/main/java/org/apache/flink/cdc/migration/tests/SchemaRegistryMigrationMock.java
@@ -40,6 +40,7 @@ import java.util.Set;
 import java.util.SortedMap;
 import java.util.TreeMap;
 import java.util.concurrent.CompletableFuture;
+import java.util.concurrent.Executors;
 import java.util.stream.Collectors;
 
 /** Dummy classes for migration test. Called via reflection. */
@@ -69,6 +70,7 @@ public class SchemaRegistryMigrationMock implements 
MigrationMockBase {
         return new SchemaRegistry(
                 "Dummy Name",
                 null,
+                Executors.newFixedThreadPool(1),
                 new MetadataApplier() {
                     @Override
                     public boolean acceptsSchemaEvolutionType(
diff --git 
a/flink-cdc-runtime/src/main/java/org/apache/flink/cdc/runtime/operators/schema/coordinator/SchemaRegistry.java
 
b/flink-cdc-runtime/src/main/java/org/apache/flink/cdc/runtime/operators/schema/coordinator/SchemaRegistry.java
index 617daed92..64d4fa1bc 100644
--- 
a/flink-cdc-runtime/src/main/java/org/apache/flink/cdc/runtime/operators/schema/coordinator/SchemaRegistry.java
+++ 
b/flink-cdc-runtime/src/main/java/org/apache/flink/cdc/runtime/operators/schema/coordinator/SchemaRegistry.java
@@ -37,7 +37,9 @@ import 
org.apache.flink.runtime.operators.coordination.CoordinationRequestHandle
 import org.apache.flink.runtime.operators.coordination.CoordinationResponse;
 import org.apache.flink.runtime.operators.coordination.OperatorCoordinator;
 import org.apache.flink.runtime.operators.coordination.OperatorEvent;
+import org.apache.flink.util.ExceptionUtils;
 import org.apache.flink.util.FlinkException;
+import org.apache.flink.util.function.ThrowingRunnable;
 
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
@@ -55,6 +57,7 @@ import java.util.List;
 import java.util.Map;
 import java.util.Set;
 import java.util.concurrent.CompletableFuture;
+import java.util.concurrent.ExecutorService;
 
 import static 
org.apache.flink.cdc.runtime.operators.schema.event.CoordinationResponseUtils.wrap;
 
@@ -83,6 +86,9 @@ public class SchemaRegistry implements OperatorCoordinator, 
CoordinationRequestH
     /** The name of the operator this SchemaOperatorCoordinator is associated 
with. */
     private final String operatorName;
 
+    /** A single-thread executor to handle async execution of the coordinator. 
*/
+    private final ExecutorService coordinatorExecutor;
+
     /**
      * Tracks the subtask failed reason to throw a more meaningful exception 
in {@link
      * #subtaskReset}.
@@ -113,18 +119,27 @@ public class SchemaRegistry implements 
OperatorCoordinator, CoordinationRequestH
     public SchemaRegistry(
             String operatorName,
             OperatorCoordinator.Context context,
+            ExecutorService executorService,
             MetadataApplier metadataApplier,
             List<RouteRule> routes) {
-        this(operatorName, context, metadataApplier, routes, 
SchemaChangeBehavior.EVOLVE);
+        this(
+                operatorName,
+                context,
+                executorService,
+                metadataApplier,
+                routes,
+                SchemaChangeBehavior.LENIENT);
     }
 
     public SchemaRegistry(
             String operatorName,
             OperatorCoordinator.Context context,
+            ExecutorService coordinatorExecutor,
             MetadataApplier metadataApplier,
             List<RouteRule> routes,
             SchemaChangeBehavior schemaChangeBehavior) {
         this.context = context;
+        this.coordinatorExecutor = coordinatorExecutor;
         this.operatorName = operatorName;
         this.failedReasons = new HashMap<>();
         this.metadataApplier = metadataApplier;
@@ -133,7 +148,11 @@ public class SchemaRegistry implements 
OperatorCoordinator, CoordinationRequestH
         this.schemaDerivation = new SchemaDerivation(schemaManager, routes, 
new HashMap<>());
         this.requestHandler =
                 new SchemaRegistryRequestHandler(
-                        metadataApplier, schemaManager, schemaDerivation, 
schemaChangeBehavior);
+                        metadataApplier,
+                        schemaManager,
+                        schemaDerivation,
+                        schemaChangeBehavior,
+                        context);
         this.schemaChangeBehavior = schemaChangeBehavior;
     }
 
@@ -153,48 +172,87 @@ public class SchemaRegistry implements 
OperatorCoordinator, CoordinationRequestH
     }
 
     @Override
-    public void handleEventFromOperator(int subtask, int attemptNumber, 
OperatorEvent event)
-            throws Exception {
-        try {
-            if (event instanceof FlushSuccessEvent) {
-                FlushSuccessEvent flushSuccessEvent = (FlushSuccessEvent) 
event;
-                LOG.info(
-                        "Sink subtask {} succeed flushing for table {}.",
-                        flushSuccessEvent.getSubtask(),
-                        flushSuccessEvent.getTableId().toString());
-                requestHandler.flushSuccess(
-                        flushSuccessEvent.getTableId(),
-                        flushSuccessEvent.getSubtask(),
-                        currentParallelism);
-            } else if (event instanceof SinkWriterRegisterEvent) {
-                requestHandler.registerSinkWriter(((SinkWriterRegisterEvent) 
event).getSubtask());
-            } else {
-                throw new FlinkException("Unrecognized Operator Event: " + 
event);
-            }
-        } catch (Throwable t) {
-            context.failJob(t);
-            throw t;
-        }
+    public void handleEventFromOperator(int subtask, int attemptNumber, 
OperatorEvent event) {
+        runInEventLoop(
+                () -> {
+                    try {
+                        if (event instanceof FlushSuccessEvent) {
+                            FlushSuccessEvent flushSuccessEvent = 
(FlushSuccessEvent) event;
+                            LOG.info(
+                                    "Sink subtask {} succeed flushing for 
table {}.",
+                                    flushSuccessEvent.getSubtask(),
+                                    flushSuccessEvent.getTableId().toString());
+                            requestHandler.flushSuccess(
+                                    flushSuccessEvent.getTableId(),
+                                    flushSuccessEvent.getSubtask(),
+                                    currentParallelism);
+                        } else if (event instanceof SinkWriterRegisterEvent) {
+                            requestHandler.registerSinkWriter(
+                                    ((SinkWriterRegisterEvent) 
event).getSubtask());
+                        } else {
+                            throw new FlinkException("Unrecognized Operator 
Event: " + event);
+                        }
+                    } catch (Throwable t) {
+                        context.failJob(t);
+                        throw t;
+                    }
+                },
+                "handling event %s from subTask %d",
+                event,
+                subtask);
     }
 
     @Override
-    public void checkpointCoordinator(long checkpointId, 
CompletableFuture<byte[]> resultFuture)
-            throws Exception {
-        try (ByteArrayOutputStream baos = new ByteArrayOutputStream();
-                DataOutputStream out = new DataOutputStream(baos)) {
-            // Serialize SchemaManager
-            int schemaManagerSerializerVersion = 
SchemaManager.SERIALIZER.getVersion();
-            out.writeInt(schemaManagerSerializerVersion);
-            byte[] serializedSchemaManager = 
SchemaManager.SERIALIZER.serialize(schemaManager);
-            out.writeInt(serializedSchemaManager.length);
-            out.write(serializedSchemaManager);
-            // Serialize SchemaDerivation mapping
-            SchemaDerivation.serializeDerivationMapping(schemaDerivation, out);
-            resultFuture.complete(baos.toByteArray());
-        } catch (Throwable t) {
-            context.failJob(t);
-            throw t;
-        }
+    public void checkpointCoordinator(long checkpointId, 
CompletableFuture<byte[]> resultFuture) {
+        // we generate checkpoint in an async thread to not block the 
JobManager's main thread, the
+        // coordinator state might be large if there are many schema changes 
and monitor many
+        // tables.
+        runInEventLoop(
+                () -> {
+                    try (ByteArrayOutputStream baos = new 
ByteArrayOutputStream();
+                            DataOutputStream out = new DataOutputStream(baos)) 
{
+                        // Serialize SchemaManager
+                        int schemaManagerSerializerVersion = 
SchemaManager.SERIALIZER.getVersion();
+                        out.writeInt(schemaManagerSerializerVersion);
+                        byte[] serializedSchemaManager =
+                                
SchemaManager.SERIALIZER.serialize(schemaManager);
+                        out.writeInt(serializedSchemaManager.length);
+                        out.write(serializedSchemaManager);
+                        // Serialize SchemaDerivation mapping
+                        
SchemaDerivation.serializeDerivationMapping(schemaDerivation, out);
+                        resultFuture.complete(baos.toByteArray());
+                    } catch (Throwable t) {
+                        context.failJob(t);
+                        throw t;
+                    }
+                },
+                "taking checkpoint %d",
+                checkpointId);
+    }
+
+    private void runInEventLoop(
+            final ThrowingRunnable<Throwable> action,
+            final String actionName,
+            final Object... actionNameFormatParameters) {
+        coordinatorExecutor.execute(
+                () -> {
+                    try {
+                        action.run();
+                    } catch (Throwable t) {
+                        // if we have a JVM critical error, promote it 
immediately, there is a good
+                        // chance the logging or job failing will not succeed 
anymore
+                        ExceptionUtils.rethrowIfFatalErrorOrOOM(t);
+
+                        final String actionString =
+                                String.format(actionName, 
actionNameFormatParameters);
+                        LOG.error(
+                                "Uncaught exception in the 
SchemaEvolutionCoordinator for {} while {}. Triggering job failover.",
+                                operatorName,
+                                actionString,
+                                t);
+                        context.failJob(t);
+                    }
+                });
     }
 
     @Override
@@ -205,26 +263,34 @@ public class SchemaRegistry implements 
OperatorCoordinator, CoordinationRequestH
     @Override
     public CompletableFuture<CoordinationResponse> handleCoordinationRequest(
             CoordinationRequest request) {
-        try {
-            if (request instanceof SchemaChangeRequest) {
-                SchemaChangeRequest schemaChangeRequest = 
(SchemaChangeRequest) request;
-                return 
requestHandler.handleSchemaChangeRequest(schemaChangeRequest);
-            } else if (request instanceof SchemaChangeResultRequest) {
-                return requestHandler.getSchemaChangeResult();
-            } else if (request instanceof GetEvolvedSchemaRequest) {
-                return CompletableFuture.completedFuture(
-                        
wrap(handleGetEvolvedSchemaRequest(((GetEvolvedSchemaRequest) request))));
-            } else if (request instanceof GetOriginalSchemaRequest) {
-                return CompletableFuture.completedFuture(
-                        
wrap(handleGetOriginalSchemaRequest((GetOriginalSchemaRequest) request)));
-            } else {
-                throw new IllegalArgumentException(
-                        "Unrecognized CoordinationRequest type: " + request);
-            }
-        } catch (Throwable t) {
-            context.failJob(t);
-            throw t;
-        }
+        CompletableFuture<CoordinationResponse> responseFuture = new 
CompletableFuture<>();
+        runInEventLoop(
+                () -> {
+                    try {
+                        if (request instanceof SchemaChangeRequest) {
+                            SchemaChangeRequest schemaChangeRequest = 
(SchemaChangeRequest) request;
+                            requestHandler.handleSchemaChangeRequest(
+                                    schemaChangeRequest, responseFuture);
+                        } else if (request instanceof 
SchemaChangeResultRequest) {
+                            
requestHandler.getSchemaChangeResult(responseFuture);
+                        } else if (request instanceof GetEvolvedSchemaRequest) 
{
+                            handleGetEvolvedSchemaRequest(
+                                    ((GetEvolvedSchemaRequest) request), 
responseFuture);
+                        } else if (request instanceof 
GetOriginalSchemaRequest) {
+                            handleGetOriginalSchemaRequest(
+                                    (GetOriginalSchemaRequest) request, 
responseFuture);
+                        } else {
+                            throw new IllegalArgumentException(
+                                    "Unrecognized CoordinationRequest type: " 
+ request);
+                        }
+                    } catch (Throwable t) {
+                        context.failJob(t);
+                        throw t;
+                    }
+                },
+                "handling coordination request %s",
+                request);
+        return responseFuture;
     }
 
     @Override
@@ -253,7 +319,8 @@ public class SchemaRegistry implements OperatorCoordinator, 
CoordinationRequestH
                                         metadataApplier,
                                         schemaManager,
                                         schemaDerivation,
-                                        schemaManager.getBehavior());
+                                        schemaManager.getBehavior(),
+                                        context);
                         break;
                     }
                 case 1:
@@ -274,7 +341,8 @@ public class SchemaRegistry implements OperatorCoordinator, 
CoordinationRequestH
                                         metadataApplier,
                                         schemaManager,
                                         schemaDerivation,
-                                        schemaChangeBehavior);
+                                        schemaChangeBehavior,
+                                        context);
                         break;
                     }
                 default:
@@ -307,46 +375,56 @@ public class SchemaRegistry implements 
OperatorCoordinator, CoordinationRequestH
         // do nothing
     }
 
-    private GetEvolvedSchemaResponse handleGetEvolvedSchemaRequest(
-            GetEvolvedSchemaRequest getEvolvedSchemaRequest) {
+    private void handleGetEvolvedSchemaRequest(
+            GetEvolvedSchemaRequest getEvolvedSchemaRequest,
+            CompletableFuture<CoordinationResponse> response) {
         LOG.info("Handling evolved schema request: {}", 
getEvolvedSchemaRequest);
         int schemaVersion = getEvolvedSchemaRequest.getSchemaVersion();
         TableId tableId = getEvolvedSchemaRequest.getTableId();
         if (schemaVersion == GetEvolvedSchemaRequest.LATEST_SCHEMA_VERSION) {
-            return new GetEvolvedSchemaResponse(
-                    
schemaManager.getLatestEvolvedSchema(tableId).orElse(null));
+            response.complete(
+                    wrap(
+                            new GetEvolvedSchemaResponse(
+                                    
schemaManager.getLatestEvolvedSchema(tableId).orElse(null))));
         } else {
             try {
-                return new GetEvolvedSchemaResponse(
-                        schemaManager.getEvolvedSchema(tableId, 
schemaVersion));
+                response.complete(
+                        wrap(
+                                new GetEvolvedSchemaResponse(
+                                        
schemaManager.getEvolvedSchema(tableId, schemaVersion))));
             } catch (IllegalArgumentException iae) {
                 LOG.warn(
                         "Some client is requesting an non-existed evolved 
schema for table {} with version {}",
                         tableId,
                         schemaVersion);
-                return new GetEvolvedSchemaResponse(null);
+                response.complete(wrap(new GetEvolvedSchemaResponse(null)));
             }
         }
     }
 
-    private GetOriginalSchemaResponse handleGetOriginalSchemaRequest(
-            GetOriginalSchemaRequest getOriginalSchemaRequest) {
+    private void handleGetOriginalSchemaRequest(
+            GetOriginalSchemaRequest getOriginalSchemaRequest,
+            CompletableFuture<CoordinationResponse> response) {
         LOG.info("Handling original schema request: {}", 
getOriginalSchemaRequest);
         int schemaVersion = getOriginalSchemaRequest.getSchemaVersion();
         TableId tableId = getOriginalSchemaRequest.getTableId();
         if (schemaVersion == GetOriginalSchemaRequest.LATEST_SCHEMA_VERSION) {
-            return new GetOriginalSchemaResponse(
-                    
schemaManager.getLatestOriginalSchema(tableId).orElse(null));
+            response.complete(
+                    wrap(
+                            new GetOriginalSchemaResponse(
+                                    
schemaManager.getLatestOriginalSchema(tableId).orElse(null))));
         } else {
             try {
-                return new GetOriginalSchemaResponse(
-                        schemaManager.getOriginalSchema(tableId, 
schemaVersion));
+                response.complete(
+                        wrap(
+                                new GetOriginalSchemaResponse(
+                                        
schemaManager.getOriginalSchema(tableId, schemaVersion))));
             } catch (IllegalArgumentException iae) {
                 LOG.warn(
                         "Some client is requesting an non-existed original 
schema for table {} with version {}",
                         tableId,
                         schemaVersion);
-                return new GetOriginalSchemaResponse(null);
+                response.complete(wrap(new GetOriginalSchemaResponse(null)));
             }
         }
     }
diff --git 
a/flink-cdc-runtime/src/main/java/org/apache/flink/cdc/runtime/operators/schema/coordinator/SchemaRegistryProvider.java
 
b/flink-cdc-runtime/src/main/java/org/apache/flink/cdc/runtime/operators/schema/coordinator/SchemaRegistryProvider.java
index dd7f2dc36..bc261e40f 100644
--- 
a/flink-cdc-runtime/src/main/java/org/apache/flink/cdc/runtime/operators/schema/coordinator/SchemaRegistryProvider.java
+++ 
b/flink-cdc-runtime/src/main/java/org/apache/flink/cdc/runtime/operators/schema/coordinator/SchemaRegistryProvider.java
@@ -23,8 +23,12 @@ import org.apache.flink.cdc.common.route.RouteRule;
 import org.apache.flink.cdc.common.sink.MetadataApplier;
 import org.apache.flink.runtime.jobgraph.OperatorID;
 import org.apache.flink.runtime.operators.coordination.OperatorCoordinator;
+import org.apache.flink.util.FatalExitExceptionHandler;
 
 import java.util.List;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.Executors;
+import java.util.concurrent.ThreadFactory;
 
 /** Provider of {@link SchemaRegistry}. */
 @Internal
@@ -57,7 +61,55 @@ public class SchemaRegistryProvider implements 
OperatorCoordinator.Provider {
 
     @Override
     public OperatorCoordinator create(OperatorCoordinator.Context context) 
throws Exception {
+        CoordinatorExecutorThreadFactory coordinatorThreadFactory =
+                new CoordinatorExecutorThreadFactory(
+                        "schema-evolution-coordinator", 
context.getUserCodeClassloader());
+        ExecutorService coordinatorExecutor =
+                Executors.newSingleThreadExecutor(coordinatorThreadFactory);
         return new SchemaRegistry(
-                operatorName, context, metadataApplier, routingRules, 
schemaChangeBehavior);
+                operatorName,
+                context,
+                coordinatorExecutor,
+                metadataApplier,
+                routingRules,
+                schemaChangeBehavior);
+    }
+
+    /** A thread factory class that provides some helper methods. */
+    public static class CoordinatorExecutorThreadFactory implements 
ThreadFactory {
+
+        private final String coordinatorThreadName;
+        private final ClassLoader cl;
+        private final Thread.UncaughtExceptionHandler errorHandler;
+
+        private Thread t;
+
+        CoordinatorExecutorThreadFactory(
+                final String coordinatorThreadName, final ClassLoader 
contextClassLoader) {
+            this(coordinatorThreadName, contextClassLoader, 
FatalExitExceptionHandler.INSTANCE);
+        }
+
+        CoordinatorExecutorThreadFactory(
+                final String coordinatorThreadName,
+                final ClassLoader contextClassLoader,
+                final Thread.UncaughtExceptionHandler errorHandler) {
+            this.coordinatorThreadName = coordinatorThreadName;
+            this.cl = contextClassLoader;
+            this.errorHandler = errorHandler;
+        }
+
+        @Override
+        public synchronized Thread newThread(Runnable r) {
+            if (t != null) {
+                throw new Error(
+                        "This indicates that a fatal error has happened and 
caused the "
+                                + "coordinator executor thread to exit. Check 
the earlier logs"
+                                + "to see the root cause of the problem.");
+            }
+            t = new Thread(r, coordinatorThreadName);
+            t.setContextClassLoader(cl);
+            t.setUncaughtExceptionHandler(errorHandler);
+            return t;
+        }
     }
 }
diff --git 
a/flink-cdc-runtime/src/main/java/org/apache/flink/cdc/runtime/operators/schema/coordinator/SchemaRegistryRequestHandler.java
 
b/flink-cdc-runtime/src/main/java/org/apache/flink/cdc/runtime/operators/schema/coordinator/SchemaRegistryRequestHandler.java
index 1310fb6be..847e343f2 100644
--- 
a/flink-cdc-runtime/src/main/java/org/apache/flink/cdc/runtime/operators/schema/coordinator/SchemaRegistryRequestHandler.java
+++ 
b/flink-cdc-runtime/src/main/java/org/apache/flink/cdc/runtime/operators/schema/coordinator/SchemaRegistryRequestHandler.java
@@ -39,6 +39,7 @@ import 
org.apache.flink.cdc.runtime.operators.schema.event.SchemaChangeRequest;
 import 
org.apache.flink.cdc.runtime.operators.schema.event.SchemaChangeResponse;
 import 
org.apache.flink.cdc.runtime.operators.schema.event.SchemaChangeResultResponse;
 import org.apache.flink.runtime.operators.coordination.CoordinationResponse;
+import org.apache.flink.runtime.operators.coordination.OperatorCoordinator;
 
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
@@ -95,15 +96,19 @@ public class SchemaRegistryRequestHandler implements 
Closeable {
 
     private final SchemaChangeBehavior schemaChangeBehavior;
 
+    private final OperatorCoordinator.Context context;
+
     public SchemaRegistryRequestHandler(
             MetadataApplier metadataApplier,
             SchemaManager schemaManager,
             SchemaDerivation schemaDerivation,
-            SchemaChangeBehavior schemaChangeBehavior) {
+            SchemaChangeBehavior schemaChangeBehavior,
+            OperatorCoordinator.Context context) {
         this.metadataApplier = metadataApplier;
         this.schemaManager = schemaManager;
         this.schemaDerivation = schemaDerivation;
         this.schemaChangeBehavior = schemaChangeBehavior;
+        this.context = context;
 
         this.activeSinkWriters = ConcurrentHashMap.newKeySet();
         this.flushedSinkWriters = ConcurrentHashMap.newKeySet();
@@ -123,8 +128,8 @@ public class SchemaRegistryRequestHandler implements 
Closeable {
      *
      * @param request the received SchemaChangeRequest
      */
-    public CompletableFuture<CoordinationResponse> handleSchemaChangeRequest(
-            SchemaChangeRequest request) {
+    public void handleSchemaChangeRequest(
+            SchemaChangeRequest request, 
CompletableFuture<CoordinationResponse> response) {
 
         // We use requester subTask ID as the pending ticket, because there 
will be at most 1 schema
         // change requests simultaneously from each subTask
@@ -157,7 +162,8 @@ public class SchemaRegistryRequestHandler implements 
Closeable {
                     if (!pendingSubTaskIds.contains(requestSubTaskId)) {
                         pendingSubTaskIds.add(requestSubTaskId);
                     }
-                    return 
CompletableFuture.completedFuture(wrap(SchemaChangeResponse.busy()));
+                    response.complete(wrap(SchemaChangeResponse.busy()));
+                    return;
                 }
 
                 SchemaChangeEvent event = request.getSchemaChangeEvent();
@@ -169,8 +175,8 @@ public class SchemaRegistryRequestHandler implements 
Closeable {
                     LOG.info(
                             "SchemaChangeStatus switched from 
WAITING_FOR_FLUSH to IDLE for request {} due to duplicated request.",
                             request);
-                    return CompletableFuture.completedFuture(
-                            wrap(SchemaChangeResponse.duplicate()));
+                    response.complete(wrap(SchemaChangeResponse.duplicate()));
+                    return;
                 }
                 schemaManager.applyOriginalSchemaChange(event);
                 List<SchemaChangeEvent> derivedSchemaChangeEvents =
@@ -184,7 +190,9 @@ public class SchemaRegistryRequestHandler implements 
Closeable {
                     LOG.info(
                             "SchemaChangeStatus switched from 
WAITING_FOR_FLUSH to IDLE for request {} due to ignored request.",
                             request);
-                    return 
CompletableFuture.completedFuture(wrap(SchemaChangeResponse.ignored()));
+
+                    response.complete(wrap(SchemaChangeResponse.ignored()));
+                    return;
                 }
 
                 LOG.info(
@@ -206,8 +214,8 @@ public class SchemaRegistryRequestHandler implements 
Closeable {
                             }
                         });
                 currentDerivedSchemaChangeEvents = new 
ArrayList<>(derivedSchemaChangeEvents);
-                return CompletableFuture.completedFuture(
-                        
wrap(SchemaChangeResponse.accepted(derivedSchemaChangeEvents)));
+
+                
response.complete(wrap(SchemaChangeResponse.accepted(derivedSchemaChangeEvents)));
             } else {
                 LOG.info(
                         "Schema Registry is busy processing a schema change 
request, could not handle request {} for now. Added {} to pending list ({}).",
@@ -217,7 +225,7 @@ public class SchemaRegistryRequestHandler implements 
Closeable {
                 if (!pendingSubTaskIds.contains(requestSubTaskId)) {
                     pendingSubTaskIds.add(requestSubTaskId);
                 }
-                return 
CompletableFuture.completedFuture(wrap(SchemaChangeResponse.busy()));
+                response.complete(wrap(SchemaChangeResponse.busy()));
             }
         }
     }
@@ -314,7 +322,7 @@ public class SchemaRegistryRequestHandler implements 
Closeable {
         }
     }
 
-    public CompletableFuture<CoordinationResponse> getSchemaChangeResult() {
+    public void getSchemaChangeResult(CompletableFuture<CoordinationResponse> 
response) {
         Preconditions.checkState(
                 schemaChangeStatus != RequestStatus.IDLE,
                 "Illegal schemaChangeStatus: should not be IDLE before getting 
schema change request results.");
@@ -326,11 +334,12 @@ public class SchemaRegistryRequestHandler implements 
Closeable {
 
             // This request has been finished, return it and prepare for the 
next request
             List<SchemaChangeEvent> finishedEvents = 
clearCurrentSchemaChangeRequest();
-            return CompletableFuture.supplyAsync(
-                    () -> wrap(new 
SchemaChangeResultResponse(finishedEvents)));
+            SchemaChangeResultResponse resultResponse =
+                    new SchemaChangeResultResponse(finishedEvents);
+            response.complete(wrap(resultResponse));
         } else {
             // Still working on schema change request, waiting it
-            return CompletableFuture.supplyAsync(() -> wrap(new 
SchemaChangeProcessingResponse()));
+            response.complete(wrap(new SchemaChangeProcessingResponse()));
         }
     }
 
@@ -459,7 +468,8 @@ public class SchemaRegistryRequestHandler implements 
Closeable {
 
     private List<SchemaChangeEvent> clearCurrentSchemaChangeRequest() {
         if (currentChangeException != null) {
-            throw new RuntimeException("Failed to apply schema change.", 
currentChangeException);
+            context.failJob(
+                    new RuntimeException("Failed to apply schema change.", 
currentChangeException));
         }
         List<SchemaChangeEvent> finishedSchemaChanges =
                 new ArrayList<>(currentFinishedSchemaChanges);
diff --git 
a/flink-cdc-runtime/src/test/java/org/apache/flink/cdc/runtime/operators/schema/SchemaEvolveTest.java
 
b/flink-cdc-runtime/src/test/java/org/apache/flink/cdc/runtime/operators/schema/SchemaEvolveTest.java
index 51acbd536..e1f6a94c9 100644
--- 
a/flink-cdc-runtime/src/test/java/org/apache/flink/cdc/runtime/operators/schema/SchemaEvolveTest.java
+++ 
b/flink-cdc-runtime/src/test/java/org/apache/flink/cdc/runtime/operators/schema/SchemaEvolveTest.java
@@ -30,6 +30,7 @@ import org.apache.flink.cdc.common.event.RenameColumnEvent;
 import org.apache.flink.cdc.common.event.SchemaChangeEventType;
 import org.apache.flink.cdc.common.event.SchemaChangeEventTypeFamily;
 import org.apache.flink.cdc.common.event.TableId;
+import 
org.apache.flink.cdc.common.exceptions.UnsupportedSchemaChangeEventException;
 import org.apache.flink.cdc.common.pipeline.SchemaChangeBehavior;
 import org.apache.flink.cdc.common.schema.Column;
 import org.apache.flink.cdc.common.schema.Schema;
@@ -1039,11 +1040,16 @@ public class SchemaEvolveTest {
                                         new AddColumnEvent.ColumnWithPosition(
                                                 Column.physicalColumn(
                                                         "height", DOUBLE, 
"Height data")))));
-        Assertions.assertThatThrownBy(() -> processEvent(schemaOperator, 
addColumnEvents))
+        processEvent(schemaOperator, addColumnEvents);
+        Assertions.assertThat(harness.isJobFailed()).isEqualTo(true);
+        Assertions.assertThat(harness.getJobFailureCause())
                 .cause()
-                .cause()
-                .isExactlyInstanceOf(RuntimeException.class)
-                .hasMessageContaining("Failed to apply schema change");
+                
.isExactlyInstanceOf(UnsupportedSchemaChangeEventException.class)
+                .matches(
+                        e ->
+                                ((UnsupportedSchemaChangeEventException) e)
+                                        .getExceptionMessage()
+                                        .equals("Sink doesn't support such 
schema change event."));
         harness.close();
     }
 
diff --git 
a/flink-cdc-runtime/src/test/java/org/apache/flink/cdc/runtime/testutils/operators/EventOperatorTestHarness.java
 
b/flink-cdc-runtime/src/test/java/org/apache/flink/cdc/runtime/testutils/operators/EventOperatorTestHarness.java
index a469a3a91..d8b977029 100644
--- 
a/flink-cdc-runtime/src/test/java/org/apache/flink/cdc/runtime/testutils/operators/EventOperatorTestHarness.java
+++ 
b/flink-cdc-runtime/src/test/java/org/apache/flink/cdc/runtime/testutils/operators/EventOperatorTestHarness.java
@@ -38,7 +38,6 @@ import 
org.apache.flink.cdc.runtime.testutils.schema.TestingSchemaRegistryGatewa
 import org.apache.flink.configuration.Configuration;
 import org.apache.flink.runtime.jobgraph.OperatorID;
 import org.apache.flink.runtime.jobgraph.tasks.TaskOperatorEventGateway;
-import 
org.apache.flink.runtime.operators.coordination.MockOperatorCoordinatorContext;
 import org.apache.flink.runtime.operators.testutils.DummyEnvironment;
 import org.apache.flink.streaming.api.operators.AbstractStreamOperator;
 import org.apache.flink.streaming.api.operators.Output;
@@ -56,6 +55,7 @@ import java.time.Duration;
 import java.util.ArrayList;
 import java.util.LinkedList;
 import java.util.Set;
+import java.util.concurrent.Executors;
 
 import static 
org.apache.flink.cdc.runtime.operators.schema.event.CoordinationResponseUtils.unwrap;
 
@@ -81,6 +81,7 @@ public class EventOperatorTestHarness<OP extends 
AbstractStreamOperator<E>, E ex
     private final SchemaRegistry schemaRegistry;
     private final TestingSchemaRegistryGateway schemaRegistryGateway;
     private final LinkedList<StreamRecord<E>> outputRecords = new 
LinkedList<>();
+    private final MockedOperatorCoordinatorContext mockedContext;
 
     public EventOperatorTestHarness(OP operator, int numOutputs) {
         this(operator, numOutputs, null, SchemaChangeBehavior.EVOLVE);
@@ -94,11 +95,14 @@ public class EventOperatorTestHarness<OP extends 
AbstractStreamOperator<E>, E ex
             OP operator, int numOutputs, Duration duration, 
SchemaChangeBehavior behavior) {
         this.operator = operator;
         this.numOutputs = numOutputs;
+        this.mockedContext =
+                new MockedOperatorCoordinatorContext(
+                        SCHEMA_OPERATOR_ID, 
Thread.currentThread().getContextClassLoader());
         schemaRegistry =
                 new SchemaRegistry(
                         "SchemaOperator",
-                        new MockOperatorCoordinatorContext(
-                                SCHEMA_OPERATOR_ID, 
Thread.currentThread().getContextClassLoader()),
+                        mockedContext,
+                        Executors.newFixedThreadPool(1),
                         new CollectingMetadataApplier(duration),
                         new ArrayList<>(),
                         behavior);
@@ -113,11 +117,14 @@ public class EventOperatorTestHarness<OP extends 
AbstractStreamOperator<E>, E ex
             Set<SchemaChangeEventType> enabledEventTypes) {
         this.operator = operator;
         this.numOutputs = numOutputs;
+        this.mockedContext =
+                new MockedOperatorCoordinatorContext(
+                        SCHEMA_OPERATOR_ID, 
Thread.currentThread().getContextClassLoader());
         schemaRegistry =
                 new SchemaRegistry(
                         "SchemaOperator",
-                        new MockOperatorCoordinatorContext(
-                                SCHEMA_OPERATOR_ID, 
Thread.currentThread().getContextClassLoader()),
+                        mockedContext,
+                        Executors.newFixedThreadPool(1),
                         new CollectingMetadataApplier(duration, 
enabledEventTypes),
                         new ArrayList<>(),
                         behavior);
@@ -133,11 +140,14 @@ public class EventOperatorTestHarness<OP extends 
AbstractStreamOperator<E>, E ex
             Set<SchemaChangeEventType> errorsOnEventTypes) {
         this.operator = operator;
         this.numOutputs = numOutputs;
+        this.mockedContext =
+                new MockedOperatorCoordinatorContext(
+                        SCHEMA_OPERATOR_ID, 
Thread.currentThread().getContextClassLoader());
         schemaRegistry =
                 new SchemaRegistry(
                         "SchemaOperator",
-                        new MockOperatorCoordinatorContext(
-                                SCHEMA_OPERATOR_ID, 
Thread.currentThread().getContextClassLoader()),
+                        mockedContext,
+                        Executors.newFixedThreadPool(1),
                         new CollectingMetadataApplier(
                                 duration, enabledEventTypes, 
errorsOnEventTypes),
                         new ArrayList<>(),
@@ -196,6 +206,22 @@ public class EventOperatorTestHarness<OP extends AbstractStreamOperator<E>, E ex
                 .orElse(null);
     }
 
+    /**
+     * Returns {@code isJobFailed()} of the mocked coordinator context that was handed to the schema
+     * registry under test.
+     */
+    public boolean isJobFailed() {
+        return mockedContext.isJobFailed();
+    }
+
+    /**
+     * Returns the cause recorded by the mocked coordinator context's {@code failJob}, or {@code
+     * null} if {@code failJob} has not been called.
+     */
+    public Throwable getJobFailureCause() {
+        return mockedContext.getFailureCause();
+    }
+
     @Override
     public void close() throws Exception {
         operator.close();
diff --git 
a/flink-cdc-runtime/src/test/java/org/apache/flink/cdc/runtime/testutils/operators/MockedOperatorCoordinatorContext.java
 
b/flink-cdc-runtime/src/test/java/org/apache/flink/cdc/runtime/testutils/operators/MockedOperatorCoordinatorContext.java
new file mode 100644
index 000000000..19ab961ee
--- /dev/null
+++ 
b/flink-cdc-runtime/src/test/java/org/apache/flink/cdc/runtime/testutils/operators/MockedOperatorCoordinatorContext.java
@@ -0,0 +1,53 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.cdc.runtime.testutils.operators;
+
+import org.apache.flink.runtime.jobgraph.OperatorID;
+import org.apache.flink.runtime.operators.coordination.MockOperatorCoordinatorContext;
+
+/**
+ * A {@link MockOperatorCoordinatorContext} that also remembers the cause passed to {@link
+ * #failJob(Throwable)}, so tests can assert why the job was failed. For testing purposes only.
+ */
+public class MockedOperatorCoordinatorContext extends MockOperatorCoordinatorContext {
+
+    /**
+     * Cause passed to the most recent {@link #failJob(Throwable)} call, or {@code null} if none.
+     * Volatile because {@code failJob} may run on the coordinator's executor thread while tests
+     * read the cause from their own thread.
+     */
+    private volatile Throwable failureCause;
+
+    public MockedOperatorCoordinatorContext(
+            OperatorID operatorID, ClassLoader userCodeClassLoader) {
+        super(operatorID, userCodeClassLoader);
+    }
+
+    /** Records {@code cause}, then delegates to the parent mock. */
+    @Override
+    public void failJob(Throwable cause) {
+        // Set the cause before the parent marks the job as failed.
+        failureCause = cause;
+        super.failJob(cause);
+    }
+
+    /** Returns the most recent failure cause, or {@code null} if the job was never failed. */
+    public Throwable getFailureCause() {
+        return failureCause;
+    }
+}


Reply via email to