This is an automated email from the ASF dual-hosted git repository.

leonard pushed a commit to branch release-3.2
in repository https://gitbox.apache.org/repos/asf/flink-cdc.git

commit 5a80757443ecc816ff6db0cb7618aaa4d4bce9b7
Author: Hongshun Wang <[email protected]>
AuthorDate: Thu Aug 22 09:58:55 2024 +0800

    [FLINK-36094][cdc-runtime] Improve the Exception that 
SchemaRegistryRequestHandler thrown
    
     This closes #3558.
    
    (cherry picked from commit 6205a5a0f16d2cf72ff751573351c4e15ea59efb)
---
 .../common/exceptions/SchemaEvolveException.java   |   6 +-
 .../UnsupportedSchemaChangeEventException.java     |  28 +++++-
 .../cdc/connectors/values/ValuesDatabase.java      |   3 +-
 .../flink/cdc/pipeline/tests/MysqlE2eITCase.java   |   2 +-
 .../cdc/pipeline/tests/SchemaEvolveE2eITCase.java  |  23 +++--
 .../cdc/pipeline/tests/TransformE2eITCase.java     |   2 +-
 .../flink/cdc/pipeline/tests/UdfE2eITCase.java     |   2 +-
 .../runtime/operators/schema/SchemaOperator.java   |  65 +-----------
 .../schema/coordinator/SchemaRegistry.java         | 110 +++++++++++----------
 .../coordinator/SchemaRegistryRequestHandler.java  |  56 +++++++----
 .../event/ApplyEvolvedSchemaChangeRequest.java     |  73 --------------
 .../event/ApplyEvolvedSchemaChangeResponse.java    |  32 ------
 .../event/ApplyOriginalSchemaChangeRequest.java    |  71 -------------
 .../event/ApplyOriginalSchemaChangeResponse.java   |  31 ------
 .../schema/event/ReleaseUpstreamResponse.java      |  42 +-------
 .../runtime/operators/schema/SchemaEvolveTest.java |   2 +
 .../operators/EventOperatorTestHarness.java        |  11 +--
 .../schema/CollectingMetadataApplier.java          |   4 +-
 18 files changed, 149 insertions(+), 414 deletions(-)

diff --git 
a/flink-cdc-common/src/main/java/org/apache/flink/cdc/common/exceptions/SchemaEvolveException.java
 
