leonardBang commented on code in PR #3339: URL: https://github.com/apache/flink-cdc/pull/3339#discussion_r1707234423
########## flink-cdc-common/src/main/java/org/apache/flink/cdc/common/exceptions/SchemaEvolveException.java: ########## @@ -0,0 +1,65 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.cdc.common.exceptions; + +import org.apache.flink.cdc.common.event.SchemaChangeEvent; +import org.apache.flink.util.FlinkRuntimeException; + +import javax.annotation.Nullable; + +/** An exception occurred during schema evolution. */ +public class SchemaEvolveException extends FlinkRuntimeException { + private final SchemaChangeEvent applyingEvent; + private final String problem; Review Comment: exceptionMessage ? ########## flink-cdc-runtime/src/main/java/org/apache/flink/cdc/runtime/operators/schema/event/ApplyUpstreamSchemaChangeRequest.java: ########## @@ -0,0 +1,71 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. 
You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.cdc.runtime.operators.schema.event; + +import org.apache.flink.cdc.common.event.SchemaChangeEvent; +import org.apache.flink.cdc.common.event.TableId; +import org.apache.flink.cdc.runtime.operators.schema.SchemaOperator; +import org.apache.flink.cdc.runtime.operators.schema.coordinator.SchemaRegistry; +import org.apache.flink.runtime.operators.coordination.CoordinationRequest; + +import java.util.Objects; + +/** + * The request from {@link SchemaOperator} to {@link SchemaRegistry} to request apply evolved schema + * changes. + */ +public class ApplyUpstreamSchemaChangeRequest implements CoordinationRequest { Review Comment: ```suggestion /** * The request from {@link SchemaOperator} to {@link SchemaRegistry} to request apply original schema * changes. */ public class ApplyOriginalSchemaChangeRequest implements CoordinationRequest { ``` ########## flink-cdc-common/src/main/java/org/apache/flink/cdc/common/event/SchemaChangeEventType.java: ########## @@ -0,0 +1,60 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. 
You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.cdc.common.event; + +/** An enumeration of schema change event types for {@link SchemaChangeEvent}. */ +public enum SchemaChangeEventType { + ADD_COLUMN, + ALTER_COLUMN_TYPE, + CREATE_TABLE, + DROP_COLUMN, + RENAME_COLUMN; + + public static SchemaChangeEventType ofEvent(SchemaChangeEvent event) { + if (event instanceof AddColumnEvent) { + return ADD_COLUMN; + } else if (event instanceof AlterColumnTypeEvent) { + return ALTER_COLUMN_TYPE; + } else if (event instanceof CreateTableEvent) { + return CREATE_TABLE; + } else if (event instanceof DropColumnEvent) { + return DROP_COLUMN; + } else if (event instanceof RenameColumnEvent) { + return RENAME_COLUMN; + } else { + throw new RuntimeException("Unknown schema change event type: " + event.getClass()); Review Comment: We can simply throw a RuntimeException for an invalid String tag; a null SchemaChangeEventType is dangerous for the caller. 
########## flink-cdc-runtime/src/main/java/org/apache/flink/cdc/runtime/operators/schema/SchemaOperator.java: ########## @@ -315,9 +422,62 @@ private void handleSchemaChangeEvent(TableId tableId, SchemaChangeEvent schemaCh tableId, getRuntimeContext().getIndexOfThisSubtask()); output.collect(new StreamRecord<>(new FlushEvent(tableId))); - response.getSchemaChangeEvents().forEach(e -> output.collect(new StreamRecord<>(e))); + List<SchemaChangeEvent> expectedSchemaChangeEvents = response.getSchemaChangeEvents(); + schemaOperatorMetrics.increaseSchemaChangeEvents(expectedSchemaChangeEvents.size()); + // The request will block until flushing finished in each sink writer - requestReleaseUpstream(); + ReleaseUpstreamResponse schemaEvolveResponse = requestReleaseUpstream(); + List<SchemaChangeEvent> finishedSchemaChangeEvents = + schemaEvolveResponse.getFinishedSchemaChangeEvents(); + List<Tuple2<SchemaChangeEvent, Throwable>> failedSchemaChangeEvents = + schemaEvolveResponse.getFailedSchemaChangeEvents(); + List<SchemaChangeEvent> ignoredSchemaChangeEvents = + schemaEvolveResponse.getIgnoredSchemaChangeEvents(); + + if (schemaChangeBehavior == SchemaChangeBehavior.EVOLVE + || schemaChangeBehavior == SchemaChangeBehavior.EXCEPTION) { + if (schemaEvolveResponse.hasException()) { + throw new RuntimeException( + String.format( + "Failed to apply schema change event %s.\nExceptions: %s", + schemaChangeEvent, + schemaEvolveResponse.getPrintableFailedSchemaChangeEvents())); + } + } else if (schemaChangeBehavior == SchemaChangeBehavior.TRY_EVOLVE + || schemaChangeBehavior == SchemaChangeBehavior.LENIENT + || schemaChangeBehavior == SchemaChangeBehavior.IGNORE) { + if (schemaEvolveResponse.hasException()) { + schemaEvolveResponse + .getFailedSchemaChangeEvents() + .forEach( + e -> + LOG.warn( + "Failed to apply event {}, but keeps running in tolerant mode. 
Caused by: {}", + e.f0, + e.f1)); + } + } else { + throw new IllegalStateException( + "Unexpected schema change behavior: " + schemaChangeBehavior); Review Comment: SchemaEvolveException ? ########## flink-cdc-common/src/main/java/org/apache/flink/cdc/common/event/SchemaChangeEventType.java: ########## @@ -0,0 +1,63 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.cdc.common.event; + +import org.apache.flink.cdc.common.annotation.Public; + +/** An enumeration of schema change event types for {@link SchemaChangeEvent}. */ +@Public Review Comment: Please use `PublicEvolving` for a newly introduced API ########## flink-cdc-common/src/main/java/org/apache/flink/cdc/common/exceptions/SchemaEvolveException.java: ########## @@ -0,0 +1,65 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. 
You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.cdc.common.exceptions; + +import org.apache.flink.cdc.common.event.SchemaChangeEvent; +import org.apache.flink.util.FlinkRuntimeException; + +import javax.annotation.Nullable; + +/** An exception occurred during schema evolution. */ +public class SchemaEvolveException extends FlinkRuntimeException { + private final SchemaChangeEvent applyingEvent; + private final String problem; + private final @Nullable Throwable context; Review Comment: `context` is a strange name for a field of type Throwable ########## flink-cdc-runtime/src/main/java/org/apache/flink/cdc/runtime/operators/schema/event/ApplyEvolvedSchemaChangeRequest.java: ########## @@ -0,0 +1,73 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.flink.cdc.runtime.operators.schema.event; + +import org.apache.flink.cdc.common.event.SchemaChangeEvent; +import org.apache.flink.cdc.common.event.TableId; +import org.apache.flink.cdc.runtime.operators.schema.SchemaOperator; +import org.apache.flink.cdc.runtime.operators.schema.coordinator.SchemaRegistry; +import org.apache.flink.runtime.operators.coordination.CoordinationRequest; + +import java.util.List; +import java.util.Objects; + +/** + * The request from {@link SchemaOperator} to {@link SchemaRegistry} to request apply evolved schema + * changes. Review Comment: ```suggestion * The request from {@link SchemaOperator} to {@link SchemaRegistry} to request apply evolved schema * changes, the evolved schema changes come from original schema changes with different schema evolution strategy. ``` ########## flink-cdc-runtime/src/main/java/org/apache/flink/cdc/runtime/operators/schema/event/ApplyEvolvedSchemaChangeResponse.java: ########## @@ -0,0 +1,31 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.flink.cdc.runtime.operators.schema.event; + +import org.apache.flink.cdc.runtime.operators.schema.SchemaOperator; +import org.apache.flink.cdc.runtime.operators.schema.coordinator.SchemaRegistry; +import org.apache.flink.runtime.operators.coordination.CoordinationResponse; + +/** + * The response from {@link SchemaOperator} to {@link SchemaRegistry} to request apply evolved + * schema changes. Review Comment: This doc looks incorrect? ########## flink-cdc-e2e-tests/flink-cdc-pipeline-e2e-tests/src/test/java/org/apache/flink/cdc/pipeline/tests/SchemaEvolveE2eITCase.java: ########## @@ -0,0 +1,758 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.flink.cdc.pipeline.tests; + +import org.apache.flink.cdc.common.test.utils.TestUtils; +import org.apache.flink.cdc.connectors.mysql.testutils.MySqlContainer; +import org.apache.flink.cdc.connectors.mysql.testutils.MySqlVersion; +import org.apache.flink.cdc.connectors.mysql.testutils.UniqueDatabase; +import org.apache.flink.cdc.pipeline.tests.utils.PipelineTestEnvironment; + +import org.junit.After; +import org.junit.Assert; +import org.junit.Before; +import org.junit.ClassRule; +import org.junit.Test; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; +import org.testcontainers.containers.output.Slf4jLogConsumer; +import org.testcontainers.containers.output.ToStringConsumer; + +import java.nio.file.Path; +import java.sql.Connection; +import java.sql.DriverManager; +import java.sql.Statement; +import java.time.Duration; +import java.util.Arrays; +import java.util.List; +import java.util.concurrent.TimeoutException; +import java.util.stream.Collectors; +import java.util.stream.Stream; + +/** E2e tests for Schema Evolution cases. 
*/ +public class SchemaEvolveE2eITCase extends PipelineTestEnvironment { + private static final Logger LOG = LoggerFactory.getLogger(SchemaEvolveE2eITCase.class); + + // ------------------------------------------------------------------------------------------ + // MySQL Variables (we always use MySQL as the data source for easier verifying) + // ------------------------------------------------------------------------------------------ + protected static final String MYSQL_TEST_USER = "mysqluser"; + protected static final String MYSQL_TEST_PASSWORD = "mysqlpw"; + protected static final String MYSQL_DRIVER_CLASS = "com.mysql.cj.jdbc.Driver"; + protected static final String INTER_CONTAINER_MYSQL_ALIAS = "mysql"; + protected static final long EVENT_WAITING_TIMEOUT = 60000L; + + @ClassRule + public static final MySqlContainer MYSQL = + (MySqlContainer) + new MySqlContainer( + MySqlVersion.V8_0) // v8 support both ARM and AMD architectures + .withConfigurationOverride("docker/mysql/my.cnf") + .withSetupSQL("docker/mysql/setup.sql") + .withDatabaseName("flink-test") + .withUsername("flinkuser") + .withPassword("flinkpw") + .withNetwork(NETWORK) + .withNetworkAliases(INTER_CONTAINER_MYSQL_ALIAS) + .withLogConsumer(new Slf4jLogConsumer(LOG)); + + protected final UniqueDatabase schemaEvolveDatabase = + new UniqueDatabase(MYSQL, "schema_evolve", MYSQL_TEST_USER, MYSQL_TEST_PASSWORD); + + @Before + public void before() throws Exception { + super.before(); + schemaEvolveDatabase.createAndInitialize(); + } + + @After + public void after() { + super.after(); + schemaEvolveDatabase.dropDatabase(); + } + + private void validateSnapshotData(String tableName) throws Exception { + List<String> expected = + Stream.of( + "CreateTableEvent{tableId=%s.%s, schema=columns={`id` INT NOT NULL,`name` VARCHAR(17),`age` INT}, primaryKeys=id, options=()}", + "DataChangeEvent{tableId=%s.%s, before=[], after=[1008, Alice, 21], op=INSERT, meta=()}", + "DataChangeEvent{tableId=%s.%s, before=[], 
after=[1009, Bob, 20], op=INSERT, meta=()}", + "DataChangeEvent{tableId=%s.%s, before=[], after=[1010, Carol, 19], op=INSERT, meta=()}", + "DataChangeEvent{tableId=%s.%s, before=[], after=[1011, Derrida, 18], op=INSERT, meta=()}") + .map( + s -> + String.format( + s, + schemaEvolveDatabase.getDatabaseName(), + tableName)) + .collect(Collectors.toList()); + + validateResult(expected, taskManagerConsumer); + } + + private void waitForIncrementalStage(String tableName, Statement stmt) throws Exception { + stmt.execute("INSERT INTO members VALUES (0, '__fence__', 0);"); + + // Ensure we change schema after incremental stage + waitUntilSpecificEvent( + String.format( + "DataChangeEvent{tableId=%s.%s, before=[], after=[0, __fence__, 0], op=INSERT, meta=()}", + schemaEvolveDatabase.getDatabaseName(), tableName), + taskManagerConsumer); + } + + @Test + public void testSchemaEvolve() throws Exception { + String pipelineJob = + String.format( + "source:\n" + + " type: mysql\n" + + " hostname: %s\n" + + " port: 3306\n" + + " username: %s\n" + + " password: %s\n" + + " tables: %s.members\n" + + " server-id: 5400-5404\n" + + " server-time-zone: UTC\n" + + "\n" + + "sink:\n" + + " type: values\n" + + "\n" + + "pipeline:\n" + + " schema.change.behavior: evolve\n" + + " parallelism: 1", + INTER_CONTAINER_MYSQL_ALIAS, + MYSQL_TEST_USER, + MYSQL_TEST_PASSWORD, + schemaEvolveDatabase.getDatabaseName()); + Path mysqlCdcJar = TestUtils.getResource("mysql-cdc-pipeline-connector.jar"); + Path valuesCdcJar = TestUtils.getResource("values-cdc-pipeline-connector.jar"); + Path mysqlDriverJar = TestUtils.getResource("mysql-driver.jar"); + submitPipelineJob(pipelineJob, mysqlCdcJar, valuesCdcJar, mysqlDriverJar); + waitUntilJobRunning(Duration.ofSeconds(30)); + LOG.info("Pipeline job is running"); + validateSnapshotData("members"); + + LOG.info("Starting schema evolution"); + String mysqlJdbcUrl = + String.format( + "jdbc:mysql://%s:%s/%s", + MYSQL.getHost(), + MYSQL.getDatabasePort(), + 
schemaEvolveDatabase.getDatabaseName()); + + try (Connection conn = + DriverManager.getConnection( + mysqlJdbcUrl, MYSQL_TEST_USER, MYSQL_TEST_PASSWORD); + Statement stmt = conn.createStatement()) { + + waitForIncrementalStage("members", stmt); + + // triggers AddColumnEvent + stmt.execute("ALTER TABLE members ADD COLUMN gender TINYINT AFTER age;"); + stmt.execute("INSERT INTO members VALUES (1012, 'Eve', 17, 0);"); + + // triggers AlterColumnTypeEvent and RenameColumnEvent + stmt.execute("ALTER TABLE members CHANGE COLUMN age precise_age DOUBLE;"); + + // triggers RenameColumnEvent + stmt.execute("ALTER TABLE members RENAME COLUMN gender TO biological_sex;"); + + // triggers DropColumnEvent + stmt.execute("ALTER TABLE members DROP COLUMN biological_sex"); + stmt.execute("INSERT INTO members VALUES (1013, 'Fiona', 16);"); + } + + List<String> expected = + Stream.of( + "AddColumnEvent{tableId=%s.members, addedColumns=[ColumnWithPosition{column=`gender` TINYINT, position=AFTER, existedColumnName=age}]}", + "DataChangeEvent{tableId=%s.members, before=[], after=[1012, Eve, 17, 0], op=INSERT, meta=()}", + "AlterColumnTypeEvent{tableId=%s.members, nameMapping={age=DOUBLE}}", + "RenameColumnEvent{tableId=%s.members, nameMapping={age=precise_age}}", + "RenameColumnEvent{tableId=%s.members, nameMapping={gender=biological_sex}}", + "DropColumnEvent{tableId=%s.members, droppedColumnNames=[biological_sex]}", + "DataChangeEvent{tableId=%s.members, before=[], after=[1013, Fiona, 16.0], op=INSERT, meta=()}") + .map(s -> String.format(s, schemaEvolveDatabase.getDatabaseName())) + .collect(Collectors.toList()); + + validateResult(expected, taskManagerConsumer); + } + + @Test + public void testSchemaEvolveWithIncompatibleChanges() throws Exception { + String pipelineJob = + String.format( + "source:\n" + + " type: mysql\n" + + " hostname: %s\n" + + " port: 3306\n" + + " username: %s\n" + + " password: %s\n" + + " tables: %s.(members|new_members)\n" + + " server-id: 5400-5404\n" + + 
" server-time-zone: UTC\n" + + "\n" + + "sink:\n" + + " type: values\n" + + "route:\n" + + " - source-table: %s.(members|new_members)\n" + + " sink-table: %s.merged\n" + + "\n" + + "pipeline:\n" + + " schema.change.behavior: evolve\n" + + " parallelism: 1", + INTER_CONTAINER_MYSQL_ALIAS, + MYSQL_TEST_USER, + MYSQL_TEST_PASSWORD, + schemaEvolveDatabase.getDatabaseName(), + schemaEvolveDatabase.getDatabaseName(), + schemaEvolveDatabase.getDatabaseName()); + Path mysqlCdcJar = TestUtils.getResource("mysql-cdc-pipeline-connector.jar"); + Path valuesCdcJar = TestUtils.getResource("values-cdc-pipeline-connector.jar"); + Path mysqlDriverJar = TestUtils.getResource("mysql-driver.jar"); + submitPipelineJob(pipelineJob, mysqlCdcJar, valuesCdcJar, mysqlDriverJar); + waitUntilJobRunning(Duration.ofSeconds(30)); + LOG.info("Pipeline job is running"); + validateSnapshotData("merged"); + + LOG.info("Starting schema evolution"); + String mysqlJdbcUrl = + String.format( + "jdbc:mysql://%s:%s/%s", + MYSQL.getHost(), + MYSQL.getDatabasePort(), + schemaEvolveDatabase.getDatabaseName()); + + try (Connection conn = + DriverManager.getConnection( + mysqlJdbcUrl, MYSQL_TEST_USER, MYSQL_TEST_PASSWORD); + Statement stmt = conn.createStatement()) { + waitForIncrementalStage("merged", stmt); + + // incompatible type INT and VARCHAR cannot be merged + stmt.execute("ALTER TABLE members CHANGE COLUMN age age VARCHAR(17);"); + } + + waitUntilSpecificEvent( + "java.lang.IllegalStateException: Incompatible types: \"INT\" and \"VARCHAR(17)\"", + taskManagerConsumer); + + // Ensure that job was terminated + waitUntilSpecificEvent( + "org.apache.flink.runtime.JobException: Recovery is suppressed by NoRestartBackoffTimeStrategy", + jobManagerConsumer); + } + + @Test + public void testSchemaEvolveWithException() throws Exception { + String pipelineJob = + String.format( + "source:\n" + + " type: mysql\n" + + " hostname: %s\n" + + " port: 3306\n" + + " username: %s\n" + + " password: %s\n" + + " tables: 
%s.members\n" + + " server-id: 5400-5404\n" + + " server-time-zone: UTC\n" + + "\n" + + "sink:\n" + + " type: values\n" + + " error.on.schema.change: true\n" + + "\n" + + "pipeline:\n" + + " schema.change.behavior: evolve\n" + + " parallelism: 1", + INTER_CONTAINER_MYSQL_ALIAS, + MYSQL_TEST_USER, + MYSQL_TEST_PASSWORD, + schemaEvolveDatabase.getDatabaseName()); + Path mysqlCdcJar = TestUtils.getResource("mysql-cdc-pipeline-connector.jar"); + Path valuesCdcJar = TestUtils.getResource("values-cdc-pipeline-connector.jar"); + Path mysqlDriverJar = TestUtils.getResource("mysql-driver.jar"); + submitPipelineJob(pipelineJob, mysqlCdcJar, valuesCdcJar, mysqlDriverJar); + waitUntilJobRunning(Duration.ofSeconds(30)); + LOG.info("Pipeline job is running"); + validateSnapshotData("members"); + + LOG.info("Starting schema evolution"); + String mysqlJdbcUrl = + String.format( + "jdbc:mysql://%s:%s/%s", + MYSQL.getHost(), + MYSQL.getDatabasePort(), + schemaEvolveDatabase.getDatabaseName()); + + try (Connection conn = + DriverManager.getConnection( + mysqlJdbcUrl, MYSQL_TEST_USER, MYSQL_TEST_PASSWORD); + Statement stmt = conn.createStatement()) { + waitForIncrementalStage("members", stmt); + stmt.execute("ALTER TABLE members ADD COLUMN gender TINYINT AFTER age;"); + } + + waitUntilSpecificEvent( + String.format( + "AddColumnEvent{tableId=%s.members, addedColumns=[ColumnWithPosition{column=`gender` TINYINT, position=AFTER, existedColumnName=age}]}", + schemaEvolveDatabase.getDatabaseName()), + taskManagerConsumer); + + validateResult( + Arrays.asList( + String.format( + "Failed to apply schema change event AddColumnEvent{tableId=%s.members, addedColumns=[ColumnWithPosition{column=`gender` TINYINT, position=AFTER, existedColumnName=age}]}.", + schemaEvolveDatabase.getDatabaseName()), + String.format( + "SchemaEvolveException{applyingEvent=AddColumnEvent{tableId=%s.members, addedColumns=[ColumnWithPosition{column=`gender` TINYINT, position=AFTER, existedColumnName=age}]}, 
problem='Rejected schema change event since error.on.schema.change is enabled.', context='null'}", + schemaEvolveDatabase.getDatabaseName()), + "org.apache.flink.runtime.JobException: Recovery is suppressed by NoRestartBackoffTimeStrategy"), + jobManagerConsumer); + } + + @Test + public void testSchemaTryEvolveWithException() throws Exception { + String pipelineJob = + String.format( + "source:\n" + + " type: mysql\n" + + " hostname: %s\n" + + " port: 3306\n" + + " username: %s\n" + + " password: %s\n" + + " tables: %s.members\n" + + " server-id: 5400-5404\n" + + " server-time-zone: UTC\n" + + "\n" + + "sink:\n" + + " type: values\n" + + " error.on.schema.change: true\n" + + "\n" + + "pipeline:\n" + + " schema.change.behavior: try_evolve\n" + + " parallelism: 1", + INTER_CONTAINER_MYSQL_ALIAS, + MYSQL_TEST_USER, + MYSQL_TEST_PASSWORD, + schemaEvolveDatabase.getDatabaseName()); + Path mysqlCdcJar = TestUtils.getResource("mysql-cdc-pipeline-connector.jar"); + Path valuesCdcJar = TestUtils.getResource("values-cdc-pipeline-connector.jar"); + Path mysqlDriverJar = TestUtils.getResource("mysql-driver.jar"); + submitPipelineJob(pipelineJob, mysqlCdcJar, valuesCdcJar, mysqlDriverJar); + waitUntilJobRunning(Duration.ofSeconds(30)); + LOG.info("Pipeline job is running"); + validateSnapshotData("members"); + LOG.info("Starting schema evolution"); + String mysqlJdbcUrl = + String.format( + "jdbc:mysql://%s:%s/%s", + MYSQL.getHost(), + MYSQL.getDatabasePort(), + schemaEvolveDatabase.getDatabaseName()); + + try (Connection conn = + DriverManager.getConnection( + mysqlJdbcUrl, MYSQL_TEST_USER, MYSQL_TEST_PASSWORD); + Statement stmt = conn.createStatement()) { + + waitForIncrementalStage("members", stmt); + + stmt.execute("ALTER TABLE members ADD COLUMN gender TINYINT AFTER age;"); + stmt.execute("INSERT INTO members VALUES (1012, 'Eve', 17, 0);"); + stmt.execute("UPDATE members SET name = 'Eva' WHERE id = 1012;"); + stmt.execute("DELETE FROM members WHERE id = 1012;"); + } + + 
List<String> expected = + Stream.of( + // Add column never succeeded, so age column will not appear. + "DataChangeEvent{tableId=%s.members, before=[], after=[1012, Eve, 17], op=INSERT, meta=()}", + "DataChangeEvent{tableId=%s.members, before=[1012, Eve, 17], after=[1012, Eva, 17], op=UPDATE, meta=()}", + "DataChangeEvent{tableId=%s.members, before=[1012, Eva, 17], after=[], op=DELETE, meta=()}") + .map(s -> String.format(s, schemaEvolveDatabase.getDatabaseName())) + .collect(Collectors.toList()); + + validateResult(expected, taskManagerConsumer); + + waitUntilSpecificEvent( + String.format( + "Failed to apply schema change AddColumnEvent{tableId=%s.members, addedColumns=[ColumnWithPosition{column=`gender` TINYINT, position=AFTER, existedColumnName=age}]} to table %s.members.", + schemaEvolveDatabase.getDatabaseName(), + schemaEvolveDatabase.getDatabaseName()), + jobManagerConsumer); + + waitUntilSpecificEvent( + String.format( + "SchemaEvolveException{applyingEvent=AddColumnEvent{tableId=%s.members, addedColumns=[ColumnWithPosition{column=`gender` TINYINT, position=AFTER, existedColumnName=age}]}, problem='Rejected schema change event since error.on.schema.change is enabled.', context='null'}", + schemaEvolveDatabase.getDatabaseName()), + jobManagerConsumer); + } + + @Test + public void testSchemaIgnore() throws Exception { + String pipelineJob = + String.format( + "source:\n" + + " type: mysql\n" + + " hostname: %s\n" + + " port: 3306\n" + + " username: %s\n" + + " password: %s\n" + + " tables: %s.members\n" + + " server-id: 5400-5404\n" + + " server-time-zone: UTC\n" + + "\n" + + "sink:\n" + + " type: values\n" + + "\n" + + "pipeline:\n" + + " schema.change.behavior: ignore\n" + + " parallelism: 1", + INTER_CONTAINER_MYSQL_ALIAS, + MYSQL_TEST_USER, + MYSQL_TEST_PASSWORD, + schemaEvolveDatabase.getDatabaseName()); + Path mysqlCdcJar = TestUtils.getResource("mysql-cdc-pipeline-connector.jar"); + Path valuesCdcJar = 
TestUtils.getResource("values-cdc-pipeline-connector.jar"); + Path mysqlDriverJar = TestUtils.getResource("mysql-driver.jar"); + submitPipelineJob(pipelineJob, mysqlCdcJar, valuesCdcJar, mysqlDriverJar); + + waitUntilJobRunning(Duration.ofSeconds(30)); + LOG.info("Pipeline job is running"); + validateSnapshotData("members"); + + LOG.info("Starting schema evolution"); + String mysqlJdbcUrl = + String.format( + "jdbc:mysql://%s:%s/%s", + MYSQL.getHost(), + MYSQL.getDatabasePort(), + schemaEvolveDatabase.getDatabaseName()); + + try (Connection conn = + DriverManager.getConnection( + mysqlJdbcUrl, MYSQL_TEST_USER, MYSQL_TEST_PASSWORD); + Statement stmt = conn.createStatement()) { + + waitForIncrementalStage("members", stmt); + + // triggers AddColumnEvent + stmt.execute("ALTER TABLE members ADD COLUMN gender TINYINT AFTER age;"); + stmt.execute("INSERT INTO members VALUES (1012, 'Eve', 17, 0);"); + + // triggers AlterColumnTypeEvent and RenameColumnEvent + stmt.execute("ALTER TABLE members CHANGE COLUMN age precise_age DOUBLE;"); + + // triggers RenameColumnEvent + stmt.execute("ALTER TABLE members RENAME COLUMN gender TO biological_sex;"); + + // triggers DropColumnEvent + stmt.execute("ALTER TABLE members DROP COLUMN biological_sex"); + stmt.execute("INSERT INTO members VALUES (1013, 'Fiona', 16);"); + } + + List<String> expected = + Stream.of( + "DataChangeEvent{tableId=%s.members, before=[], after=[1012, Eve, 17], op=INSERT, meta=()}", + "DataChangeEvent{tableId=%s.members, before=[], after=[1013, Fiona, null], op=INSERT, meta=()}") + .map(s -> String.format(s, schemaEvolveDatabase.getDatabaseName())) + .collect(Collectors.toList()); + + validateResult(expected, taskManagerConsumer); + } + + @Test + public void testSchemaException() throws Exception { + String pipelineJob = + String.format( + "source:\n" + + " type: mysql\n" + + " hostname: %s\n" + + " port: 3306\n" + + " username: %s\n" + + " password: %s\n" + + " tables: %s.members\n" + + " server-id: 
5400-5404\n" + + " server-time-zone: UTC\n" + + "\n" + + "sink:\n" + + " type: values\n" + + "\n" + + "pipeline:\n" + + " schema.change.behavior: exception\n" + + " parallelism: 1", + INTER_CONTAINER_MYSQL_ALIAS, + MYSQL_TEST_USER, + MYSQL_TEST_PASSWORD, + schemaEvolveDatabase.getDatabaseName()); + Path mysqlCdcJar = TestUtils.getResource("mysql-cdc-pipeline-connector.jar"); + Path valuesCdcJar = TestUtils.getResource("values-cdc-pipeline-connector.jar"); + Path mysqlDriverJar = TestUtils.getResource("mysql-driver.jar"); + submitPipelineJob(pipelineJob, mysqlCdcJar, valuesCdcJar, mysqlDriverJar); + + waitUntilJobRunning(Duration.ofSeconds(30)); + + LOG.info("Pipeline job is running"); + validateSnapshotData("members"); + + LOG.info("Starting schema evolution"); + String mysqlJdbcUrl = + String.format( + "jdbc:mysql://%s:%s/%s", + MYSQL.getHost(), + MYSQL.getDatabasePort(), + schemaEvolveDatabase.getDatabaseName()); + + try (Connection conn = + DriverManager.getConnection( + mysqlJdbcUrl, MYSQL_TEST_USER, MYSQL_TEST_PASSWORD); + Statement stmt = conn.createStatement()) { + waitForIncrementalStage("members", stmt); + + // triggers AddColumnEvent + stmt.execute("ALTER TABLE members ADD COLUMN gender TINYINT AFTER age;"); + } + + waitUntilSpecificEvent( + String.format( + "java.lang.RuntimeException: Refused to apply schema change event AddColumnEvent{tableId=%s.members, addedColumns=[ColumnWithPosition{column=`gender` TINYINT, position=AFTER, existedColumnName=age}]} in EXCEPTION mode.", + schemaEvolveDatabase.getDatabaseName()), + taskManagerConsumer); + } + + @Test + public void testUnexpectedBehavior() { + String pipelineJob = + String.format( + "source:\n" + + " type: mysql\n" + + " hostname: %s\n" + + " port: 3306\n" + + " username: %s\n" + + " password: %s\n" + + " tables: %s.members\n" + + " server-id: 5400-5404\n" + + " server-time-zone: UTC\n" + + "\n" + + "sink:\n" + + " type: values\n" + + "\n" + + "pipeline:\n" + + " schema.change.behavior: unexpected\n" + 
+ " parallelism: 1", + INTER_CONTAINER_MYSQL_ALIAS, + MYSQL_TEST_USER, + MYSQL_TEST_PASSWORD, + schemaEvolveDatabase.getDatabaseName()); + Path mysqlCdcJar = TestUtils.getResource("mysql-cdc-pipeline-connector.jar"); + Path valuesCdcJar = TestUtils.getResource("values-cdc-pipeline-connector.jar"); + Path mysqlDriverJar = TestUtils.getResource("mysql-driver.jar"); + + // Submitting job should fail given an unknown schema change behavior configuration + Assert.assertThrows( + AssertionError.class, + () -> submitPipelineJob(pipelineJob, mysqlCdcJar, valuesCdcJar, mysqlDriverJar)); + } + + @Test + public void testFineGrainedSchemaEvolution() throws Exception { + String pipelineJob = + String.format( + "source:\n" + + " type: mysql\n" + + " hostname: %s\n" + + " port: 3306\n" + + " username: %s\n" + + " password: %s\n" + + " tables: %s.members\n" + + " server-id: 5400-5404\n" + + " server-time-zone: UTC\n" + + "\n" + + "sink:\n" + + " type: values\n" + + " exclude.schema.changes:\n" + + " - drop\n" + + "\n" + + "pipeline:\n" + + " schema.change.behavior: evolve\n" + + " parallelism: 1", + INTER_CONTAINER_MYSQL_ALIAS, + MYSQL_TEST_USER, + MYSQL_TEST_PASSWORD, + schemaEvolveDatabase.getDatabaseName()); + Path mysqlCdcJar = TestUtils.getResource("mysql-cdc-pipeline-connector.jar"); + Path valuesCdcJar = TestUtils.getResource("values-cdc-pipeline-connector.jar"); + Path mysqlDriverJar = TestUtils.getResource("mysql-driver.jar"); + submitPipelineJob(pipelineJob, mysqlCdcJar, valuesCdcJar, mysqlDriverJar); + waitUntilJobRunning(Duration.ofSeconds(30)); + LOG.info("Pipeline job is running"); + validateSnapshotData("members"); + + LOG.info("Starting schema evolution"); + String mysqlJdbcUrl = + String.format( + "jdbc:mysql://%s:%s/%s", + MYSQL.getHost(), + MYSQL.getDatabasePort(), + schemaEvolveDatabase.getDatabaseName()); + + try (Connection conn = + DriverManager.getConnection( + mysqlJdbcUrl, MYSQL_TEST_USER, MYSQL_TEST_PASSWORD); + Statement stmt = conn.createStatement()) { 
+ waitForIncrementalStage("members", stmt); + + // triggers AddColumnEvent + stmt.execute("ALTER TABLE members ADD COLUMN gender TINYINT AFTER age;"); + stmt.execute("INSERT INTO members VALUES (1012, 'Eve', 17, 0);"); + + // triggers AlterColumnTypeEvent and RenameColumnEvent + stmt.execute("ALTER TABLE members CHANGE COLUMN age precise_age DOUBLE;"); + + // triggers RenameColumnEvent + stmt.execute("ALTER TABLE members RENAME COLUMN gender TO biological_sex;"); + + // triggers DropColumnEvent + stmt.execute("ALTER TABLE members DROP COLUMN biological_sex;"); + stmt.execute("INSERT INTO members VALUES (1013, 'Fiona', 16);"); + } + + List<String> expected = + Stream.of( + "AddColumnEvent{tableId=%s.members, addedColumns=[ColumnWithPosition{column=`gender` TINYINT, position=AFTER, existedColumnName=age}]}", + "DataChangeEvent{tableId=%s.members, before=[], after=[1012, Eve, 17, 0], op=INSERT, meta=()}", + "AlterColumnTypeEvent{tableId=%s.members, nameMapping={age=DOUBLE}}", + "RenameColumnEvent{tableId=%s.members, nameMapping={age=precise_age}}", + "RenameColumnEvent{tableId=%s.members, nameMapping={gender=biological_sex}}", + "DataChangeEvent{tableId=%s.members, before=[], after=[1013, Fiona, 16.0, null], op=INSERT, meta=()}") + .map(s -> String.format(s, schemaEvolveDatabase.getDatabaseName())) + .collect(Collectors.toList()); + + validateResult(expected, taskManagerConsumer); + + waitUntilSpecificEvent( + String.format( + "Ignored schema change DropColumnEvent{tableId=%s.members, droppedColumnNames=[biological_sex]} to table %s.members.", + schemaEvolveDatabase.getDatabaseName(), + schemaEvolveDatabase.getDatabaseName()), + jobManagerConsumer); + } + + @Test + public void testLenientSchemaEvolution() throws Exception { Review Comment: minor: I like these tests, but parameterized tests will make the test code shorter -- This is an automated message from the Apache Git Service. 
To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: [email protected] For queries about this service, please contact Infrastructure at: [email protected]
