This is an automated email from the ASF dual-hosted git repository.
leonard pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/flink-cdc.git
The following commit(s) were added to refs/heads/master by this push:
new 6205a5a0f [FLINK-36094][cdc-runtime] Improve the Exception that
SchemaRegistryRequestHandler thrown
6205a5a0f is described below
commit 6205a5a0f16d2cf72ff751573351c4e15ea59efb
Author: Hongshun Wang <[email protected]>
AuthorDate: Thu Aug 22 09:58:55 2024 +0800
[FLINK-36094][cdc-runtime] Improve the Exception that
SchemaRegistryRequestHandler thrown
This closes #3558.
---
.../common/exceptions/SchemaEvolveException.java | 6 +-
.../UnsupportedSchemaChangeEventException.java | 28 +++++-
.../doris/sink/DorisMetadataApplier.java | 8 +-
.../cdc/connectors/values/ValuesDatabase.java | 3 +-
.../flink/cdc/pipeline/tests/MysqlE2eITCase.java | 2 +-
.../flink/cdc/pipeline/tests/RouteE2eITCase.java | 4 +-
.../cdc/pipeline/tests/SchemaEvolveE2eITCase.java | 23 +++--
.../cdc/pipeline/tests/TransformE2eITCase.java | 2 +-
.../flink/cdc/pipeline/tests/UdfE2eITCase.java | 2 +-
.../runtime/operators/schema/SchemaOperator.java | 65 +-----------
.../schema/coordinator/SchemaRegistry.java | 110 +++++++++++----------
.../coordinator/SchemaRegistryRequestHandler.java | 56 +++++++----
.../event/ApplyEvolvedSchemaChangeRequest.java | 73 --------------
.../event/ApplyEvolvedSchemaChangeResponse.java | 32 ------
.../event/ApplyOriginalSchemaChangeRequest.java | 71 -------------
.../event/ApplyOriginalSchemaChangeResponse.java | 31 ------
.../schema/event/ReleaseUpstreamResponse.java | 42 +-------
.../runtime/operators/schema/SchemaEvolveTest.java | 2 +
.../operators/EventOperatorTestHarness.java | 11 +--
.../schema/CollectingMetadataApplier.java | 4 +-
20 files changed, 155 insertions(+), 420 deletions(-)
diff --git
a/flink-cdc-common/src/main/java/org/apache/flink/cdc/common/exceptions/SchemaEvolveException.java
b/flink-cdc-common/src/main/java/org/apache/flink/cdc/common/exceptions/SchemaEvolveException.java
index fc5b482f1..52950fb32 100644
---
a/flink-cdc-common/src/main/java/org/apache/flink/cdc/common/exceptions/SchemaEvolveException.java
+++
b/flink-cdc-common/src/main/java/org/apache/flink/cdc/common/exceptions/SchemaEvolveException.java
@@ -24,9 +24,9 @@ import javax.annotation.Nullable;
/** An exception occurred during schema evolution. */
public class SchemaEvolveException extends FlinkRuntimeException {
- private final SchemaChangeEvent applyingEvent;
- private final String exceptionMessage;
- private final @Nullable Throwable cause;
+ protected final SchemaChangeEvent applyingEvent;
+ protected final String exceptionMessage;
+ protected final @Nullable Throwable cause;
public SchemaEvolveException(SchemaChangeEvent applyingEvent, String
exceptionMessage) {
this(applyingEvent, exceptionMessage, null);
diff --git
a/flink-cdc-common/src/main/java/org/apache/flink/cdc/common/exceptions/UnsupportedSchemaChangeEventException.java
b/flink-cdc-common/src/main/java/org/apache/flink/cdc/common/exceptions/UnsupportedSchemaChangeEventException.java
index 4be20525a..55d53e7d9 100644
---
a/flink-cdc-common/src/main/java/org/apache/flink/cdc/common/exceptions/UnsupportedSchemaChangeEventException.java
+++
b/flink-cdc-common/src/main/java/org/apache/flink/cdc/common/exceptions/UnsupportedSchemaChangeEventException.java
@@ -19,10 +19,36 @@ package org.apache.flink.cdc.common.exceptions;
import org.apache.flink.cdc.common.event.SchemaChangeEvent;
+import javax.annotation.Nullable;
+
/** A special kind of {@link SchemaEvolveException} that sink doesn't support
such event type. */
public class UnsupportedSchemaChangeEventException extends
SchemaEvolveException {
public UnsupportedSchemaChangeEventException(SchemaChangeEvent
applyingEvent) {
- super(applyingEvent, "Sink doesn't support such schema change event.",
null);
+ this(applyingEvent, "Sink doesn't support such schema change event.");
+ }
+
+ public UnsupportedSchemaChangeEventException(
+ SchemaChangeEvent applyingEvent, String exceptionMessage) {
+ this(applyingEvent, exceptionMessage, null);
+ }
+
+ public UnsupportedSchemaChangeEventException(
+ SchemaChangeEvent applyingEvent, String exceptionMessage,
@Nullable Throwable cause) {
+ super(applyingEvent, exceptionMessage, cause);
+ }
+
+ @Override
+ public String toString() {
+ return "UnsupportedSchemaChangeEventException{"
+ + "applyingEvent="
+ + applyingEvent
+ + ", exceptionMessage='"
+ + exceptionMessage
+ + '\''
+ + ", cause='"
+ + cause
+ + '\''
+ + '}';
}
}
diff --git
a/flink-cdc-connect/flink-cdc-pipeline-connectors/flink-cdc-pipeline-connector-doris/src/main/java/org/apache/flink/cdc/connectors/doris/sink/DorisMetadataApplier.java
b/flink-cdc-connect/flink-cdc-pipeline-connectors/flink-cdc-pipeline-connector-doris/src/main/java/org/apache/flink/cdc/connectors/doris/sink/DorisMetadataApplier.java
index 5ee181985..cad3b37fc 100644
---
a/flink-cdc-connect/flink-cdc-pipeline-connectors/flink-cdc-pipeline-connector-doris/src/main/java/org/apache/flink/cdc/connectors/doris/sink/DorisMetadataApplier.java
+++
b/flink-cdc-connect/flink-cdc-pipeline-connectors/flink-cdc-pipeline-connector-doris/src/main/java/org/apache/flink/cdc/connectors/doris/sink/DorisMetadataApplier.java
@@ -221,7 +221,7 @@ public class DorisMetadataApplier implements
MetadataApplier {
tableId.getSchemaName(), tableId.getTableName(),
addFieldSchema);
}
} catch (Exception e) {
- throw new SchemaEvolveException(event, e.getMessage(), e);
+ throw new SchemaEvolveException(event, "fail to apply add column
event", e);
}
}
@@ -234,7 +234,7 @@ public class DorisMetadataApplier implements
MetadataApplier {
tableId.getSchemaName(), tableId.getTableName(), col);
}
} catch (Exception e) {
- throw new SchemaEvolveException(event, e.getMessage(), e);
+ throw new SchemaEvolveException(event, "fail to apply drop column
event", e);
}
}
@@ -250,7 +250,7 @@ public class DorisMetadataApplier implements
MetadataApplier {
entry.getValue());
}
} catch (Exception e) {
- throw new SchemaEvolveException(event, e.getMessage(), e);
+ throw new SchemaEvolveException(event, "fail to apply rename
column event", e);
}
}
@@ -272,7 +272,7 @@ public class DorisMetadataApplier implements
MetadataApplier {
// will be fixed after FLINK-35243 got merged.
}
} catch (Exception e) {
- throw new SchemaEvolveException(event, e.getMessage(), e);
+ throw new SchemaEvolveException(event, "fail to apply alter column
type event", e);
}
}
}
diff --git
a/flink-cdc-connect/flink-cdc-pipeline-connectors/flink-cdc-pipeline-connector-values/src/main/java/org/apache/flink/cdc/connectors/values/ValuesDatabase.java
b/flink-cdc-connect/flink-cdc-pipeline-connectors/flink-cdc-pipeline-connector-values/src/main/java/org/apache/flink/cdc/connectors/values/ValuesDatabase.java
index dfea5d071..0004961f3 100644
---
a/flink-cdc-connect/flink-cdc-pipeline-connectors/flink-cdc-pipeline-connector-values/src/main/java/org/apache/flink/cdc/connectors/values/ValuesDatabase.java
+++
b/flink-cdc-connect/flink-cdc-pipeline-connectors/flink-cdc-pipeline-connector-values/src/main/java/org/apache/flink/cdc/connectors/values/ValuesDatabase.java
@@ -31,6 +31,7 @@ import
org.apache.flink.cdc.common.event.SchemaChangeEventType;
import org.apache.flink.cdc.common.event.TableId;
import org.apache.flink.cdc.common.event.TruncateTableEvent;
import org.apache.flink.cdc.common.exceptions.SchemaEvolveException;
+import
org.apache.flink.cdc.common.exceptions.UnsupportedSchemaChangeEventException;
import org.apache.flink.cdc.common.schema.Column;
import org.apache.flink.cdc.common.schema.Schema;
import org.apache.flink.cdc.common.sink.MetadataApplier;
@@ -152,7 +153,7 @@ public class ValuesDatabase {
tableId, ((CreateTableEvent)
schemaChangeEvent).getSchema()));
}
} else {
- throw new SchemaEvolveException(
+ throw new UnsupportedSchemaChangeEventException(
schemaChangeEvent,
"Rejected schema change event since
error.on.schema.change is enabled.",
null);
diff --git
a/flink-cdc-e2e-tests/flink-cdc-pipeline-e2e-tests/src/test/java/org/apache/flink/cdc/pipeline/tests/MysqlE2eITCase.java
b/flink-cdc-e2e-tests/flink-cdc-pipeline-e2e-tests/src/test/java/org/apache/flink/cdc/pipeline/tests/MysqlE2eITCase.java
index 3ce48bce2..e340468f3 100644
---
a/flink-cdc-e2e-tests/flink-cdc-pipeline-e2e-tests/src/test/java/org/apache/flink/cdc/pipeline/tests/MysqlE2eITCase.java
+++
b/flink-cdc-e2e-tests/flink-cdc-pipeline-e2e-tests/src/test/java/org/apache/flink/cdc/pipeline/tests/MysqlE2eITCase.java
@@ -341,7 +341,7 @@ public class MysqlE2eITCase extends PipelineTestEnvironment
{
long endTimeout = System.currentTimeMillis() +
MysqlE2eITCase.EVENT_WAITING_TIMEOUT;
while (System.currentTimeMillis() < endTimeout) {
String stdout = taskManagerConsumer.toUtf8String();
- if (stdout.contains(event)) {
+ if (stdout.contains(event + "\n")) {
result = true;
break;
}
diff --git
a/flink-cdc-e2e-tests/flink-cdc-pipeline-e2e-tests/src/test/java/org/apache/flink/cdc/pipeline/tests/RouteE2eITCase.java
b/flink-cdc-e2e-tests/flink-cdc-pipeline-e2e-tests/src/test/java/org/apache/flink/cdc/pipeline/tests/RouteE2eITCase.java
index 8b3c907b8..0834ce6e7 100644
---
a/flink-cdc-e2e-tests/flink-cdc-pipeline-e2e-tests/src/test/java/org/apache/flink/cdc/pipeline/tests/RouteE2eITCase.java
+++
b/flink-cdc-e2e-tests/flink-cdc-pipeline-e2e-tests/src/test/java/org/apache/flink/cdc/pipeline/tests/RouteE2eITCase.java
@@ -800,7 +800,7 @@ public class RouteE2eITCase extends PipelineTestEnvironment
{
"DataChangeEvent{tableId=NEW_%s.NEW_TABLEALPHA, before=[],
after=[10001, 12, Derrida], op=INSERT, meta=()}",
"RenameColumnEvent{tableId=NEW_%s.NEW_TABLEBETA,
nameMapping={VERSION=VERSION_EX}}",
"DataChangeEvent{tableId=NEW_%s.NEW_TABLEBETA, before=[],
after=[10002, 15], op=INSERT, meta=()}",
- "AlterColumnTypeEvent{tableId=NEW_%s.NEW_TABLEGAMMA,
typeMapping={VERSION=VARCHAR(19)}, oldTypeMapping={VERSION=VARCHAR(17)}",
+ "AlterColumnTypeEvent{tableId=NEW_%s.NEW_TABLEGAMMA,
typeMapping={VERSION=VARCHAR(19)}, oldTypeMapping={VERSION=VARCHAR(17)}}",
"RenameColumnEvent{tableId=NEW_%s.NEW_TABLEGAMMA,
nameMapping={VERSION=VERSION_EX}}",
"DataChangeEvent{tableId=NEW_%s.NEW_TABLEGAMMA, before=[],
after=[10003, Fluorite], op=INSERT, meta=()}",
"DropColumnEvent{tableId=NEW_%s.NEW_TABLEDELTA,
droppedColumnNames=[VERSION]}",
@@ -818,7 +818,7 @@ public class RouteE2eITCase extends PipelineTestEnvironment
{
long endTimeout = System.currentTimeMillis() + EVENT_DEFAULT_TIMEOUT;
while (System.currentTimeMillis() < endTimeout) {
String stdout = taskManagerConsumer.toUtf8String();
- if (stdout.contains(event)) {
+ if (stdout.contains(event + "\n")) {
result = true;
break;
}
diff --git
a/flink-cdc-e2e-tests/flink-cdc-pipeline-e2e-tests/src/test/java/org/apache/flink/cdc/pipeline/tests/SchemaEvolveE2eITCase.java
b/flink-cdc-e2e-tests/flink-cdc-pipeline-e2e-tests/src/test/java/org/apache/flink/cdc/pipeline/tests/SchemaEvolveE2eITCase.java
index 8225558fc..14c6a5912 100644
---
a/flink-cdc-e2e-tests/flink-cdc-pipeline-e2e-tests/src/test/java/org/apache/flink/cdc/pipeline/tests/SchemaEvolveE2eITCase.java
+++
b/flink-cdc-e2e-tests/flink-cdc-pipeline-e2e-tests/src/test/java/org/apache/flink/cdc/pipeline/tests/SchemaEvolveE2eITCase.java
@@ -113,9 +113,9 @@ public class SchemaEvolveE2eITCase extends
PipelineTestEnvironment {
true,
false,
false,
- Collections.singletonList(
- "java.lang.IllegalStateException: Incompatible types:
\"INT\" and \"DOUBLE\""),
- Collections.singletonList(
+ Collections.emptyList(),
+ Arrays.asList(
+ "java.lang.IllegalStateException: Incompatible types:
\"INT\" and \"DOUBLE\"",
"org.apache.flink.runtime.JobException: Recovery is
suppressed by NoRestartBackoffTimeStrategy"));
}
@@ -126,11 +126,10 @@ public class SchemaEvolveE2eITCase extends
PipelineTestEnvironment {
false,
true,
false,
- Collections.singletonList(
- "AddColumnEvent{tableId=%s.members,
addedColumns=[ColumnWithPosition{column=`gender` TINYINT, position=AFTER,
existedColumnName=age}]}"),
+ Collections.emptyList(),
Arrays.asList(
- "Failed to apply schema change event
AddColumnEvent{tableId=%s.members,
addedColumns=[ColumnWithPosition{column=`gender` TINYINT, position=AFTER,
existedColumnName=age}]}.",
-
"SchemaEvolveException{applyingEvent=AddColumnEvent{tableId=%s.members,
addedColumns=[ColumnWithPosition{column=`gender` TINYINT, position=AFTER,
existedColumnName=age}]}, exceptionMessage='Rejected schema change event since
error.on.schema.change is enabled.', cause='null'}",
+ "Failed to apply schema change
AddColumnEvent{tableId=%s.members,
addedColumns=[ColumnWithPosition{column=`gender` TINYINT, position=AFTER,
existedColumnName=age}]} to table %s.members. Caused by:
UnsupportedSchemaChangeEventException{applyingEvent=AddColumnEvent{tableId=%s.members,
addedColumns=[ColumnWithPosition{column=`gender` TINYINT, position=AFTER,
existedColumnName=age}]}, exceptionMessage='Rejected schema change event since
error.on.schema.change is enabl [...]
+
"UnsupportedSchemaChangeEventException{applyingEvent=AddColumnEvent{tableId=%s.members,
addedColumns=[ColumnWithPosition{column=`gender` TINYINT, position=AFTER,
existedColumnName=age}]}, exceptionMessage='Rejected schema change event since
error.on.schema.change is enabled.', cause='null'}",
"org.apache.flink.runtime.JobException: Recovery is
suppressed by NoRestartBackoffTimeStrategy"));
}
@@ -146,8 +145,8 @@ public class SchemaEvolveE2eITCase extends
PipelineTestEnvironment {
"DataChangeEvent{tableId=%s.members, before=[],
after=[1012, Eve, 17], op=INSERT, meta=()}",
"DataChangeEvent{tableId=%s.members, before=[],
after=[1013, Fiona, null], op=INSERT, meta=()}"),
Arrays.asList(
- "Failed to apply schema change
AddColumnEvent{tableId=%s.members,
addedColumns=[ColumnWithPosition{column=`gender` TINYINT, position=AFTER,
existedColumnName=age}]} to table %s.members.",
-
"SchemaEvolveException{applyingEvent=AddColumnEvent{tableId=%s.members,
addedColumns=[ColumnWithPosition{column=`gender` TINYINT, position=AFTER,
existedColumnName=age}]}, exceptionMessage='Rejected schema change event since
error.on.schema.change is enabled.', cause='null'}"));
+ "Failed to apply schema change
AddColumnEvent{tableId=%s.members,
addedColumns=[ColumnWithPosition{column=`gender` TINYINT, position=AFTER,
existedColumnName=age}]} to table %s.members. Caused by:
UnsupportedSchemaChangeEventException{applyingEvent=AddColumnEvent{tableId=%s.members,
addedColumns=[ColumnWithPosition{column=`gender` TINYINT, position=AFTER,
existedColumnName=age}]}, exceptionMessage='Rejected schema change event since
error.on.schema.change is enabl [...]
+
"UnsupportedSchemaChangeEventException{applyingEvent=AddColumnEvent{tableId=%s.members,
addedColumns=[ColumnWithPosition{column=`gender` TINYINT, position=AFTER,
existedColumnName=age}]}, exceptionMessage='Rejected schema change event since
error.on.schema.change is enabled.', cause='null'}"));
}
@Test
@@ -185,7 +184,7 @@ public class SchemaEvolveE2eITCase extends
PipelineTestEnvironment {
false,
false,
Arrays.asList(
- "AddColumnEvent{tableId=%s.members,
addedColumns=[ColumnWithPosition{column=`gender` TINYINT, position=AFTER,
existedColumnName=age}]}",
+ "AddColumnEvent{tableId=%s.members,
addedColumns=[ColumnWithPosition{column=`gender` TINYINT, position=LAST,
existedColumnName=null}]}",
"DataChangeEvent{tableId=%s.members, before=[],
after=[1012, Eve, 17, 0], op=INSERT, meta=()}",
"AlterColumnTypeEvent{tableId=%s.members,
typeMapping={age=DOUBLE}, oldTypeMapping={age=INT}}",
"AddColumnEvent{tableId=%s.members,
addedColumns=[ColumnWithPosition{column=`precise_age` DOUBLE, position=LAST,
existedColumnName=null}]}",
@@ -369,7 +368,7 @@ public class SchemaEvolveE2eITCase extends
PipelineTestEnvironment {
List<String> expectedJmEvents =
expectedJobManagerEvents.stream()
- .map(s -> String.format(s, dbName, dbName))
+ .map(s -> String.format(s, dbName, dbName, dbName))
.collect(Collectors.toList());
validateResult(expectedJmEvents, jobManagerConsumer);
@@ -422,7 +421,7 @@ public class SchemaEvolveE2eITCase extends
PipelineTestEnvironment {
long endTimeout = System.currentTimeMillis() +
SchemaEvolveE2eITCase.EVENT_WAITING_TIMEOUT;
while (System.currentTimeMillis() < endTimeout) {
String stdout = consumer.toUtf8String();
- if (stdout.contains(event)) {
+ if (stdout.contains(event + "\n")) {
result = true;
break;
}
diff --git
a/flink-cdc-e2e-tests/flink-cdc-pipeline-e2e-tests/src/test/java/org/apache/flink/cdc/pipeline/tests/TransformE2eITCase.java
b/flink-cdc-e2e-tests/flink-cdc-pipeline-e2e-tests/src/test/java/org/apache/flink/cdc/pipeline/tests/TransformE2eITCase.java
index e27977153..6f612ca95 100644
---
a/flink-cdc-e2e-tests/flink-cdc-pipeline-e2e-tests/src/test/java/org/apache/flink/cdc/pipeline/tests/TransformE2eITCase.java
+++
b/flink-cdc-e2e-tests/flink-cdc-pipeline-e2e-tests/src/test/java/org/apache/flink/cdc/pipeline/tests/TransformE2eITCase.java
@@ -736,7 +736,7 @@ public class TransformE2eITCase extends
PipelineTestEnvironment {
long endTimeout = System.currentTimeMillis() + timeout;
while (System.currentTimeMillis() < endTimeout) {
String stdout = taskManagerConsumer.toUtf8String();
- if (stdout.contains(event)) {
+ if (stdout.contains(event + "\n")) {
result = true;
break;
}
diff --git
a/flink-cdc-e2e-tests/flink-cdc-pipeline-e2e-tests/src/test/java/org/apache/flink/cdc/pipeline/tests/UdfE2eITCase.java
b/flink-cdc-e2e-tests/flink-cdc-pipeline-e2e-tests/src/test/java/org/apache/flink/cdc/pipeline/tests/UdfE2eITCase.java
index c965e4dec..938e2d98e 100644
---
a/flink-cdc-e2e-tests/flink-cdc-pipeline-e2e-tests/src/test/java/org/apache/flink/cdc/pipeline/tests/UdfE2eITCase.java
+++
b/flink-cdc-e2e-tests/flink-cdc-pipeline-e2e-tests/src/test/java/org/apache/flink/cdc/pipeline/tests/UdfE2eITCase.java
@@ -386,7 +386,7 @@ public class UdfE2eITCase extends PipelineTestEnvironment {
long endTimeout = System.currentTimeMillis() + timeout;
while (System.currentTimeMillis() < endTimeout) {
String stdout = taskManagerConsumer.toUtf8String();
- if (stdout.contains(event)) {
+ if (stdout.contains(event + "\n")) {
result = true;
break;
}
diff --git
a/flink-cdc-runtime/src/main/java/org/apache/flink/cdc/runtime/operators/schema/SchemaOperator.java
b/flink-cdc-runtime/src/main/java/org/apache/flink/cdc/runtime/operators/schema/SchemaOperator.java
index d3a9e158b..a700fd39c 100644
---
a/flink-cdc-runtime/src/main/java/org/apache/flink/cdc/runtime/operators/schema/SchemaOperator.java
+++
b/flink-cdc-runtime/src/main/java/org/apache/flink/cdc/runtime/operators/schema/SchemaOperator.java
@@ -17,7 +17,6 @@
package org.apache.flink.cdc.runtime.operators.schema;
-import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.api.java.tuple.Tuple3;
import org.apache.flink.cdc.common.annotation.Internal;
import org.apache.flink.cdc.common.annotation.VisibleForTesting;
@@ -29,7 +28,6 @@ import org.apache.flink.cdc.common.event.FlushEvent;
import org.apache.flink.cdc.common.event.SchemaChangeEvent;
import org.apache.flink.cdc.common.event.SchemaChangeEventType;
import org.apache.flink.cdc.common.event.TableId;
-import org.apache.flink.cdc.common.exceptions.SchemaEvolveException;
import org.apache.flink.cdc.common.pipeline.SchemaChangeBehavior;
import org.apache.flink.cdc.common.route.RouteRule;
import org.apache.flink.cdc.common.schema.Column;
@@ -40,8 +38,6 @@ import org.apache.flink.cdc.common.types.DataTypeFamily;
import org.apache.flink.cdc.common.types.DataTypeRoot;
import org.apache.flink.cdc.common.utils.ChangeEventUtils;
import
org.apache.flink.cdc.runtime.operators.schema.coordinator.SchemaRegistry;
-import
org.apache.flink.cdc.runtime.operators.schema.event.ApplyEvolvedSchemaChangeRequest;
-import
org.apache.flink.cdc.runtime.operators.schema.event.ApplyOriginalSchemaChangeRequest;
import
org.apache.flink.cdc.runtime.operators.schema.event.CoordinationResponseUtils;
import
org.apache.flink.cdc.runtime.operators.schema.event.RefreshPendingListsRequest;
import
org.apache.flink.cdc.runtime.operators.schema.event.ReleaseUpstreamRequest;
@@ -431,56 +427,9 @@ public class SchemaOperator extends
AbstractStreamOperator<Event>
ReleaseUpstreamResponse schemaEvolveResponse =
requestReleaseUpstream();
List<SchemaChangeEvent> finishedSchemaChangeEvents =
schemaEvolveResponse.getFinishedSchemaChangeEvents();
- List<Tuple2<SchemaChangeEvent, Throwable>>
failedSchemaChangeEvents =
- schemaEvolveResponse.getFailedSchemaChangeEvents();
- List<SchemaChangeEvent> ignoredSchemaChangeEvents =
- schemaEvolveResponse.getIgnoredSchemaChangeEvents();
-
- if (schemaChangeBehavior == SchemaChangeBehavior.EVOLVE
- || schemaChangeBehavior == SchemaChangeBehavior.EXCEPTION)
{
- if (schemaEvolveResponse.hasException()) {
- throw new RuntimeException(
- String.format(
- "Failed to apply schema change event
%s.\nExceptions: %s",
- schemaChangeEvent,
-
schemaEvolveResponse.getPrintableFailedSchemaChangeEvents()));
- }
- } else if (schemaChangeBehavior == SchemaChangeBehavior.TRY_EVOLVE
- || schemaChangeBehavior == SchemaChangeBehavior.LENIENT
- || schemaChangeBehavior == SchemaChangeBehavior.IGNORE) {
- if (schemaEvolveResponse.hasException()) {
- schemaEvolveResponse
- .getFailedSchemaChangeEvents()
- .forEach(
- e ->
- LOG.warn(
- "Failed to apply event {},
but keeps running in tolerant mode. Caused by: {}",
- e.f0,
- e.f1));
- }
- } else {
- throw new SchemaEvolveException(
- schemaChangeEvent,
- "Unexpected schema change behavior: " +
schemaChangeBehavior);
- }
// Update evolved schema changes based on apply results
- requestApplyEvolvedSchemaChanges(tableId,
finishedSchemaChangeEvents);
finishedSchemaChangeEvents.forEach(e -> output.collect(new
StreamRecord<>(e)));
-
- LOG.info(
- "Applied schema change event {} to downstream. Among {}
total evolved events, {} succeeded, {} failed, and {} ignored.",
- schemaChangeEvent,
- expectedSchemaChangeEvents.size(),
- finishedSchemaChangeEvents.size(),
- failedSchemaChangeEvents.size(),
- ignoredSchemaChangeEvents.size());
-
- schemaOperatorMetrics.increaseFinishedSchemaChangeEvents(
- finishedSchemaChangeEvents.size());
-
schemaOperatorMetrics.increaseFailedSchemaChangeEvents(failedSchemaChangeEvents.size());
- schemaOperatorMetrics.increaseIgnoredSchemaChangeEvents(
- ignoredSchemaChangeEvents.size());
}
}
@@ -489,16 +438,6 @@ public class SchemaOperator extends
AbstractStreamOperator<Event>
return sendRequestToCoordinator(new SchemaChangeRequest(tableId,
schemaChangeEvent));
}
- private void requestApplyOriginalSchemaChanges(
- TableId tableId, SchemaChangeEvent schemaChangeEvent) {
- sendRequestToCoordinator(new ApplyOriginalSchemaChangeRequest(tableId,
schemaChangeEvent));
- }
-
- private void requestApplyEvolvedSchemaChanges(
- TableId tableId, List<SchemaChangeEvent> schemaChangeEvents) {
- sendRequestToCoordinator(new ApplyEvolvedSchemaChangeRequest(tableId,
schemaChangeEvents));
- }
-
private ReleaseUpstreamResponse requestReleaseUpstream()
throws InterruptedException, TimeoutException {
CoordinationResponse coordinationResponse =
@@ -538,7 +477,7 @@ public class SchemaOperator extends
AbstractStreamOperator<Event>
return optionalSchema.get();
} catch (Exception e) {
throw new IllegalStateException(
- String.format("Unable to get latest schema for table
\"%s\"", tableId));
+ String.format("Unable to get latest schema for table
\"%s\"", tableId), e);
}
}
@@ -553,7 +492,7 @@ public class SchemaOperator extends
AbstractStreamOperator<Event>
return optionalSchema.get();
} catch (Exception e) {
throw new IllegalStateException(
- String.format("Unable to get latest schema for table
\"%s\"", tableId));
+ String.format("Unable to get latest schema for table
\"%s\"", tableId), e);
}
}
diff --git
a/flink-cdc-runtime/src/main/java/org/apache/flink/cdc/runtime/operators/schema/coordinator/SchemaRegistry.java
b/flink-cdc-runtime/src/main/java/org/apache/flink/cdc/runtime/operators/schema/coordinator/SchemaRegistry.java
index cc7c68207..9087ae4b3 100644
---
a/flink-cdc-runtime/src/main/java/org/apache/flink/cdc/runtime/operators/schema/coordinator/SchemaRegistry.java
+++
b/flink-cdc-runtime/src/main/java/org/apache/flink/cdc/runtime/operators/schema/coordinator/SchemaRegistry.java
@@ -17,15 +17,13 @@
package org.apache.flink.cdc.runtime.operators.schema.coordinator;
+import org.apache.flink.cdc.common.annotation.VisibleForTesting;
+import org.apache.flink.cdc.common.event.SchemaChangeEvent;
import org.apache.flink.cdc.common.event.TableId;
import org.apache.flink.cdc.common.pipeline.SchemaChangeBehavior;
import org.apache.flink.cdc.common.route.RouteRule;
import org.apache.flink.cdc.common.sink.MetadataApplier;
import org.apache.flink.cdc.runtime.operators.schema.SchemaOperator;
-import
org.apache.flink.cdc.runtime.operators.schema.event.ApplyEvolvedSchemaChangeRequest;
-import
org.apache.flink.cdc.runtime.operators.schema.event.ApplyEvolvedSchemaChangeResponse;
-import
org.apache.flink.cdc.runtime.operators.schema.event.ApplyOriginalSchemaChangeRequest;
-import
org.apache.flink.cdc.runtime.operators.schema.event.ApplyOriginalSchemaChangeResponse;
import org.apache.flink.cdc.runtime.operators.schema.event.FlushSuccessEvent;
import
org.apache.flink.cdc.runtime.operators.schema.event.GetEvolvedSchemaRequest;
import
org.apache.flink.cdc.runtime.operators.schema.event.GetEvolvedSchemaResponse;
@@ -151,18 +149,23 @@ public class SchemaRegistry implements
OperatorCoordinator, CoordinationRequestH
@Override
public void handleEventFromOperator(int subtask, int attemptNumber,
OperatorEvent event)
throws Exception {
- if (event instanceof FlushSuccessEvent) {
- FlushSuccessEvent flushSuccessEvent = (FlushSuccessEvent) event;
- LOG.info(
- "Sink subtask {} succeed flushing for table {}.",
- flushSuccessEvent.getSubtask(),
- flushSuccessEvent.getTableId().toString());
- requestHandler.flushSuccess(
- flushSuccessEvent.getTableId(),
flushSuccessEvent.getSubtask());
- } else if (event instanceof SinkWriterRegisterEvent) {
- requestHandler.registerSinkWriter(((SinkWriterRegisterEvent)
event).getSubtask());
- } else {
- throw new FlinkException("Unrecognized Operator Event: " + event);
+ try {
+ if (event instanceof FlushSuccessEvent) {
+ FlushSuccessEvent flushSuccessEvent = (FlushSuccessEvent)
event;
+ LOG.info(
+ "Sink subtask {} succeed flushing for table {}.",
+ flushSuccessEvent.getSubtask(),
+ flushSuccessEvent.getTableId().toString());
+ requestHandler.flushSuccess(
+ flushSuccessEvent.getTableId(),
flushSuccessEvent.getSubtask());
+ } else if (event instanceof SinkWriterRegisterEvent) {
+ requestHandler.registerSinkWriter(((SinkWriterRegisterEvent)
event).getSubtask());
+ } else {
+ throw new FlinkException("Unrecognized Operator Event: " +
event);
+ }
+ } catch (Throwable t) {
+ context.failJob(t);
+ throw t;
}
}
@@ -180,6 +183,9 @@ public class SchemaRegistry implements OperatorCoordinator,
CoordinationRequestH
// Serialize SchemaDerivation mapping
SchemaDerivation.serializeDerivationMapping(schemaDerivation, out);
resultFuture.complete(baos.toByteArray());
+ } catch (Throwable t) {
+ context.failJob(t);
+ throw t;
}
}
@@ -191,33 +197,29 @@ public class SchemaRegistry implements
OperatorCoordinator, CoordinationRequestH
@Override
public CompletableFuture<CoordinationResponse> handleCoordinationRequest(
CoordinationRequest request) {
- if (request instanceof SchemaChangeRequest) {
- SchemaChangeRequest schemaChangeRequest = (SchemaChangeRequest)
request;
- return
requestHandler.handleSchemaChangeRequest(schemaChangeRequest);
- } else if (request instanceof ReleaseUpstreamRequest) {
- return requestHandler.handleReleaseUpstreamRequest();
- } else if (request instanceof GetEvolvedSchemaRequest) {
- return CompletableFuture.completedFuture(
-
wrap(handleGetEvolvedSchemaRequest(((GetEvolvedSchemaRequest) request))));
- } else if (request instanceof GetOriginalSchemaRequest) {
- return CompletableFuture.completedFuture(
-
wrap(handleGetOriginalSchemaRequest((GetOriginalSchemaRequest) request)));
- } else if (request instanceof ApplyOriginalSchemaChangeRequest) {
- return CompletableFuture.completedFuture(
- wrap(
- handleApplyOriginalSchemaChangeRequest(
- (ApplyOriginalSchemaChangeRequest)
request)));
- } else if (request instanceof ApplyEvolvedSchemaChangeRequest) {
- return CompletableFuture.completedFuture(
- wrap(
- handleApplyEvolvedSchemaChangeRequest(
- (ApplyEvolvedSchemaChangeRequest)
request)));
- } else if (request instanceof SchemaChangeResultRequest) {
- return requestHandler.getSchemaChangeResult();
- } else if (request instanceof RefreshPendingListsRequest) {
- return requestHandler.refreshPendingLists();
- } else {
- throw new IllegalArgumentException("Unrecognized
CoordinationRequest type: " + request);
+ try {
+ if (request instanceof SchemaChangeRequest) {
+ SchemaChangeRequest schemaChangeRequest =
(SchemaChangeRequest) request;
+ return
requestHandler.handleSchemaChangeRequest(schemaChangeRequest);
+ } else if (request instanceof ReleaseUpstreamRequest) {
+ return requestHandler.handleReleaseUpstreamRequest();
+ } else if (request instanceof GetEvolvedSchemaRequest) {
+ return CompletableFuture.completedFuture(
+
wrap(handleGetEvolvedSchemaRequest(((GetEvolvedSchemaRequest) request))));
+ } else if (request instanceof GetOriginalSchemaRequest) {
+ return CompletableFuture.completedFuture(
+
wrap(handleGetOriginalSchemaRequest((GetOriginalSchemaRequest) request)));
+ } else if (request instanceof SchemaChangeResultRequest) {
+ return requestHandler.getSchemaChangeResult();
+ } else if (request instanceof RefreshPendingListsRequest) {
+ return requestHandler.refreshPendingLists();
+ } else {
+ throw new IllegalArgumentException(
+ "Unrecognized CoordinationRequest type: " + request);
+ }
+ } catch (Throwable t) {
+ context.failJob(t);
+ throw t;
}
}
@@ -275,6 +277,9 @@ public class SchemaRegistry implements OperatorCoordinator,
CoordinationRequestH
throw new IOException(
"Unrecognized serialization version " +
schemaManagerSerializerVersion);
}
+ } catch (Throwable t) {
+ context.failJob(t);
+ throw t;
}
}
@@ -342,18 +347,15 @@ public class SchemaRegistry implements
OperatorCoordinator, CoordinationRequestH
}
}
- private ApplyOriginalSchemaChangeResponse
handleApplyOriginalSchemaChangeRequest(
- ApplyOriginalSchemaChangeRequest applyOriginalSchemaChangeRequest)
{
- schemaManager.applyOriginalSchemaChange(
- applyOriginalSchemaChangeRequest.getSchemaChangeEvent());
- return new ApplyOriginalSchemaChangeResponse();
+ // --------------------Only visible for test -----------------
+
+ @VisibleForTesting
+ public void handleApplyOriginalSchemaChangeEvent(SchemaChangeEvent
schemaChangeEvent) {
+ schemaManager.applyOriginalSchemaChange(schemaChangeEvent);
}
- private ApplyEvolvedSchemaChangeResponse
handleApplyEvolvedSchemaChangeRequest(
- ApplyEvolvedSchemaChangeRequest applyEvolvedSchemaChangeRequest) {
- applyEvolvedSchemaChangeRequest
- .getSchemaChangeEvent()
- .forEach(schemaManager::applyEvolvedSchemaChange);
- return new ApplyEvolvedSchemaChangeResponse();
+ @VisibleForTesting
+ public void handleApplyEvolvedSchemaChangeRequest(SchemaChangeEvent
schemaChangeEvent) {
+ schemaManager.applyEvolvedSchemaChange(schemaChangeEvent);
}
}
diff --git
a/flink-cdc-runtime/src/main/java/org/apache/flink/cdc/runtime/operators/schema/coordinator/SchemaRegistryRequestHandler.java
b/flink-cdc-runtime/src/main/java/org/apache/flink/cdc/runtime/operators/schema/coordinator/SchemaRegistryRequestHandler.java
index c47d556f6..a84ee8d63 100644
---
a/flink-cdc-runtime/src/main/java/org/apache/flink/cdc/runtime/operators/schema/coordinator/SchemaRegistryRequestHandler.java
+++
b/flink-cdc-runtime/src/main/java/org/apache/flink/cdc/runtime/operators/schema/coordinator/SchemaRegistryRequestHandler.java
@@ -17,7 +17,6 @@
package org.apache.flink.cdc.runtime.operators.schema.coordinator;
-import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.cdc.common.annotation.Internal;
import org.apache.flink.cdc.common.event.AddColumnEvent;
import org.apache.flink.cdc.common.event.AlterColumnTypeEvent;
@@ -28,7 +27,7 @@ import org.apache.flink.cdc.common.event.SchemaChangeEvent;
import org.apache.flink.cdc.common.event.SchemaChangeEventType;
import org.apache.flink.cdc.common.event.SchemaChangeEventWithPreSchema;
import org.apache.flink.cdc.common.event.TableId;
-import org.apache.flink.cdc.common.exceptions.SchemaEvolveException;
+import
org.apache.flink.cdc.common.exceptions.UnsupportedSchemaChangeEventException;
import org.apache.flink.cdc.common.pipeline.SchemaChangeBehavior;
import org.apache.flink.cdc.common.schema.Column;
import org.apache.flink.cdc.common.schema.Schema;
@@ -88,7 +87,6 @@ public class SchemaRegistryRequestHandler implements
Closeable {
private final List<PendingSchemaChange> pendingSchemaChanges;
private final List<SchemaChangeEvent> finishedSchemaChanges;
- private final List<Tuple2<SchemaChangeEvent, Throwable>>
failedSchemaChanges;
private final List<SchemaChangeEvent> ignoredSchemaChanges;
/** Sink writers which have sent flush success events for the request. */
@@ -96,6 +94,8 @@ public class SchemaRegistryRequestHandler implements
Closeable {
/** Status of the execution of current schema change request. */
private volatile boolean isSchemaChangeApplying;
+ /** Actual exception if failed to apply schema change. */
+ private volatile Throwable schemaChangeException;
/** Executor service to execute schema change. */
private final ExecutorService schemaChangeThreadPool;
@@ -111,7 +111,6 @@ public class SchemaRegistryRequestHandler implements
Closeable {
this.flushedSinkWriters = new HashSet<>();
this.pendingSchemaChanges = new LinkedList<>();
this.finishedSchemaChanges = new LinkedList<>();
- this.failedSchemaChanges = new LinkedList<>();
this.ignoredSchemaChanges = new LinkedList<>();
this.schemaManager = schemaManager;
this.schemaDerivation = schemaDerivation;
@@ -129,8 +128,8 @@ public class SchemaRegistryRequestHandler implements
Closeable {
private void applySchemaChange(
TableId tableId, List<SchemaChangeEvent>
derivedSchemaChangeEvents) {
isSchemaChangeApplying = true;
+ schemaChangeException = null;
finishedSchemaChanges.clear();
- failedSchemaChanges.clear();
ignoredSchemaChanges.clear();
for (SchemaChangeEvent changeEvent : derivedSchemaChangeEvents) {
@@ -147,17 +146,27 @@ public class SchemaRegistryRequestHandler implements
Closeable {
try {
metadataApplier.applySchemaChange(changeEvent);
LOG.debug("Applied schema change {} to table {}.",
changeEvent, tableId);
+ schemaManager.applyEvolvedSchemaChange(changeEvent);
finishedSchemaChanges.add(changeEvent);
- } catch (SchemaEvolveException e) {
+ } catch (Throwable t) {
LOG.error(
"Failed to apply schema change {} to table {}.
Caused by: {}",
changeEvent,
tableId,
- e);
- failedSchemaChanges.add(Tuple2.of(changeEvent, e));
+ t);
+ if (!shouldIgnoreException(t)) {
+ schemaChangeException = t;
+ break;
+ } else {
+ LOG.warn(
+ "Failed to apply event {}, but keeps running
in tolerant mode. Caused by: {}",
+ changeEvent,
+ t);
+ }
}
}
}
+
PendingSchemaChange waitFlushSuccess = pendingSchemaChanges.get(0);
if (RECEIVED_RELEASE_REQUEST.equals(waitFlushSuccess.getStatus())) {
startNextSchemaChangeRequest();
@@ -254,6 +263,10 @@ public class SchemaRegistryRequestHandler implements
Closeable {
() -> applySchemaChange(tableId,
waitFlushSuccess.derivedSchemaChangeEvents));
Thread.sleep(1000);
+ if (schemaChangeException != null) {
+ throw new RuntimeException("Failed to apply schema change.",
schemaChangeException);
+ }
+
if (isSchemaChangeApplying) {
waitFlushSuccess
.getResponseFuture()
@@ -261,12 +274,7 @@ public class SchemaRegistryRequestHandler implements
Closeable {
} else {
waitFlushSuccess
.getResponseFuture()
- .complete(
- wrap(
- new ReleaseUpstreamResponse(
- finishedSchemaChanges,
- failedSchemaChanges,
- ignoredSchemaChanges)));
+ .complete(wrap(new
ReleaseUpstreamResponse(finishedSchemaChanges)));
}
}
}
@@ -305,16 +313,15 @@ public class SchemaRegistryRequestHandler implements
Closeable {
}
public CompletableFuture<CoordinationResponse> getSchemaChangeResult() {
+ if (schemaChangeException != null) {
+ throw new RuntimeException("Failed to apply schema change.",
schemaChangeException);
+ }
+
if (isSchemaChangeApplying) {
return CompletableFuture.supplyAsync(() -> wrap(new
SchemaChangeProcessingResponse()));
} else {
return CompletableFuture.supplyAsync(
- () ->
- wrap(
- new ReleaseUpstreamResponse(
- finishedSchemaChanges,
- failedSchemaChanges,
- ignoredSchemaChanges)));
+ () -> wrap(new
ReleaseUpstreamResponse(finishedSchemaChanges)));
}
}
@@ -437,6 +444,15 @@ public class SchemaRegistryRequestHandler implements
Closeable {
}
}
+ private boolean shouldIgnoreException(Throwable throwable) {
+
+ // In IGNORE mode, will never try to apply schema change events
+ // In EVOLVE and and LENIENT mode, such failure will not be tolerated
+ // In EXCEPTION mode, an exception will be thrown once captured
+ return (throwable instanceof UnsupportedSchemaChangeEventException)
+ && (schemaChangeBehavior == SchemaChangeBehavior.TRY_EVOLVE);
+ }
+
private static class PendingSchemaChange {
private final SchemaChangeRequest changeRequest;
private List<SchemaChangeEvent> derivedSchemaChangeEvents;
diff --git
a/flink-cdc-runtime/src/main/java/org/apache/flink/cdc/runtime/operators/schema/event/ApplyEvolvedSchemaChangeRequest.java
b/flink-cdc-runtime/src/main/java/org/apache/flink/cdc/runtime/operators/schema/event/ApplyEvolvedSchemaChangeRequest.java
deleted file mode 100644
index f6798af51..000000000
---
a/flink-cdc-runtime/src/main/java/org/apache/flink/cdc/runtime/operators/schema/event/ApplyEvolvedSchemaChangeRequest.java
+++ /dev/null
@@ -1,73 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.flink.cdc.runtime.operators.schema.event;
-
-import org.apache.flink.cdc.common.event.SchemaChangeEvent;
-import org.apache.flink.cdc.common.event.TableId;
-import org.apache.flink.cdc.runtime.operators.schema.SchemaOperator;
-import
org.apache.flink.cdc.runtime.operators.schema.coordinator.SchemaRegistry;
-import org.apache.flink.runtime.operators.coordination.CoordinationRequest;
-
-import java.util.List;
-import java.util.Objects;
-
-/**
- * The request from {@link SchemaOperator} to {@link SchemaRegistry} to
request apply evolved schema
- * changes.
- */
-public class ApplyEvolvedSchemaChangeRequest implements CoordinationRequest {
-
- private static final long serialVersionUID = 1L;
-
- /** The sender of the request. */
- private final TableId tableId;
- /** The schema changes. */
- private final List<SchemaChangeEvent> schemaChangeEvent;
-
- public ApplyEvolvedSchemaChangeRequest(
- TableId tableId, List<SchemaChangeEvent> schemaChangeEvent) {
- this.tableId = tableId;
- this.schemaChangeEvent = schemaChangeEvent;
- }
-
- public TableId getTableId() {
- return tableId;
- }
-
- public List<SchemaChangeEvent> getSchemaChangeEvent() {
- return schemaChangeEvent;
- }
-
- @Override
- public boolean equals(Object o) {
- if (this == o) {
- return true;
- }
- if (!(o instanceof ApplyEvolvedSchemaChangeRequest)) {
- return false;
- }
- ApplyEvolvedSchemaChangeRequest that =
(ApplyEvolvedSchemaChangeRequest) o;
- return Objects.equals(tableId, that.tableId)
- && Objects.equals(schemaChangeEvent, that.schemaChangeEvent);
- }
-
- @Override
- public int hashCode() {
- return Objects.hash(tableId, schemaChangeEvent);
- }
-}
diff --git
a/flink-cdc-runtime/src/main/java/org/apache/flink/cdc/runtime/operators/schema/event/ApplyEvolvedSchemaChangeResponse.java
b/flink-cdc-runtime/src/main/java/org/apache/flink/cdc/runtime/operators/schema/event/ApplyEvolvedSchemaChangeResponse.java
deleted file mode 100644
index 787adfc5e..000000000
---
a/flink-cdc-runtime/src/main/java/org/apache/flink/cdc/runtime/operators/schema/event/ApplyEvolvedSchemaChangeResponse.java
+++ /dev/null
@@ -1,32 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.flink.cdc.runtime.operators.schema.event;
-
-import org.apache.flink.cdc.runtime.operators.schema.SchemaOperator;
-import
org.apache.flink.cdc.runtime.operators.schema.coordinator.SchemaRegistry;
-import org.apache.flink.runtime.operators.coordination.CoordinationResponse;
-
-/**
- * The response from {@link SchemaRegistry} to {@link SchemaOperator} to
request apply original
- * schema changes, the evolved schema changes come from original schema
changes with different
- * schema evolution strategy.
- */
-public class ApplyEvolvedSchemaChangeResponse implements CoordinationResponse {
-
- private static final long serialVersionUID = 1L;
-}
diff --git
a/flink-cdc-runtime/src/main/java/org/apache/flink/cdc/runtime/operators/schema/event/ApplyOriginalSchemaChangeRequest.java
b/flink-cdc-runtime/src/main/java/org/apache/flink/cdc/runtime/operators/schema/event/ApplyOriginalSchemaChangeRequest.java
deleted file mode 100644
index d4c5d7fee..000000000
---
a/flink-cdc-runtime/src/main/java/org/apache/flink/cdc/runtime/operators/schema/event/ApplyOriginalSchemaChangeRequest.java
+++ /dev/null
@@ -1,71 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.flink.cdc.runtime.operators.schema.event;
-
-import org.apache.flink.cdc.common.event.SchemaChangeEvent;
-import org.apache.flink.cdc.common.event.TableId;
-import org.apache.flink.cdc.runtime.operators.schema.SchemaOperator;
-import
org.apache.flink.cdc.runtime.operators.schema.coordinator.SchemaRegistry;
-import org.apache.flink.runtime.operators.coordination.CoordinationRequest;
-
-import java.util.Objects;
-
-/**
- * The request from {@link SchemaOperator} to {@link SchemaRegistry} to
request apply original
- * schema changes.
- */
-public class ApplyOriginalSchemaChangeRequest implements CoordinationRequest {
-
- private static final long serialVersionUID = 1L;
-
- /** The sender of the request. */
- private final TableId tableId;
- /** The schema changes. */
- private final SchemaChangeEvent schemaChangeEvent;
-
- public ApplyOriginalSchemaChangeRequest(TableId tableId, SchemaChangeEvent
schemaChangeEvent) {
- this.tableId = tableId;
- this.schemaChangeEvent = schemaChangeEvent;
- }
-
- public TableId getTableId() {
- return tableId;
- }
-
- public SchemaChangeEvent getSchemaChangeEvent() {
- return schemaChangeEvent;
- }
-
- @Override
- public boolean equals(Object o) {
- if (this == o) {
- return true;
- }
- if (!(o instanceof ApplyOriginalSchemaChangeRequest)) {
- return false;
- }
- ApplyOriginalSchemaChangeRequest that =
(ApplyOriginalSchemaChangeRequest) o;
- return Objects.equals(tableId, that.tableId)
- && Objects.equals(schemaChangeEvent, that.schemaChangeEvent);
- }
-
- @Override
- public int hashCode() {
- return Objects.hash(tableId, schemaChangeEvent);
- }
-}
diff --git
a/flink-cdc-runtime/src/main/java/org/apache/flink/cdc/runtime/operators/schema/event/ApplyOriginalSchemaChangeResponse.java
b/flink-cdc-runtime/src/main/java/org/apache/flink/cdc/runtime/operators/schema/event/ApplyOriginalSchemaChangeResponse.java
deleted file mode 100644
index 0a92e9656..000000000
---
a/flink-cdc-runtime/src/main/java/org/apache/flink/cdc/runtime/operators/schema/event/ApplyOriginalSchemaChangeResponse.java
+++ /dev/null
@@ -1,31 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.flink.cdc.runtime.operators.schema.event;
-
-import org.apache.flink.cdc.runtime.operators.schema.SchemaOperator;
-import
org.apache.flink.cdc.runtime.operators.schema.coordinator.SchemaRegistry;
-import org.apache.flink.runtime.operators.coordination.CoordinationResponse;
-
-/**
- * The response from {@link SchemaRegistry} to {@link SchemaOperator} to
request apply original
- * schema changes.
- */
-public class ApplyOriginalSchemaChangeResponse implements CoordinationResponse
{
-
- private static final long serialVersionUID = 1L;
-}
diff --git
a/flink-cdc-runtime/src/main/java/org/apache/flink/cdc/runtime/operators/schema/event/ReleaseUpstreamResponse.java
b/flink-cdc-runtime/src/main/java/org/apache/flink/cdc/runtime/operators/schema/event/ReleaseUpstreamResponse.java
index f577f1120..bea880962 100644
---
a/flink-cdc-runtime/src/main/java/org/apache/flink/cdc/runtime/operators/schema/event/ReleaseUpstreamResponse.java
+++
b/flink-cdc-runtime/src/main/java/org/apache/flink/cdc/runtime/operators/schema/event/ReleaseUpstreamResponse.java
@@ -17,7 +17,6 @@
package org.apache.flink.cdc.runtime.operators.schema.event;
-import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.cdc.common.event.SchemaChangeEvent;
import org.apache.flink.cdc.runtime.operators.schema.SchemaOperator;
import
org.apache.flink.cdc.runtime.operators.schema.coordinator.SchemaRegistry;
@@ -25,7 +24,6 @@ import
org.apache.flink.runtime.operators.coordination.CoordinationResponse;
import java.util.List;
import java.util.Objects;
-import java.util.stream.Collectors;
/**
* The response for {@link ReleaseUpstreamRequest} from {@link SchemaRegistry}
to {@link
@@ -41,50 +39,19 @@ public class ReleaseUpstreamResponse implements
CoordinationResponse {
*/
private final List<SchemaChangeEvent> finishedSchemaChangeEvents;
- private final List<Tuple2<SchemaChangeEvent, Throwable>>
failedSchemaChangeEvents;
-
- private final List<SchemaChangeEvent> ignoredSchemaChangeEvents;
-
- public ReleaseUpstreamResponse(
- List<SchemaChangeEvent> finishedSchemaChangeEvents,
- List<Tuple2<SchemaChangeEvent, Throwable>>
failedSchemaChangeEvents,
- List<SchemaChangeEvent> ignoredSchemaChangeEvents) {
+ public ReleaseUpstreamResponse(List<SchemaChangeEvent>
finishedSchemaChangeEvents) {
this.finishedSchemaChangeEvents = finishedSchemaChangeEvents;
- this.failedSchemaChangeEvents = failedSchemaChangeEvents;
- this.ignoredSchemaChangeEvents = ignoredSchemaChangeEvents;
}
public List<SchemaChangeEvent> getFinishedSchemaChangeEvents() {
return finishedSchemaChangeEvents;
}
- public List<Tuple2<SchemaChangeEvent, Throwable>>
getFailedSchemaChangeEvents() {
- return failedSchemaChangeEvents;
- }
-
- public List<SchemaChangeEvent> getIgnoredSchemaChangeEvents() {
- return ignoredSchemaChangeEvents;
- }
-
- public String getPrintableFailedSchemaChangeEvents() {
- return failedSchemaChangeEvents.stream()
- .map(e -> "Failed to apply " + e.f0 + ". Caused by: " + e.f1)
- .collect(Collectors.joining("\n"));
- }
-
- public boolean hasException() {
- return !failedSchemaChangeEvents.isEmpty();
- }
-
@Override
public String toString() {
return "ReleaseUpstreamResponse{"
+ "finishedSchemaChangeEvents="
+ finishedSchemaChangeEvents
- + ", failedSchemaChangeEvents="
- + failedSchemaChangeEvents
- + ", ignoredSchemaChangeEvents="
- + ignoredSchemaChangeEvents
+ '}';
}
@@ -97,14 +64,11 @@ public class ReleaseUpstreamResponse implements
CoordinationResponse {
return false;
}
ReleaseUpstreamResponse that = (ReleaseUpstreamResponse) object;
- return Objects.equals(finishedSchemaChangeEvents,
that.finishedSchemaChangeEvents)
- && Objects.equals(failedSchemaChangeEvents,
that.failedSchemaChangeEvents)
- && Objects.equals(ignoredSchemaChangeEvents,
that.ignoredSchemaChangeEvents);
+ return Objects.equals(finishedSchemaChangeEvents,
that.finishedSchemaChangeEvents);
}
@Override
public int hashCode() {
- return Objects.hash(
- finishedSchemaChangeEvents, failedSchemaChangeEvents,
ignoredSchemaChangeEvents);
+ return Objects.hash(finishedSchemaChangeEvents);
}
}
diff --git
a/flink-cdc-runtime/src/test/java/org/apache/flink/cdc/runtime/operators/schema/SchemaEvolveTest.java
b/flink-cdc-runtime/src/test/java/org/apache/flink/cdc/runtime/operators/schema/SchemaEvolveTest.java
index 325aee7a7..51acbd536 100644
---
a/flink-cdc-runtime/src/test/java/org/apache/flink/cdc/runtime/operators/schema/SchemaEvolveTest.java
+++
b/flink-cdc-runtime/src/test/java/org/apache/flink/cdc/runtime/operators/schema/SchemaEvolveTest.java
@@ -1040,6 +1040,8 @@ public class SchemaEvolveTest {
Column.physicalColumn(
"height", DOUBLE,
"Height data")))));
Assertions.assertThatThrownBy(() -> processEvent(schemaOperator,
addColumnEvents))
+ .cause()
+ .cause()
.isExactlyInstanceOf(RuntimeException.class)
.hasMessageContaining("Failed to apply schema change");
harness.close();
diff --git
a/flink-cdc-runtime/src/test/java/org/apache/flink/cdc/runtime/testutils/operators/EventOperatorTestHarness.java
b/flink-cdc-runtime/src/test/java/org/apache/flink/cdc/runtime/testutils/operators/EventOperatorTestHarness.java
index b5262d2d5..354304599 100644
---
a/flink-cdc-runtime/src/test/java/org/apache/flink/cdc/runtime/testutils/operators/EventOperatorTestHarness.java
+++
b/flink-cdc-runtime/src/test/java/org/apache/flink/cdc/runtime/testutils/operators/EventOperatorTestHarness.java
@@ -25,8 +25,6 @@ import org.apache.flink.cdc.common.event.TableId;
import org.apache.flink.cdc.common.pipeline.SchemaChangeBehavior;
import org.apache.flink.cdc.common.schema.Schema;
import
org.apache.flink.cdc.runtime.operators.schema.coordinator.SchemaRegistry;
-import
org.apache.flink.cdc.runtime.operators.schema.event.ApplyEvolvedSchemaChangeRequest;
-import
org.apache.flink.cdc.runtime.operators.schema.event.ApplyOriginalSchemaChangeRequest;
import org.apache.flink.cdc.runtime.operators.schema.event.FlushSuccessEvent;
import
org.apache.flink.cdc.runtime.operators.schema.event.GetEvolvedSchemaRequest;
import
org.apache.flink.cdc.runtime.operators.schema.event.GetEvolvedSchemaResponse;
@@ -56,7 +54,6 @@ import org.apache.flink.util.SerializedValue;
import java.io.IOException;
import java.time.Duration;
import java.util.ArrayList;
-import java.util.Collections;
import java.util.LinkedList;
import java.util.Set;
@@ -166,14 +163,10 @@ public class EventOperatorTestHarness<OP extends
AbstractStreamOperator<E>, E ex
}
public void registerTableSchema(TableId tableId, Schema schema) {
- schemaRegistry.handleCoordinationRequest(
- new ApplyOriginalSchemaChangeRequest(
- tableId, new CreateTableEvent(tableId, schema)));
+ schemaRegistry.handleApplyOriginalSchemaChangeEvent(new
CreateTableEvent(tableId, schema));
schemaRegistry.handleCoordinationRequest(
new SchemaChangeRequest(tableId, new CreateTableEvent(tableId,
schema)));
- schemaRegistry.handleCoordinationRequest(
- new ApplyEvolvedSchemaChangeRequest(
- tableId, Collections.singletonList(new
CreateTableEvent(tableId, schema))));
+ schemaRegistry.handleApplyEvolvedSchemaChangeRequest(new
CreateTableEvent(tableId, schema));
}
public Schema getLatestOriginalSchema(TableId tableId) throws Exception {
diff --git
a/flink-cdc-runtime/src/test/java/org/apache/flink/cdc/runtime/testutils/schema/CollectingMetadataApplier.java
b/flink-cdc-runtime/src/test/java/org/apache/flink/cdc/runtime/testutils/schema/CollectingMetadataApplier.java
index 9398e1f1e..9384fa757 100644
---
a/flink-cdc-runtime/src/test/java/org/apache/flink/cdc/runtime/testutils/schema/CollectingMetadataApplier.java
+++
b/flink-cdc-runtime/src/test/java/org/apache/flink/cdc/runtime/testutils/schema/CollectingMetadataApplier.java
@@ -21,6 +21,7 @@ import org.apache.flink.cdc.common.event.SchemaChangeEvent;
import org.apache.flink.cdc.common.event.SchemaChangeEventType;
import org.apache.flink.cdc.common.event.SchemaChangeEventTypeFamily;
import org.apache.flink.cdc.common.exceptions.SchemaEvolveException;
+import
org.apache.flink.cdc.common.exceptions.UnsupportedSchemaChangeEventException;
import org.apache.flink.cdc.common.sink.MetadataApplier;
import java.time.Duration;
@@ -83,8 +84,7 @@ public class CollectingMetadataApplier implements
MetadataApplier {
try {
Thread.sleep(duration.toMillis());
if (errorsOnEventTypes.contains(schemaChangeEvent.getType())) {
- throw new SchemaEvolveException(
- schemaChangeEvent, "Dummy metadata apply exception
for test.", null);
+ throw new
UnsupportedSchemaChangeEventException(schemaChangeEvent);
}
} catch (InterruptedException ignore) {
// Ignores sleep interruption