b/flink-cdc-common/src/main/java/org/apache/flink/cdc/common/exceptions/SchemaEvolveException.java
index fc5b482f1..52950fb32 100644
--- 
a/flink-cdc-common/src/main/java/org/apache/flink/cdc/common/exceptions/SchemaEvolveException.java
+++ 
b/flink-cdc-common/src/main/java/org/apache/flink/cdc/common/exceptions/SchemaEvolveException.java
@@ -24,9 +24,9 @@ import javax.annotation.Nullable;
 
 /** An exception occurred during schema evolution. */
 public class SchemaEvolveException extends FlinkRuntimeException {
-    private final SchemaChangeEvent applyingEvent;
-    private final String exceptionMessage;
-    private final @Nullable Throwable cause;
+    protected final SchemaChangeEvent applyingEvent;
+    protected final String exceptionMessage;
+    protected final @Nullable Throwable cause;
 
     public SchemaEvolveException(SchemaChangeEvent applyingEvent, String 
exceptionMessage) {
         this(applyingEvent, exceptionMessage, null);
diff --git 
a/flink-cdc-common/src/main/java/org/apache/flink/cdc/common/exceptions/UnsupportedSchemaChangeEventException.java
 
b/flink-cdc-common/src/main/java/org/apache/flink/cdc/common/exceptions/UnsupportedSchemaChangeEventException.java
index 4be20525a..55d53e7d9 100644
--- 
a/flink-cdc-common/src/main/java/org/apache/flink/cdc/common/exceptions/UnsupportedSchemaChangeEventException.java
+++ 
b/flink-cdc-common/src/main/java/org/apache/flink/cdc/common/exceptions/UnsupportedSchemaChangeEventException.java
@@ -19,10 +19,36 @@ package org.apache.flink.cdc.common.exceptions;
 
 import org.apache.flink.cdc.common.event.SchemaChangeEvent;
 
+import javax.annotation.Nullable;
+
 /** A special kind of {@link SchemaEvolveException} that sink doesn't support 
such event type. */
 public class UnsupportedSchemaChangeEventException extends 
SchemaEvolveException {
 
     public UnsupportedSchemaChangeEventException(SchemaChangeEvent 
applyingEvent) {
-        super(applyingEvent, "Sink doesn't support such schema change event.", 
null);
+        this(applyingEvent, "Sink doesn't support such schema change event.");
+    }
+
+    public UnsupportedSchemaChangeEventException(
+            SchemaChangeEvent applyingEvent, String exceptionMessage) {
+        this(applyingEvent, exceptionMessage, null);
+    }
+
+    public UnsupportedSchemaChangeEventException(
+            SchemaChangeEvent applyingEvent, String exceptionMessage, 
@Nullable Throwable cause) {
+        super(applyingEvent, exceptionMessage, cause);
+    }
+
+    @Override
+    public String toString() {
+        return "UnsupportedSchemaChangeEventException{"
+                + "applyingEvent="
+                + applyingEvent
+                + ", exceptionMessage='"
+                + exceptionMessage
+                + '\''
+                + ", cause='"
+                + cause
+                + '\''
+                + '}';
     }
 }
diff --git 
a/flink-cdc-connect/flink-cdc-pipeline-connectors/flink-cdc-pipeline-connector-values/src/main/java/org/apache/flink/cdc/connectors/values/ValuesDatabase.java
 
b/flink-cdc-connect/flink-cdc-pipeline-connectors/flink-cdc-pipeline-connector-values/src/main/java/org/apache/flink/cdc/connectors/values/ValuesDatabase.java
index e19c4a844..bbe36a25a 100644
--- 
a/flink-cdc-connect/flink-cdc-pipeline-connectors/flink-cdc-pipeline-connector-values/src/main/java/org/apache/flink/cdc/connectors/values/ValuesDatabase.java
+++ 
b/flink-cdc-connect/flink-cdc-pipeline-connectors/flink-cdc-pipeline-connector-values/src/main/java/org/apache/flink/cdc/connectors/values/ValuesDatabase.java
@@ -29,6 +29,7 @@ import org.apache.flink.cdc.common.event.SchemaChangeEvent;
 import org.apache.flink.cdc.common.event.SchemaChangeEventType;
 import org.apache.flink.cdc.common.event.TableId;
 import org.apache.flink.cdc.common.exceptions.SchemaEvolveException;
+import 
org.apache.flink.cdc.common.exceptions.UnsupportedSchemaChangeEventException;
 import org.apache.flink.cdc.common.schema.Column;
 import org.apache.flink.cdc.common.schema.Schema;
 import org.apache.flink.cdc.common.sink.MetadataApplier;
@@ -150,7 +151,7 @@ public class ValuesDatabase {
                                     tableId, ((CreateTableEvent) 
schemaChangeEvent).getSchema()));
                 }
             } else {
-                throw new SchemaEvolveException(
+                throw new UnsupportedSchemaChangeEventException(
                         schemaChangeEvent,
                         "Rejected schema change event since 
error.on.schema.change is enabled.",
                         null);
diff --git 
a/flink-cdc-e2e-tests/flink-cdc-pipeline-e2e-tests/src/test/java/org/apache/flink/cdc/pipeline/tests/MysqlE2eITCase.java
 
b/flink-cdc-e2e-tests/flink-cdc-pipeline-e2e-tests/src/test/java/org/apache/flink/cdc/pipeline/tests/MysqlE2eITCase.java
index ada384e07..917873bb8 100644
--- 
a/flink-cdc-e2e-tests/flink-cdc-pipeline-e2e-tests/src/test/java/org/apache/flink/cdc/pipeline/tests/MysqlE2eITCase.java
+++ 
b/flink-cdc-e2e-tests/flink-cdc-pipeline-e2e-tests/src/test/java/org/apache/flink/cdc/pipeline/tests/MysqlE2eITCase.java
@@ -252,7 +252,7 @@ public class MysqlE2eITCase extends PipelineTestEnvironment 
{
         long endTimeout = System.currentTimeMillis() + 
MysqlE2eITCase.EVENT_WAITING_TIMEOUT;
         while (System.currentTimeMillis() < endTimeout) {
             String stdout = taskManagerConsumer.toUtf8String();
-            if (stdout.contains(event)) {
+            if (stdout.contains(event + "\n")) {
                 result = true;
                 break;
             }
diff --git 
a/flink-cdc-e2e-tests/flink-cdc-pipeline-e2e-tests/src/test/java/org/apache/flink/cdc/pipeline/tests/SchemaEvolveE2eITCase.java
 
b/flink-cdc-e2e-tests/flink-cdc-pipeline-e2e-tests/src/test/java/org/apache/flink/cdc/pipeline/tests/SchemaEvolveE2eITCase.java
index d187853da..c84a8824f 100644
--- 
a/flink-cdc-e2e-tests/flink-cdc-pipeline-e2e-tests/src/test/java/org/apache/flink/cdc/pipeline/tests/SchemaEvolveE2eITCase.java
+++ 
b/flink-cdc-e2e-tests/flink-cdc-pipeline-e2e-tests/src/test/java/org/apache/flink/cdc/pipeline/tests/SchemaEvolveE2eITCase.java
@@ -110,9 +110,9 @@ public class SchemaEvolveE2eITCase extends 
PipelineTestEnvironment {
                 true,
                 false,
                 false,
-                Collections.singletonList(
-                        "java.lang.IllegalStateException: Incompatible types: 
\"INT\" and \"DOUBLE\""),
-                Collections.singletonList(
+                Collections.emptyList(),
+                Arrays.asList(
+                        "java.lang.IllegalStateException: Incompatible types: 
\"INT\" and \"DOUBLE\"",
                         "org.apache.flink.runtime.JobException: Recovery is 
suppressed by NoRestartBackoffTimeStrategy"));
     }
 
@@ -123,11 +123,10 @@ public class SchemaEvolveE2eITCase extends 
PipelineTestEnvironment {
                 false,
                 true,
                 false,
-                Collections.singletonList(
-                        "AddColumnEvent{tableId=%s.members, 
addedColumns=[ColumnWithPosition{column=`gender` TINYINT, position=AFTER, 
existedColumnName=age}]}"),
+                Collections.emptyList(),
                 Arrays.asList(
-                        "Failed to apply schema change event 
AddColumnEvent{tableId=%s.members, 
addedColumns=[ColumnWithPosition{column=`gender` TINYINT, position=AFTER, 
existedColumnName=age}]}.",
-                        
"SchemaEvolveException{applyingEvent=AddColumnEvent{tableId=%s.members, 
addedColumns=[ColumnWithPosition{column=`gender` TINYINT, position=AFTER, 
existedColumnName=age}]}, exceptionMessage='Rejected schema change event since 
error.on.schema.change is enabled.', cause='null'}",
+                        "Failed to apply schema change 
AddColumnEvent{tableId=%s.members, 
addedColumns=[ColumnWithPosition{column=`gender` TINYINT, position=AFTER, 
existedColumnName=age}]} to table %s.members. Caused by: 
UnsupportedSchemaChangeEventException{applyingEvent=AddColumnEvent{tableId=%s.members,
 addedColumns=[ColumnWithPosition{column=`gender` TINYINT, position=AFTER, 
existedColumnName=age}]}, exceptionMessage='Rejected schema change event since 
error.on.schema.change is enabl [...]
+                        
"UnsupportedSchemaChangeEventException{applyingEvent=AddColumnEvent{tableId=%s.members,
 addedColumns=[ColumnWithPosition{column=`gender` TINYINT, position=AFTER, 
existedColumnName=age}]}, exceptionMessage='Rejected schema change event since 
error.on.schema.change is enabled.', cause='null'}",
                         "org.apache.flink.runtime.JobException: Recovery is 
suppressed by NoRestartBackoffTimeStrategy"));
     }
 
@@ -143,8 +142,8 @@ public class SchemaEvolveE2eITCase extends 
PipelineTestEnvironment {
                         "DataChangeEvent{tableId=%s.members, before=[], 
after=[1012, Eve, 17], op=INSERT, meta=()}",
                         "DataChangeEvent{tableId=%s.members, before=[], 
after=[1013, Fiona, null], op=INSERT, meta=()}"),
                 Arrays.asList(
-                        "Failed to apply schema change 
AddColumnEvent{tableId=%s.members, 
addedColumns=[ColumnWithPosition{column=`gender` TINYINT, position=AFTER, 
existedColumnName=age}]} to table %s.members.",
-                        
"SchemaEvolveException{applyingEvent=AddColumnEvent{tableId=%s.members, 
addedColumns=[ColumnWithPosition{column=`gender` TINYINT, position=AFTER, 
existedColumnName=age}]}, exceptionMessage='Rejected schema change event since 
error.on.schema.change is enabled.', cause='null'}"));
+                        "Failed to apply schema change 
AddColumnEvent{tableId=%s.members, 
addedColumns=[ColumnWithPosition{column=`gender` TINYINT, position=AFTER, 
existedColumnName=age}]} to table %s.members. Caused by: 
UnsupportedSchemaChangeEventException{applyingEvent=AddColumnEvent{tableId=%s.members,
 addedColumns=[ColumnWithPosition{column=`gender` TINYINT, position=AFTER, 
existedColumnName=age}]}, exceptionMessage='Rejected schema change event since 
error.on.schema.change is enabl [...]
+                        
"UnsupportedSchemaChangeEventException{applyingEvent=AddColumnEvent{tableId=%s.members,
 addedColumns=[ColumnWithPosition{column=`gender` TINYINT, position=AFTER, 
existedColumnName=age}]}, exceptionMessage='Rejected schema change event since 
error.on.schema.change is enabled.', cause='null'}"));
     }
 
     @Test
@@ -181,7 +180,7 @@ public class SchemaEvolveE2eITCase extends 
PipelineTestEnvironment {
                 false,
                 false,
                 Arrays.asList(
-                        "AddColumnEvent{tableId=%s.members, 
addedColumns=[ColumnWithPosition{column=`gender` TINYINT, position=AFTER, 
existedColumnName=age}]}",
+                        "AddColumnEvent{tableId=%s.members, 
addedColumns=[ColumnWithPosition{column=`gender` TINYINT, position=LAST, 
existedColumnName=null}]}",
                         "DataChangeEvent{tableId=%s.members, before=[], 
after=[1012, Eve, 17, 0], op=INSERT, meta=()}",
                         "AlterColumnTypeEvent{tableId=%s.members, 
nameMapping={age=DOUBLE}}",
                         "AddColumnEvent{tableId=%s.members, 
addedColumns=[ColumnWithPosition{column=`precise_age` DOUBLE, position=LAST, 
existedColumnName=null}]}",
@@ -348,7 +347,7 @@ public class SchemaEvolveE2eITCase extends 
PipelineTestEnvironment {
 
         List<String> expectedJmEvents =
                 expectedJobManagerEvents.stream()
-                        .map(s -> String.format(s, dbName, dbName))
+                        .map(s -> String.format(s, dbName, dbName, dbName))
                         .collect(Collectors.toList());
 
         validateResult(expectedJmEvents, jobManagerConsumer);
@@ -392,7 +391,7 @@ public class SchemaEvolveE2eITCase extends 
PipelineTestEnvironment {
         long endTimeout = System.currentTimeMillis() + 
SchemaEvolveE2eITCase.EVENT_WAITING_TIMEOUT;
         while (System.currentTimeMillis() < endTimeout) {
             String stdout = consumer.toUtf8String();
-            if (stdout.contains(event)) {
+            if (stdout.contains(event + "\n")) {
                 result = true;
                 break;
             }
diff --git 
a/flink-cdc-e2e-tests/flink-cdc-pipeline-e2e-tests/src/test/java/org/apache/flink/cdc/pipeline/tests/TransformE2eITCase.java
 
b/flink-cdc-e2e-tests/flink-cdc-pipeline-e2e-tests/src/test/java/org/apache/flink/cdc/pipeline/tests/TransformE2eITCase.java
index e27977153..6f612ca95 100644
--- 
a/flink-cdc-e2e-tests/flink-cdc-pipeline-e2e-tests/src/test/java/org/apache/flink/cdc/pipeline/tests/TransformE2eITCase.java
+++ 
b/flink-cdc-e2e-tests/flink-cdc-pipeline-e2e-tests/src/test/java/org/apache/flink/cdc/pipeline/tests/TransformE2eITCase.java
@@ -736,7 +736,7 @@ public class TransformE2eITCase extends 
PipelineTestEnvironment {
         long endTimeout = System.currentTimeMillis() + timeout;
         while (System.currentTimeMillis() < endTimeout) {
             String stdout = taskManagerConsumer.toUtf8String();
-            if (stdout.contains(event)) {
+            if (stdout.contains(event + "\n")) {
                 result = true;
                 break;
             }
diff --git 
a/flink-cdc-e2e-tests/flink-cdc-pipeline-e2e-tests/src/test/java/org/apache/flink/cdc/pipeline/tests/UdfE2eITCase.java
 
b/flink-cdc-e2e-tests/flink-cdc-pipeline-e2e-tests/src/test/java/org/apache/flink/cdc/pipeline/tests/UdfE2eITCase.java
index c965e4dec..938e2d98e 100644
--- 
a/flink-cdc-e2e-tests/flink-cdc-pipeline-e2e-tests/src/test/java/org/apache/flink/cdc/pipeline/tests/UdfE2eITCase.java
+++ 
b/flink-cdc-e2e-tests/flink-cdc-pipeline-e2e-tests/src/test/java/org/apache/flink/cdc/pipeline/tests/UdfE2eITCase.java
@@ -386,7 +386,7 @@ public class UdfE2eITCase extends PipelineTestEnvironment {
         long endTimeout = System.currentTimeMillis() + timeout;
         while (System.currentTimeMillis() < endTimeout) {
             String stdout = taskManagerConsumer.toUtf8String();
-            if (stdout.contains(event)) {
+            if (stdout.contains(event + "\n")) {
                 result = true;
                 break;
             }
diff --git 
a/flink-cdc-runtime/src/main/java/org/apache/flink/cdc/runtime/operators/schema/SchemaOperator.java
 
b/flink-cdc-runtime/src/main/java/org/apache/flink/cdc/runtime/operators/schema/SchemaOperator.java
index d3a9e158b..a700fd39c 100644
--- 
a/flink-cdc-runtime/src/main/java/org/apache/flink/cdc/runtime/operators/schema/SchemaOperator.java
+++ 
b/flink-cdc-runtime/src/main/java/org/apache/flink/cdc/runtime/operators/schema/SchemaOperator.java
@@ -17,7 +17,6 @@
 
 package org.apache.flink.cdc.runtime.operators.schema;
 
-import org.apache.flink.api.java.tuple.Tuple2;
 import org.apache.flink.api.java.tuple.Tuple3;
 import org.apache.flink.cdc.common.annotation.Internal;
 import org.apache.flink.cdc.common.annotation.VisibleForTesting;
@@ -29,7 +28,6 @@ import org.apache.flink.cdc.common.event.FlushEvent;
 import org.apache.flink.cdc.common.event.SchemaChangeEvent;
 import org.apache.flink.cdc.common.event.SchemaChangeEventType;
 import org.apache.flink.cdc.common.event.TableId;
-import org.apache.flink.cdc.common.exceptions.SchemaEvolveException;
 import org.apache.flink.cdc.common.pipeline.SchemaChangeBehavior;
 import org.apache.flink.cdc.common.route.RouteRule;
 import org.apache.flink.cdc.common.schema.Column;
@@ -40,8 +38,6 @@ import org.apache.flink.cdc.common.types.DataTypeFamily;
 import org.apache.flink.cdc.common.types.DataTypeRoot;
 import org.apache.flink.cdc.common.utils.ChangeEventUtils;
 import 
org.apache.flink.cdc.runtime.operators.schema.coordinator.SchemaRegistry;
-import 
org.apache.flink.cdc.runtime.operators.schema.event.ApplyEvolvedSchemaChangeRequest;
-import 
org.apache.flink.cdc.runtime.operators.schema.event.ApplyOriginalSchemaChangeRequest;
 import 
org.apache.flink.cdc.runtime.operators.schema.event.CoordinationResponseUtils;
 import 
org.apache.flink.cdc.runtime.operators.schema.event.RefreshPendingListsRequest;
 import 
org.apache.flink.cdc.runtime.operators.schema.event.ReleaseUpstreamRequest;
@@ -431,56 +427,9 @@ public class SchemaOperator extends 
AbstractStreamOperator<Event>
             ReleaseUpstreamResponse schemaEvolveResponse = 
requestReleaseUpstream();
             List<SchemaChangeEvent> finishedSchemaChangeEvents =
                     schemaEvolveResponse.getFinishedSchemaChangeEvents();
-            List<Tuple2<SchemaChangeEvent, Throwable>> 
failedSchemaChangeEvents =
-                    schemaEvolveResponse.getFailedSchemaChangeEvents();
-            List<SchemaChangeEvent> ignoredSchemaChangeEvents =
-                    schemaEvolveResponse.getIgnoredSchemaChangeEvents();
-
-            if (schemaChangeBehavior == SchemaChangeBehavior.EVOLVE
-                    || schemaChangeBehavior == SchemaChangeBehavior.EXCEPTION) 
{
-                if (schemaEvolveResponse.hasException()) {
-                    throw new RuntimeException(
-                            String.format(
-                                    "Failed to apply schema change event 
%s.\nExceptions: %s",
-                                    schemaChangeEvent,
-                                    
schemaEvolveResponse.getPrintableFailedSchemaChangeEvents()));
-                }
-            } else if (schemaChangeBehavior == SchemaChangeBehavior.TRY_EVOLVE
-                    || schemaChangeBehavior == SchemaChangeBehavior.LENIENT
-                    || schemaChangeBehavior == SchemaChangeBehavior.IGNORE) {
-                if (schemaEvolveResponse.hasException()) {
-                    schemaEvolveResponse
-                            .getFailedSchemaChangeEvents()
-                            .forEach(
-                                    e ->
-                                            LOG.warn(
-                                                    "Failed to apply event {}, 
but keeps running in tolerant mode. Caused by: {}",
-                                                    e.f0,
-                                                    e.f1));
-                }
-            } else {
-                throw new SchemaEvolveException(
-                        schemaChangeEvent,
-                        "Unexpected schema change behavior: " + 
schemaChangeBehavior);
-            }
 
             // Update evolved schema changes based on apply results
-            requestApplyEvolvedSchemaChanges(tableId, 
finishedSchemaChangeEvents);
             finishedSchemaChangeEvents.forEach(e -> output.collect(new 
StreamRecord<>(e)));
-
-            LOG.info(
-                    "Applied schema change event {} to downstream. Among {} 
total evolved events, {} succeeded, {} failed, and {} ignored.",
-                    schemaChangeEvent,
-                    expectedSchemaChangeEvents.size(),
-                    finishedSchemaChangeEvents.size(),
-                    failedSchemaChangeEvents.size(),
-                    ignoredSchemaChangeEvents.size());
-
-            schemaOperatorMetrics.increaseFinishedSchemaChangeEvents(
-                    finishedSchemaChangeEvents.size());
-            
schemaOperatorMetrics.increaseFailedSchemaChangeEvents(failedSchemaChangeEvents.size());
-            schemaOperatorMetrics.increaseIgnoredSchemaChangeEvents(
-                    ignoredSchemaChangeEvents.size());
         }
     }
 
@@ -489,16 +438,6 @@ public class SchemaOperator extends 
AbstractStreamOperator<Event>
         return sendRequestToCoordinator(new SchemaChangeRequest(tableId, 
schemaChangeEvent));
     }
 
-    private void requestApplyOriginalSchemaChanges(
-            TableId tableId, SchemaChangeEvent schemaChangeEvent) {
-        sendRequestToCoordinator(new ApplyOriginalSchemaChangeRequest(tableId, 
schemaChangeEvent));
-    }
-
-    private void requestApplyEvolvedSchemaChanges(
-            TableId tableId, List<SchemaChangeEvent> schemaChangeEvents) {
-        sendRequestToCoordinator(new ApplyEvolvedSchemaChangeRequest(tableId, 
schemaChangeEvents));
-    }
-
     private ReleaseUpstreamResponse requestReleaseUpstream()
             throws InterruptedException, TimeoutException {
         CoordinationResponse coordinationResponse =
@@ -538,7 +477,7 @@ public class SchemaOperator extends 
AbstractStreamOperator<Event>
             return optionalSchema.get();
         } catch (Exception e) {
             throw new IllegalStateException(
-                    String.format("Unable to get latest schema for table 
\"%s\"", tableId));
+                    String.format("Unable to get latest schema for table 
\"%s\"", tableId), e);
         }
     }
 
@@ -553,7 +492,7 @@ public class SchemaOperator extends 
AbstractStreamOperator<Event>
             return optionalSchema.get();
         } catch (Exception e) {
             throw new IllegalStateException(
-                    String.format("Unable to get latest schema for table 
\"%s\"", tableId));
+                    String.format("Unable to get latest schema for table 
\"%s\"", tableId), e);
         }
     }
 
diff --git 
a/flink-cdc-runtime/src/main/java/org/apache/flink/cdc/runtime/operators/schema/coordinator/SchemaRegistry.java
 
b/flink-cdc-runtime/src/main/java/org/apache/flink/cdc/runtime/operators/schema/coordinator/SchemaRegistry.java
index cc7c68207..9087ae4b3 100644
--- 
a/flink-cdc-runtime/src/main/java/org/apache/flink/cdc/runtime/operators/schema/coordinator/SchemaRegistry.java
+++ 
b/flink-cdc-runtime/src/main/java/org/apache/flink/cdc/runtime/operators/schema/coordinator/SchemaRegistry.java
@@ -17,15 +17,13 @@
 
 package org.apache.flink.cdc.runtime.operators.schema.coordinator;
 
+import org.apache.flink.cdc.common.annotation.VisibleForTesting;
+import org.apache.flink.cdc.common.event.SchemaChangeEvent;
 import org.apache.flink.cdc.common.event.TableId;
 import org.apache.flink.cdc.common.pipeline.SchemaChangeBehavior;
 import org.apache.flink.cdc.common.route.RouteRule;
 import org.apache.flink.cdc.common.sink.MetadataApplier;
 import org.apache.flink.cdc.runtime.operators.schema.SchemaOperator;
-import 
org.apache.flink.cdc.runtime.operators.schema.event.ApplyEvolvedSchemaChangeRequest;
-import 
org.apache.flink.cdc.runtime.operators.schema.event.ApplyEvolvedSchemaChangeResponse;
-import 
org.apache.flink.cdc.runtime.operators.schema.event.ApplyOriginalSchemaChangeRequest;
-import 
org.apache.flink.cdc.runtime.operators.schema.event.ApplyOriginalSchemaChangeResponse;
 import org.apache.flink.cdc.runtime.operators.schema.event.FlushSuccessEvent;
 import 
org.apache.flink.cdc.runtime.operators.schema.event.GetEvolvedSchemaRequest;
 import 
org.apache.flink.cdc.runtime.operators.schema.event.GetEvolvedSchemaResponse;
@@ -151,18 +149,23 @@ public class SchemaRegistry implements 
OperatorCoordinator, CoordinationRequestH
     @Override
     public void handleEventFromOperator(int subtask, int attemptNumber, 
OperatorEvent event)
             throws Exception {
-        if (event instanceof FlushSuccessEvent) {
-            FlushSuccessEvent flushSuccessEvent = (FlushSuccessEvent) event;
-            LOG.info(
-                    "Sink subtask {} succeed flushing for table {}.",
-                    flushSuccessEvent.getSubtask(),
-                    flushSuccessEvent.getTableId().toString());
-            requestHandler.flushSuccess(
-                    flushSuccessEvent.getTableId(), 
flushSuccessEvent.getSubtask());
-        } else if (event instanceof SinkWriterRegisterEvent) {
-            requestHandler.registerSinkWriter(((SinkWriterRegisterEvent) 
event).getSubtask());
-        } else {
-            throw new FlinkException("Unrecognized Operator Event: " + event);
+        try {
+            if (event instanceof FlushSuccessEvent) {
+                FlushSuccessEvent flushSuccessEvent = (FlushSuccessEvent) 
event;
+                LOG.info(
+                        "Sink subtask {} succeed flushing for table {}.",
+                        flushSuccessEvent.getSubtask(),
+                        flushSuccessEvent.getTableId().toString());
+                requestHandler.flushSuccess(
+                        flushSuccessEvent.getTableId(), 
flushSuccessEvent.getSubtask());
+            } else if (event instanceof SinkWriterRegisterEvent) {
+                requestHandler.registerSinkWriter(((SinkWriterRegisterEvent) 
event).getSubtask());
+            } else {
+                throw new FlinkException("Unrecognized Operator Event: " + 
event);
+            }
+        } catch (Throwable t) {
+            context.failJob(t);
+            throw t;
         }
     }
 
@@ -180,6 +183,9 @@ public class SchemaRegistry implements OperatorCoordinator, 
CoordinationRequestH
             // Serialize SchemaDerivation mapping
             SchemaDerivation.serializeDerivationMapping(schemaDerivation, out);
             resultFuture.complete(baos.toByteArray());
+        } catch (Throwable t) {
+            context.failJob(t);
+            throw t;
         }
     }
 
@@ -191,33 +197,29 @@ public class SchemaRegistry implements 
OperatorCoordinator, CoordinationRequestH
     @Override
     public CompletableFuture<CoordinationResponse> handleCoordinationRequest(
             CoordinationRequest request) {
-        if (request instanceof SchemaChangeRequest) {
-            SchemaChangeRequest schemaChangeRequest = (SchemaChangeRequest) 
request;
-            return 
requestHandler.handleSchemaChangeRequest(schemaChangeRequest);
-        } else if (request instanceof ReleaseUpstreamRequest) {
-            return requestHandler.handleReleaseUpstreamRequest();
-        } else if (request instanceof GetEvolvedSchemaRequest) {
-            return CompletableFuture.completedFuture(
-                    
wrap(handleGetEvolvedSchemaRequest(((GetEvolvedSchemaRequest) request))));
-        } else if (request instanceof GetOriginalSchemaRequest) {
-            return CompletableFuture.completedFuture(
-                    
wrap(handleGetOriginalSchemaRequest((GetOriginalSchemaRequest) request)));
-        } else if (request instanceof ApplyOriginalSchemaChangeRequest) {
-            return CompletableFuture.completedFuture(
-                    wrap(
-                            handleApplyOriginalSchemaChangeRequest(
-                                    (ApplyOriginalSchemaChangeRequest) 
request)));
-        } else if (request instanceof ApplyEvolvedSchemaChangeRequest) {
-            return CompletableFuture.completedFuture(
-                    wrap(
-                            handleApplyEvolvedSchemaChangeRequest(
-                                    (ApplyEvolvedSchemaChangeRequest) 
request)));
-        } else if (request instanceof SchemaChangeResultRequest) {
-            return requestHandler.getSchemaChangeResult();
-        } else if (request instanceof RefreshPendingListsRequest) {
-            return requestHandler.refreshPendingLists();
-        } else {
-            throw new IllegalArgumentException("Unrecognized 
CoordinationRequest type: " + request);
+        try {
+            if (request instanceof SchemaChangeRequest) {
+                SchemaChangeRequest schemaChangeRequest = 
(SchemaChangeRequest) request;
+                return 
requestHandler.handleSchemaChangeRequest(schemaChangeRequest);
+            } else if (request instanceof ReleaseUpstreamRequest) {
+                return requestHandler.handleReleaseUpstreamRequest();
+            } else if (request instanceof GetEvolvedSchemaRequest) {
+                return CompletableFuture.completedFuture(
+                        
wrap(handleGetEvolvedSchemaRequest(((GetEvolvedSchemaRequest) request))));
+            } else if (request instanceof GetOriginalSchemaRequest) {
+                return CompletableFuture.completedFuture(
+                        
wrap(handleGetOriginalSchemaRequest((GetOriginalSchemaRequest) request)));
+            } else if (request instanceof SchemaChangeResultRequest) {
+                return requestHandler.getSchemaChangeResult();
+            } else if (request instanceof RefreshPendingListsRequest) {
+                return requestHandler.refreshPendingLists();
+            } else {
+                throw new IllegalArgumentException(
+                        "Unrecognized CoordinationRequest type: " + request);
+            }
+        } catch (Throwable t) {
+            context.failJob(t);
+            throw t;
         }
     }
 
@@ -275,6 +277,9 @@ public class SchemaRegistry implements OperatorCoordinator, 
CoordinationRequestH
                     throw new IOException(
                             "Unrecognized serialization version " + 
schemaManagerSerializerVersion);
             }
+        } catch (Throwable t) {
+            context.failJob(t);
+            throw t;
         }
     }
 
@@ -342,18 +347,15 @@ public class SchemaRegistry implements 
OperatorCoordinator, CoordinationRequestH
         }
     }
 
