yuxiqian commented on code in PR #4015:
URL: https://github.com/apache/flink-cdc/pull/4015#discussion_r2113803393
##########
flink-cdc-runtime/src/test/java/org/apache/flink/cdc/runtime/operators/schema/regular/SchemaEvolveTest.java:
##########
@@ -2561,6 +2561,66 @@ tableId, buildRecord(INT, 12, INT, 0, SMALLINT, (short) 11)),
}
}
+ /** Tests lenient schema change behavior exclude create.table event. */
+ @Test
+ void testLenientSchemaEvolvesExcludeCreate() throws Exception {
+ TableId tableId = CUSTOMERS_TABLE_ID;
+ Schema schemaV1 =
+ Schema.newBuilder()
+ .physicalColumn("id", INT)
+ .physicalColumn("name", STRING.notNull())
+ .physicalColumn("age", SMALLINT)
+ .primaryKey("id")
+ .build();
+
+ SchemaChangeBehavior behavior = SchemaChangeBehavior.LENIENT;
+
+ SchemaOperator schemaOperator =
+ new SchemaOperator(new ArrayList<>(), Duration.ofSeconds(30), behavior);
+ RegularEventOperatorTestHarness<SchemaOperator, Event> harness =
+ RegularEventOperatorTestHarness.withDurationAndExcludeCreateTableBehavior(
+ schemaOperator, 5, Duration.ofSeconds(3), behavior);
Review Comment:
nit: can we reuse
`RegularEventOperatorTestHarness#withDurationAndFineGrainedBehaviorWithError`?
##########
flink-cdc-runtime/src/main/java/org/apache/flink/cdc/runtime/operators/schema/regular/SchemaCoordinator.java:
##########
@@ -434,7 +434,12 @@ private void applySchemaChange(int sourceSubTaskId) {
private boolean applyAndUpdateEvolvedSchemaChange(SchemaChangeEvent schemaChangeEvent) {
try {
- metadataApplier.applySchemaChange(schemaChangeEvent);
+ // filter create.table schema change event
+ if (metadataApplier.acceptsSchemaEvolutionType(schemaChangeEvent.getType())) {
+ metadataApplier.applySchemaChange(schemaChangeEvent);
+ } else {
+ LOG.info("Skip apply schema change {}.", schemaChangeEvent);
+ }
schemaManager.applyEvolvedSchemaChange(schemaChangeEvent);
LOG.info(
"Successfully applied schema change event {} to external system.",
Review Comment:
Move this log into the `true` branch, since nothing is applied to the
external system when the event is skipped.
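
A minimal sketch of what that restructuring could look like. It does not use the real Flink CDC classes: `FakeMetadataApplier`, `EventType`, and the in-memory `LOG` list are hypothetical stand-ins, and the real method's `try`/`catch` and `schemaManager` call are left out. The only point is that the success message is emitted in the branch that actually calls `applySchemaChange`.

```java
import java.util.ArrayList;
import java.util.EnumSet;
import java.util.List;
import java.util.Set;

/** Illustration only; every type here is a hypothetical stand-in. */
final class SchemaChangeLoggingSketch {

    /** Hypothetical, reduced set of schema change event types. */
    public enum EventType { CREATE_TABLE, ADD_COLUMN }

    /** Hypothetical applier that accepts only a configured set of event types. */
    public static final class FakeMetadataApplier {
        private final Set<EventType> accepted;
        public final List<EventType> applied = new ArrayList<>();

        public FakeMetadataApplier(Set<EventType> accepted) {
            this.accepted = accepted;
        }

        public boolean acceptsSchemaEvolutionType(EventType type) {
            return accepted.contains(type);
        }

        public void applySchemaChange(EventType type) {
            applied.add(type);
        }
    }

    /** Stand-in for the logger: collected messages, so the behavior is easy to inspect. */
    public static final List<String> LOG = new ArrayList<>();

    public static boolean applyAndUpdateEvolvedSchemaChange(
            FakeMetadataApplier metadataApplier, EventType eventType) {
        if (metadataApplier.acceptsSchemaEvolutionType(eventType)) {
            metadataApplier.applySchemaChange(eventType);
            // Success is logged only when the change reached the applier.
            LOG.add("Successfully applied schema change event " + eventType
                    + " to external system.");
        } else {
            LOG.add("Skip apply schema change " + eventType + ".");
        }
        // In the PR, the evolved schema is updated here in both cases.
        return true;
    }

    public static void main(String[] args) {
        FakeMetadataApplier applier =
                new FakeMetadataApplier(EnumSet.of(EventType.ADD_COLUMN));
        applyAndUpdateEvolvedSchemaChange(applier, EventType.CREATE_TABLE);
        applyAndUpdateEvolvedSchemaChange(applier, EventType.ADD_COLUMN);
        LOG.forEach(System.out::println);
    }
}
```

With this shape, a skipped `CREATE_TABLE` event produces only the skip message, so the log no longer claims a change was applied when it was not.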