-    private ApplyOriginalSchemaChangeResponse 
handleApplyOriginalSchemaChangeRequest(
-            ApplyOriginalSchemaChangeRequest applyOriginalSchemaChangeRequest) 
{
-        schemaManager.applyOriginalSchemaChange(
-                applyOriginalSchemaChangeRequest.getSchemaChangeEvent());
-        return new ApplyOriginalSchemaChangeResponse();
+    // --------------------Only visible for test -----------------
+
+    @VisibleForTesting
+    public void handleApplyOriginalSchemaChangeEvent(SchemaChangeEvent 
schemaChangeEvent) {
+        schemaManager.applyOriginalSchemaChange(schemaChangeEvent);
     }
 
-    private ApplyEvolvedSchemaChangeResponse 
handleApplyEvolvedSchemaChangeRequest(
-            ApplyEvolvedSchemaChangeRequest applyEvolvedSchemaChangeRequest) {
-        applyEvolvedSchemaChangeRequest
-                .getSchemaChangeEvent()
-                .forEach(schemaManager::applyEvolvedSchemaChange);
-        return new ApplyEvolvedSchemaChangeResponse();
+    @VisibleForTesting
+    public void handleApplyEvolvedSchemaChangeRequest(SchemaChangeEvent 
schemaChangeEvent) {
+        schemaManager.applyEvolvedSchemaChange(schemaChangeEvent);
     }
 }
diff --git 
a/flink-cdc-runtime/src/main/java/org/apache/flink/cdc/runtime/operators/schema/coordinator/SchemaRegistryRequestHandler.java
 
b/flink-cdc-runtime/src/main/java/org/apache/flink/cdc/runtime/operators/schema/coordinator/SchemaRegistryRequestHandler.java
index da88753e5..60280ce48 100644
--- 
a/flink-cdc-runtime/src/main/java/org/apache/flink/cdc/runtime/operators/schema/coordinator/SchemaRegistryRequestHandler.java
+++ 
b/flink-cdc-runtime/src/main/java/org/apache/flink/cdc/runtime/operators/schema/coordinator/SchemaRegistryRequestHandler.java
@@ -17,7 +17,6 @@
 
 package org.apache.flink.cdc.runtime.operators.schema.coordinator;
 
-import org.apache.flink.api.java.tuple.Tuple2;
 import org.apache.flink.cdc.common.annotation.Internal;
 import org.apache.flink.cdc.common.event.AddColumnEvent;
 import org.apache.flink.cdc.common.event.AlterColumnTypeEvent;
@@ -27,7 +26,7 @@ import org.apache.flink.cdc.common.event.RenameColumnEvent;
 import org.apache.flink.cdc.common.event.SchemaChangeEvent;
 import org.apache.flink.cdc.common.event.SchemaChangeEventType;
 import org.apache.flink.cdc.common.event.TableId;
-import org.apache.flink.cdc.common.exceptions.SchemaEvolveException;
+import 
org.apache.flink.cdc.common.exceptions.UnsupportedSchemaChangeEventException;
 import org.apache.flink.cdc.common.pipeline.SchemaChangeBehavior;
 import org.apache.flink.cdc.common.schema.Column;
 import org.apache.flink.cdc.common.schema.Schema;
@@ -87,7 +86,6 @@ public class SchemaRegistryRequestHandler implements 
Closeable {
     private final List<PendingSchemaChange> pendingSchemaChanges;
 
     private final List<SchemaChangeEvent> finishedSchemaChanges;
-    private final List<Tuple2<SchemaChangeEvent, Throwable>> 
failedSchemaChanges;
     private final List<SchemaChangeEvent> ignoredSchemaChanges;
 
     /** Sink writers which have sent flush success events for the request. */
@@ -95,6 +93,8 @@ public class SchemaRegistryRequestHandler implements 
Closeable {
 
     /** Status of the execution of current schema change request. */
     private volatile boolean isSchemaChangeApplying;
+    /** Actual exception if failed to apply schema change. */
+    private volatile Throwable schemaChangeException;
     /** Executor service to execute schema change. */
     private final ExecutorService schemaChangeThreadPool;
 
@@ -110,7 +110,6 @@ public class SchemaRegistryRequestHandler implements 
Closeable {
         this.flushedSinkWriters = new HashSet<>();
         this.pendingSchemaChanges = new LinkedList<>();
         this.finishedSchemaChanges = new LinkedList<>();
-        this.failedSchemaChanges = new LinkedList<>();
         this.ignoredSchemaChanges = new LinkedList<>();
         this.schemaManager = schemaManager;
         this.schemaDerivation = schemaDerivation;
@@ -128,8 +127,8 @@ public class SchemaRegistryRequestHandler implements 
Closeable {
     private void applySchemaChange(
             TableId tableId, List<SchemaChangeEvent> 
derivedSchemaChangeEvents) {
         isSchemaChangeApplying = true;
+        schemaChangeException = null;
         finishedSchemaChanges.clear();
-        failedSchemaChanges.clear();
         ignoredSchemaChanges.clear();
 
         for (SchemaChangeEvent changeEvent : derivedSchemaChangeEvents) {
@@ -146,17 +145,27 @@ public class SchemaRegistryRequestHandler implements 
Closeable {
                 try {
                     metadataApplier.applySchemaChange(changeEvent);
                     LOG.debug("Applied schema change {} to table {}.", 
changeEvent, tableId);
+                    schemaManager.applyEvolvedSchemaChange(changeEvent);
                     finishedSchemaChanges.add(changeEvent);
-                } catch (SchemaEvolveException e) {
+                } catch (Throwable t) {
                     LOG.error(
                             "Failed to apply schema change {} to table {}. 
Caused by: {}",
                             changeEvent,
                             tableId,
-                            e);
-                    failedSchemaChanges.add(Tuple2.of(changeEvent, e));
+                            t);
+                    if (!shouldIgnoreException(t)) {
+                        schemaChangeException = t;
+                        break;
+                    } else {
+                        LOG.warn(
+                                "Failed to apply event {}, but keeps running 
in tolerant mode. Caused by: {}",
+                                changeEvent,
+                                t);
+                    }
                 }
             }
         }
+
         PendingSchemaChange waitFlushSuccess = pendingSchemaChanges.get(0);
         if (RECEIVED_RELEASE_REQUEST.equals(waitFlushSuccess.getStatus())) {
             startNextSchemaChangeRequest();
@@ -241,6 +250,10 @@ public class SchemaRegistryRequestHandler implements 
Closeable {
                     () -> applySchemaChange(tableId, 
waitFlushSuccess.derivedSchemaChangeEvents));
             Thread.sleep(1000);
 
+            if (schemaChangeException != null) {
+                throw new RuntimeException("Failed to apply schema change.", 
schemaChangeException);
+            }
+
             if (isSchemaChangeApplying) {
                 waitFlushSuccess
                         .getResponseFuture()
@@ -248,12 +261,7 @@ public class SchemaRegistryRequestHandler implements 
Closeable {
             } else {
                 waitFlushSuccess
                         .getResponseFuture()
-                        .complete(
-                                wrap(
-                                        new ReleaseUpstreamResponse(
-                                                finishedSchemaChanges,
-                                                failedSchemaChanges,
-                                                ignoredSchemaChanges)));
+                        .complete(wrap(new 
ReleaseUpstreamResponse(finishedSchemaChanges)));
             }
         }
     }
@@ -292,16 +300,15 @@ public class SchemaRegistryRequestHandler implements 
Closeable {
     }
 
     public CompletableFuture<CoordinationResponse> getSchemaChangeResult() {
+        if (schemaChangeException != null) {
+            throw new RuntimeException("Failed to apply schema change.", 
schemaChangeException);
+        }
+
         if (isSchemaChangeApplying) {
             return CompletableFuture.supplyAsync(() -> wrap(new 
SchemaChangeProcessingResponse()));
         } else {
             return CompletableFuture.supplyAsync(
-                    () ->
-                            wrap(
-                                    new ReleaseUpstreamResponse(
-                                            finishedSchemaChanges,
-                                            failedSchemaChanges,
-                                            ignoredSchemaChanges)));
+                    () -> wrap(new 
ReleaseUpstreamResponse(finishedSchemaChanges)));
         }
     }
 
@@ -420,6 +427,15 @@ public class SchemaRegistryRequestHandler implements 
Closeable {
         }
     }
 
+    private boolean shouldIgnoreException(Throwable throwable) {
+
+        // In IGNORE mode, will never try to apply schema change events
+        // In EVOLVE and and LENIENT mode, such failure will not be tolerated
+        // In EXCEPTION mode, an exception will be thrown once captured
+        return (throwable instanceof UnsupportedSchemaChangeEventException)
+                && (schemaChangeBehavior == SchemaChangeBehavior.TRY_EVOLVE);
+    }
+
     private static class PendingSchemaChange {
         private final SchemaChangeRequest changeRequest;
         private List<SchemaChangeEvent> derivedSchemaChangeEvents;
diff --git 
a/flink-cdc-runtime/src/main/java/org/apache/flink/cdc/runtime/operators/schema/event/ApplyEvolvedSchemaChangeRequest.java
 
b/flink-cdc-runtime/src/main/java/org/apache/flink/cdc/runtime/operators/schema/event/ApplyEvolvedSchemaChangeRequest.java
deleted file mode 100644
index f6798af51..000000000
--- 
a/flink-cdc-runtime/src/main/java/org/apache/flink/cdc/runtime/operators/schema/event/ApplyEvolvedSchemaChangeRequest.java
+++ /dev/null
@@ -1,73 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *      http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.flink.cdc.runtime.operators.schema.event;
-
-import org.apache.flink.cdc.common.event.SchemaChangeEvent;
-import org.apache.flink.cdc.common.event.TableId;
-import org.apache.flink.cdc.runtime.operators.schema.SchemaOperator;
-import 
org.apache.flink.cdc.runtime.operators.schema.coordinator.SchemaRegistry;
-import org.apache.flink.runtime.operators.coordination.CoordinationRequest;
-
-import java.util.List;
-import java.util.Objects;
-
-/**
- * The request from {@link SchemaOperator} to {@link SchemaRegistry} to 
request apply evolved schema
- * changes.
- */
-public class ApplyEvolvedSchemaChangeRequest implements CoordinationRequest {
-
-    private static final long serialVersionUID = 1L;
-
-    /** The sender of the request. */
-    private final TableId tableId;
-    /** The schema changes. */
-    private final List<SchemaChangeEvent> schemaChangeEvent;
-
-    public ApplyEvolvedSchemaChangeRequest(
-            TableId tableId, List<SchemaChangeEvent> schemaChangeEvent) {
-        this.tableId = tableId;
-        this.schemaChangeEvent = schemaChangeEvent;
-    }
-
-    public TableId getTableId() {
-        return tableId;
-    }
-
-    public List<SchemaChangeEvent> getSchemaChangeEvent() {
-        return schemaChangeEvent;
-    }
-
-    @Override
-    public boolean equals(Object o) {
-        if (this == o) {
-            return true;
-        }
-        if (!(o instanceof ApplyEvolvedSchemaChangeRequest)) {
-            return false;
-        }
-        ApplyEvolvedSchemaChangeRequest that = 
(ApplyEvolvedSchemaChangeRequest) o;
-        return Objects.equals(tableId, that.tableId)
-                && Objects.equals(schemaChangeEvent, that.schemaChangeEvent);
-    }
-
-    @Override
-    public int hashCode() {
-        return Objects.hash(tableId, schemaChangeEvent);
-    }
-}
diff --git 
a/flink-cdc-runtime/src/main/java/org/apache/flink/cdc/runtime/operators/schema/event/ApplyEvolvedSchemaChangeResponse.java
 
b/flink-cdc-runtime/src/main/java/org/apache/flink/cdc/runtime/operators/schema/event/ApplyEvolvedSchemaChangeResponse.java
deleted file mode 100644
index 787adfc5e..000000000
--- 
a/flink-cdc-runtime/src/main/java/org/apache/flink/cdc/runtime/operators/schema/event/ApplyEvolvedSchemaChangeResponse.java
+++ /dev/null
@@ -1,32 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *      http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.flink.cdc.runtime.operators.schema.event;
-
-import org.apache.flink.cdc.runtime.operators.schema.SchemaOperator;
-import 
org.apache.flink.cdc.runtime.operators.schema.coordinator.SchemaRegistry;
-import org.apache.flink.runtime.operators.coordination.CoordinationResponse;
-
-/**
- * The response from {@link SchemaRegistry} to {@link SchemaOperator} to 
request apply original
- * schema changes, the evolved schema changes come from original schema 
changes with different
- * schema evolution strategy.
- */
-public class ApplyEvolvedSchemaChangeResponse implements CoordinationResponse {
-
-    private static final long serialVersionUID = 1L;
-}
diff --git 
a/flink-cdc-runtime/src/main/java/org/apache/flink/cdc/runtime/operators/schema/event/ApplyOriginalSchemaChangeRequest.java
 
b/flink-cdc-runtime/src/main/java/org/apache/flink/cdc/runtime/operators/schema/event/ApplyOriginalSchemaChangeRequest.java
deleted file mode 100644
index d4c5d7fee..000000000
--- 
a/flink-cdc-runtime/src/main/java/org/apache/flink/cdc/runtime/operators/schema/event/ApplyOriginalSchemaChangeRequest.java
+++ /dev/null
@@ -1,71 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *      http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.flink.cdc.runtime.operators.schema.event;
-
-import org.apache.flink.cdc.common.event.SchemaChangeEvent;
-import org.apache.flink.cdc.common.event.TableId;
-import org.apache.flink.cdc.runtime.operators.schema.SchemaOperator;
-import 
org.apache.flink.cdc.runtime.operators.schema.coordinator.SchemaRegistry;
-import org.apache.flink.runtime.operators.coordination.CoordinationRequest;
-
-import java.util.Objects;
-
-/**
- * The request from {@link SchemaOperator} to {@link SchemaRegistry} to 
request apply original
- * schema changes.
- */
-public class ApplyOriginalSchemaChangeRequest implements CoordinationRequest {
-
-    private static final long serialVersionUID = 1L;
-
-    /** The sender of the request. */
-    private final TableId tableId;
-    /** The schema changes. */
-    private final SchemaChangeEvent schemaChangeEvent;
-
-    public ApplyOriginalSchemaChangeRequest(TableId tableId, SchemaChangeEvent 
schemaChangeEvent) {
-        this.tableId = tableId;
-        this.schemaChangeEvent = schemaChangeEvent;
-    }
-
-    public TableId getTableId() {
-        return tableId;
-    }
-
-    public SchemaChangeEvent getSchemaChangeEvent() {
-        return schemaChangeEvent;
-    }
-
-    @Override
-    public boolean equals(Object o) {
-        if (this == o) {
-            return true;
-        }
-        if (!(o instanceof ApplyOriginalSchemaChangeRequest)) {
-            return false;
-        }
-        ApplyOriginalSchemaChangeRequest that = 
(ApplyOriginalSchemaChangeRequest) o;
-        return Objects.equals(tableId, that.tableId)
-                && Objects.equals(schemaChangeEvent, that.schemaChangeEvent);
-    }
-
-    @Override
-    public int hashCode() {
-        return Objects.hash(tableId, schemaChangeEvent);
-    }
-}
diff --git 
a/flink-cdc-runtime/src/main/java/org/apache/flink/cdc/runtime/operators/schema/event/ApplyOriginalSchemaChangeResponse.java
 
b/flink-cdc-runtime/src/main/java/org/apache/flink/cdc/runtime/operators/schema/event/ApplyOriginalSchemaChangeResponse.java
deleted file mode 100644
index 0a92e9656..000000000
--- 
a/flink-cdc-runtime/src/main/java/org/apache/flink/cdc/runtime/operators/schema/event/ApplyOriginalSchemaChangeResponse.java
+++ /dev/null
@@ -1,31 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *      http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.flink.cdc.runtime.operators.schema.event;
-
-import org.apache.flink.cdc.runtime.operators.schema.SchemaOperator;
-import 
org.apache.flink.cdc.runtime.operators.schema.coordinator.SchemaRegistry;
-import org.apache.flink.runtime.operators.coordination.CoordinationResponse;
-
-/**
- * The response from {@link SchemaRegistry} to {@link SchemaOperator} to 
request apply original
- * schema changes.
- */
-public class ApplyOriginalSchemaChangeResponse implements CoordinationResponse 
{
-
-    private static final long serialVersionUID = 1L;
-}
diff --git 
a/flink-cdc-runtime/src/main/java/org/apache/flink/cdc/runtime/operators/schema/event/ReleaseUpstreamResponse.java
 
b/flink-cdc-runtime/src/main/java/org/apache/flink/cdc/runtime/operators/schema/event/ReleaseUpstreamResponse.java
index f577f1120..bea880962 100644
--- 
a/flink-cdc-runtime/src/main/java/org/apache/flink/cdc/runtime/operators/schema/event/ReleaseUpstreamResponse.java
+++ 
b/flink-cdc-runtime/src/main/java/org/apache/flink/cdc/runtime/operators/schema/event/ReleaseUpstreamResponse.java
@@ -17,7 +17,6 @@
 
 package org.apache.flink.cdc.runtime.operators.schema.event;
 
-import org.apache.flink.api.java.tuple.Tuple2;
 import org.apache.flink.cdc.common.event.SchemaChangeEvent;
 import org.apache.flink.cdc.runtime.operators.schema.SchemaOperator;
 import 
org.apache.flink.cdc.runtime.operators.schema.coordinator.SchemaRegistry;
@@ -25,7 +24,6 @@ import 
org.apache.flink.runtime.operators.coordination.CoordinationResponse;
 
 import java.util.List;
 import java.util.Objects;
-import java.util.stream.Collectors;
 
 /**
  * The response for {@link ReleaseUpstreamRequest} from {@link SchemaRegistry} 
to {@link
@@ -41,50 +39,19 @@ public class ReleaseUpstreamResponse implements 
CoordinationResponse {
      */
     private final List<SchemaChangeEvent> finishedSchemaChangeEvents;
 
-    private final List<Tuple2<SchemaChangeEvent, Throwable>> 
failedSchemaChangeEvents;
-
-    private final List<SchemaChangeEvent> ignoredSchemaChangeEvents;
-
-    public ReleaseUpstreamResponse(
-            List<SchemaChangeEvent> finishedSchemaChangeEvents,
-            List<Tuple2<SchemaChangeEvent, Throwable>> 
failedSchemaChangeEvents,
-            List<SchemaChangeEvent> ignoredSchemaChangeEvents) {
+    public ReleaseUpstreamResponse(List<SchemaChangeEvent> 
finishedSchemaChangeEvents) {
         this.finishedSchemaChangeEvents = finishedSchemaChangeEvents;
-        this.failedSchemaChangeEvents = failedSchemaChangeEvents;
-        this.ignoredSchemaChangeEvents = ignoredSchemaChangeEvents;
     }
 
     public List<SchemaChangeEvent> getFinishedSchemaChangeEvents() {
         return finishedSchemaChangeEvents;
     }
 
-    public List<Tuple2<SchemaChangeEvent, Throwable>> 
getFailedSchemaChangeEvents() {
-        return failedSchemaChangeEvents;
-    }
-
-    public List<SchemaChangeEvent> getIgnoredSchemaChangeEvents() {
-        return ignoredSchemaChangeEvents;
-    }
-
-    public String getPrintableFailedSchemaChangeEvents() {
-        return failedSchemaChangeEvents.stream()
-                .map(e -> "Failed to apply " + e.f0 + ". Caused by: " + e.f1)
-                .collect(Collectors.joining("\n"));
-    }
-
-    public boolean hasException() {
-        return !failedSchemaChangeEvents.isEmpty();
-    }
-
     @Override
     public String toString() {
         return "ReleaseUpstreamResponse{"
                 + "finishedSchemaChangeEvents="
                 + finishedSchemaChangeEvents
-                + ", failedSchemaChangeEvents="
-                + failedSchemaChangeEvents
-                + ", ignoredSchemaChangeEvents="
-                + ignoredSchemaChangeEvents
                 + '}';
     }
 
@@ -97,14 +64,11 @@ public class ReleaseUpstreamResponse implements 
CoordinationResponse {
             return false;
         }
         ReleaseUpstreamResponse that = (ReleaseUpstreamResponse) object;
-        return Objects.equals(finishedSchemaChangeEvents, 
that.finishedSchemaChangeEvents)
-                && Objects.equals(failedSchemaChangeEvents, 
that.failedSchemaChangeEvents)
-                && Objects.equals(ignoredSchemaChangeEvents, 
that.ignoredSchemaChangeEvents);
+        return Objects.equals(finishedSchemaChangeEvents, 
that.finishedSchemaChangeEvents);
     }
 
     @Override
     public int hashCode() {
-        return Objects.hash(
-                finishedSchemaChangeEvents, failedSchemaChangeEvents, 
ignoredSchemaChangeEvents);
+        return Objects.hash(finishedSchemaChangeEvents);
     }
 }
diff --git 
a/flink-cdc-runtime/src/test/java/org/apache/flink/cdc/runtime/operators/schema/SchemaEvolveTest.java
 
b/flink-cdc-runtime/src/test/java/org/apache/flink/cdc/runtime/operators/schema/SchemaEvolveTest.java
index bf61ad260..d3e0de8d0 100644
--- 
a/flink-cdc-runtime/src/test/java/org/apache/flink/cdc/runtime/operators/schema/SchemaEvolveTest.java
+++ 
b/flink-cdc-runtime/src/test/java/org/apache/flink/cdc/runtime/operators/schema/SchemaEvolveTest.java
@@ -1036,6 +1036,8 @@ public class SchemaEvolveTest {
                                                 Column.physicalColumn(
                                                         "height", DOUBLE, 
"Height data")))));
         Assertions.assertThatThrownBy(() -> processEvent(schemaOperator, 
addColumnEvents))
+                .cause()
+                .cause()
                 .isExactlyInstanceOf(RuntimeException.class)
                 .hasMessageContaining("Failed to apply schema change");
         harness.close();
diff --git 
a/flink-cdc-runtime/src/test/java/org/apache/flink/cdc/runtime/testutils/operators/EventOperatorTestHarness.java
 
b/flink-cdc-runtime/src/test/java/org/apache/flink/cdc/runtime/testutils/operators/EventOperatorTestHarness.java
index b5262d2d5..354304599 100644
--- 
a/flink-cdc-runtime/src/test/java/org/apache/flink/cdc/runtime/testutils/operators/EventOperatorTestHarness.java
+++ 
b/flink-cdc-runtime/src/test/java/org/apache/flink/cdc/runtime/testutils/operators/EventOperatorTestHarness.java
@@ -25,8 +25,6 @@ import org.apache.flink.cdc.common.event.TableId;
 import org.apache.flink.cdc.common.pipeline.SchemaChangeBehavior;
 import org.apache.flink.cdc.common.schema.Schema;
 import 
org.apache.flink.cdc.runtime.operators.schema.coordinator.SchemaRegistry;
-import 
org.apache.flink.cdc.runtime.operators.schema.event.ApplyEvolvedSchemaChangeRequest;
-import 
org.apache.flink.cdc.runtime.operators.schema.event.ApplyOriginalSchemaChangeRequest;
 import org.apache.flink.cdc.runtime.operators.schema.event.FlushSuccessEvent;
 import 
org.apache.flink.cdc.runtime.operators.schema.event.GetEvolvedSchemaRequest;
 import 
org.apache.flink.cdc.runtime.operators.schema.event.GetEvolvedSchemaResponse;
@@ -56,7 +54,6 @@ import org.apache.flink.util.SerializedValue;
 import java.io.IOException;
 import java.time.Duration;
 import java.util.ArrayList;
-import java.util.Collections;
 import java.util.LinkedList;
 import java.util.Set;
 
@@ -166,14 +163,10 @@ public class EventOperatorTestHarness<OP extends 
AbstractStreamOperator<E>, E ex
     }
 
     public void registerTableSchema(TableId tableId, Schema schema) {
-        schemaRegistry.handleCoordinationRequest(
-                new ApplyOriginalSchemaChangeRequest(
-                        tableId, new CreateTableEvent(tableId, schema)));
+        schemaRegistry.handleApplyOriginalSchemaChangeEvent(new 
CreateTableEvent(tableId, schema));
         schemaRegistry.handleCoordinationRequest(
                 new SchemaChangeRequest(tableId, new CreateTableEvent(tableId, 
schema)));
-        schemaRegistry.handleCoordinationRequest(
-                new ApplyEvolvedSchemaChangeRequest(
-                        tableId, Collections.singletonList(new 
CreateTableEvent(tableId, schema))));
+        schemaRegistry.handleApplyEvolvedSchemaChangeRequest(new 
CreateTableEvent(tableId, schema));
     }
 
     public Schema getLatestOriginalSchema(TableId tableId) throws Exception {
diff --git 
a/flink-cdc-runtime/src/test/java/org/apache/flink/cdc/runtime/testutils/schema/CollectingMetadataApplier.java
 
b/flink-cdc-runtime/src/test/java/org/apache/flink/cdc/runtime/testutils/schema/CollectingMetadataApplier.java
index 9398e1f1e..9384fa757 100644
--- 
a/flink-cdc-runtime/src/test/java/org/apache/flink/cdc/runtime/testutils/schema/CollectingMetadataApplier.java
+++ 
b/flink-cdc-runtime/src/test/java/org/apache/flink/cdc/runtime/testutils/schema/CollectingMetadataApplier.java
@@ -21,6 +21,7 @@ import org.apache.flink.cdc.common.event.SchemaChangeEvent;
 import org.apache.flink.cdc.common.event.SchemaChangeEventType;
 import org.apache.flink.cdc.common.event.SchemaChangeEventTypeFamily;
 import org.apache.flink.cdc.common.exceptions.SchemaEvolveException;
+import 
org.apache.flink.cdc.common.exceptions.UnsupportedSchemaChangeEventException;
 import org.apache.flink.cdc.common.sink.MetadataApplier;
 
 import java.time.Duration;
@@ -83,8 +84,7 @@ public class CollectingMetadataApplier implements 
MetadataApplier {
             try {
                 Thread.sleep(duration.toMillis());
                 if (errorsOnEventTypes.contains(schemaChangeEvent.getType())) {
-                    throw new SchemaEvolveException(
-                            schemaChangeEvent, "Dummy metadata apply exception 
for test.", null);
+                    throw new 
UnsupportedSchemaChangeEventException(schemaChangeEvent);
                 }
             } catch (InterruptedException ignore) {
                 // Ignores sleep interruption

Reply via email to