This is an automated email from the ASF dual-hosted git repository.
leonard pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/flink-cdc.git
The following commit(s) were added to refs/heads/master by this push:
new ee843e2f2 [FLINK-36114][cdc-runtime] Make SchemaRegistryRequestHandler
thread safe by blocking subsequent schemaChangeEvent
ee843e2f2 is described below
commit ee843e2f246c300ecfeaf5cfd529693fadf2625f
Author: yuxiqian <[email protected]>
AuthorDate: Thu Aug 22 19:19:21 2024 +0800
[FLINK-36114][cdc-runtime] Make SchemaRegistryRequestHandler thread safe by
blocking subsequent schemaChangeEvent
This closes #3563.
Co-authored-by: Hongshun Wang <[email protected]>
---
.../source/reader/MySqlPipelineRecordEmitter.java | 62 ++--
.../mysql/source/MySqlFullTypesITCase.java | 26 +-
.../mysql/source/MySqlPipelineITCase.java | 30 +-
.../mysql/testutils/MySqSourceTestUtils.java | 19 ++
.../cdc/pipeline/tests/MySqlToDorisE2eITCase.java | 10 +-
.../flink/cdc/pipeline/tests/MysqlE2eITCase.java | 10 +-
.../flink/cdc/pipeline/tests/RouteE2eITCase.java | 35 ++-
.../cdc/pipeline/tests/SchemaEvolveE2eITCase.java | 10 +-
.../cdc/pipeline/tests/TransformE2eITCase.java | 44 ++-
.../flink/cdc/pipeline/tests/UdfE2eITCase.java | 6 +-
.../tests/utils/PipelineTestEnvironment.java | 2 +
.../runtime/operators/schema/SchemaOperator.java | 74 +++--
.../schema/coordinator/SchemaManager.java | 96 ++++++
.../schema/coordinator/SchemaRegistry.java | 10 +-
.../coordinator/SchemaRegistryRequestHandler.java | 349 +++++++++------------
.../schema/event/RefreshPendingListsRequest.java | 27 --
.../schema/event/RefreshPendingListsResponse.java | 26 --
.../schema/event/ReleaseUpstreamRequest.java | 32 --
.../event/SchemaChangeProcessingResponse.java | 4 +-
.../schema/event/SchemaChangeResponse.java | 75 ++++-
...sponse.java => SchemaChangeResultResponse.java} | 8 +-
.../operators/EventOperatorTestHarness.java | 1 -
22 files changed, 540 insertions(+), 416 deletions(-)
diff --git
a/flink-cdc-connect/flink-cdc-pipeline-connectors/flink-cdc-pipeline-connector-mysql/src/main/java/org/apache/flink/cdc/connectors/mysql/source/reader/MySqlPipelineRecordEmitter.java
b/flink-cdc-connect/flink-cdc-pipeline-connectors/flink-cdc-pipeline-connector-mysql/src/main/java/org/apache/flink/cdc/connectors/mysql/source/reader/MySqlPipelineRecordEmitter.java
index 3f7581347..909ed6c5b 100644
---
a/flink-cdc-connect/flink-cdc-pipeline-connectors/flink-cdc-pipeline-connector-mysql/src/main/java/org/apache/flink/cdc/connectors/mysql/source/reader/MySqlPipelineRecordEmitter.java
+++
b/flink-cdc-connect/flink-cdc-pipeline-connectors/flink-cdc-pipeline-connector-mysql/src/main/java/org/apache/flink/cdc/connectors/mysql/source/reader/MySqlPipelineRecordEmitter.java
@@ -28,6 +28,7 @@ import
org.apache.flink.cdc.connectors.mysql.source.config.MySqlSourceConfig;
import
org.apache.flink.cdc.connectors.mysql.source.metrics.MySqlSourceReaderMetrics;
import org.apache.flink.cdc.connectors.mysql.source.split.MySqlSplitState;
import org.apache.flink.cdc.connectors.mysql.table.StartupMode;
+import org.apache.flink.cdc.connectors.mysql.utils.MySqlSchemaUtils;
import org.apache.flink.cdc.connectors.mysql.utils.MySqlTypeUtils;
import org.apache.flink.cdc.debezium.DebeziumDeserializationSchema;
import org.apache.flink.connector.base.source.reader.RecordEmitter;
@@ -68,7 +69,7 @@ public class MySqlPipelineRecordEmitter extends
MySqlRecordEmitter<Event> {
// Used when startup mode is not initial
private boolean alreadySendCreateTableForBinlogSplit = false;
- private final List<CreateTableEvent> createTableEventCache;
+ private List<CreateTableEvent> createTableEventCache;
public MySqlPipelineRecordEmitter(
DebeziumDeserializationSchema<Event> debeziumDeserializationSchema,
@@ -80,23 +81,7 @@ public class MySqlPipelineRecordEmitter extends
MySqlRecordEmitter<Event> {
sourceConfig.isIncludeSchemaChanges());
this.sourceConfig = sourceConfig;
this.alreadySendCreateTableTables = new HashSet<>();
- this.createTableEventCache = new ArrayList<>();
-
- if
(!sourceConfig.getStartupOptions().startupMode.equals(StartupMode.INITIAL)) {
- try (JdbcConnection jdbc = openJdbcConnection(sourceConfig)) {
- List<TableId> capturedTableIds = listTables(jdbc,
sourceConfig.getTableFilters());
- for (TableId tableId : capturedTableIds) {
- Schema schema = getSchema(jdbc, tableId);
- createTableEventCache.add(
- new CreateTableEvent(
-
org.apache.flink.cdc.common.event.TableId.tableId(
- tableId.catalog(),
tableId.table()),
- schema));
- }
- } catch (SQLException e) {
- throw new RuntimeException("Cannot start emitter to fetch
table schema.", e);
- }
- }
+ this.createTableEventCache = generateCreateTableEvent(sourceConfig);
}
@Override
@@ -104,6 +89,8 @@ public class MySqlPipelineRecordEmitter extends
MySqlRecordEmitter<Event> {
SourceRecord element, SourceOutput<Event> output, MySqlSplitState
splitState)
throws Exception {
if (isLowWatermarkEvent(element) && splitState.isSnapshotSplitState())
{
+ // In Snapshot phase of INITIAL startup mode, we lazily send
CreateTableEvent to
+ // downstream to avoid checkpoint timeout.
TableId tableId =
splitState.asSnapshotSplitState().toMySqlSplit().getTableId();
if (!alreadySendCreateTableTables.contains(tableId)) {
try (JdbcConnection jdbc = openJdbcConnection(sourceConfig)) {
@@ -111,11 +98,24 @@ public class MySqlPipelineRecordEmitter extends
MySqlRecordEmitter<Event> {
alreadySendCreateTableTables.add(tableId);
}
}
- } else if (splitState.isBinlogSplitState()
- && !alreadySendCreateTableForBinlogSplit
- &&
!sourceConfig.getStartupOptions().startupMode.equals(StartupMode.INITIAL)) {
- createTableEventCache.forEach(output::collect);
+ } else if (splitState.isBinlogSplitState() &&
!alreadySendCreateTableForBinlogSplit) {
alreadySendCreateTableForBinlogSplit = true;
+ if
(sourceConfig.getStartupOptions().startupMode.equals(StartupMode.INITIAL)) {
+ // In Snapshot -> Binlog transition of INITIAL startup mode,
ensure all table
+ // schemas have been sent to downstream. We use previously
cached schema instead of
+ // re-request latest schema because there might be some
pending schema change events
+ // in the queue, and that may accidentally emit evolved schema
before corresponding
+ // schema change events.
+ createTableEventCache.stream()
+ .filter(
+ event ->
+ !alreadySendCreateTableTables.contains(
+
MySqlSchemaUtils.toDbzTableId(event.tableId())))
+ .forEach(output::collect);
+ } else {
+ // In Binlog only mode, we simply emit all schemas at once.
+ createTableEventCache.forEach(output::collect);
+ }
}
super.processElement(element, output, splitState);
}
@@ -233,4 +233,22 @@ public class MySqlPipelineRecordEmitter extends
MySqlRecordEmitter<Event> {
}
return mySqlAntlrDdlParser;
}
+
+ private List<CreateTableEvent> generateCreateTableEvent(MySqlSourceConfig
sourceConfig) {
+ try (JdbcConnection jdbc = openJdbcConnection(sourceConfig)) {
+ List<CreateTableEvent> createTableEventCache = new ArrayList<>();
+ List<TableId> capturedTableIds = listTables(jdbc,
sourceConfig.getTableFilters());
+ for (TableId tableId : capturedTableIds) {
+ Schema schema = getSchema(jdbc, tableId);
+ createTableEventCache.add(
+ new CreateTableEvent(
+
org.apache.flink.cdc.common.event.TableId.tableId(
+ tableId.catalog(), tableId.table()),
+ schema));
+ }
+ return createTableEventCache;
+ } catch (SQLException e) {
+ throw new RuntimeException("Cannot start emitter to fetch table
schema.", e);
+ }
+ }
}
diff --git
a/flink-cdc-connect/flink-cdc-pipeline-connectors/flink-cdc-pipeline-connector-mysql/src/test/java/org/apache/flink/cdc/connectors/mysql/source/MySqlFullTypesITCase.java
b/flink-cdc-connect/flink-cdc-pipeline-connectors/flink-cdc-pipeline-connector-mysql/src/test/java/org/apache/flink/cdc/connectors/mysql/source/MySqlFullTypesITCase.java
index 91351dabf..2bd1ae689 100644
---
a/flink-cdc-connect/flink-cdc-pipeline-connectors/flink-cdc-pipeline-connector-mysql/src/test/java/org/apache/flink/cdc/connectors/mysql/source/MySqlFullTypesITCase.java
+++
b/flink-cdc-connect/flink-cdc-pipeline-connectors/flink-cdc-pipeline-connector-mysql/src/test/java/org/apache/flink/cdc/connectors/mysql/source/MySqlFullTypesITCase.java
@@ -295,8 +295,9 @@ public class MySqlFullTypesITCase extends
MySqlSourceTestBase {
.executeAndCollect();
// skip CreateTableEvent
- List<Event> snapshotResults =
MySqSourceTestUtils.fetchResults(iterator, 2);
- RecordData snapshotRecord = ((DataChangeEvent)
snapshotResults.get(1)).after();
+ List<Event> snapshotResults =
+ MySqSourceTestUtils.fetchResultsAndCreateTableEvent(iterator,
1).f0;
+ RecordData snapshotRecord = ((DataChangeEvent)
snapshotResults.get(0)).after();
Assertions.assertThat(RecordDataTestUtils.recordFields(snapshotRecord,
recordType))
.isEqualTo(expectedSnapshot);
@@ -306,7 +307,8 @@ public class MySqlFullTypesITCase extends
MySqlSourceTestBase {
statement.execute("UPDATE precision_types SET time_6_c = null
WHERE id = 1;");
}
- List<Event> streamResults = MySqSourceTestUtils.fetchResults(iterator,
1);
+ List<Event> streamResults =
+ MySqSourceTestUtils.fetchResultsAndCreateTableEvent(iterator,
1).f0;
RecordData streamRecord = ((DataChangeEvent)
streamResults.get(0)).after();
Assertions.assertThat(RecordDataTestUtils.recordFields(streamRecord,
recordType))
.isEqualTo(expectedStreamRecord);
@@ -397,8 +399,9 @@ public class MySqlFullTypesITCase extends
MySqlSourceTestBase {
};
// skip CreateTableEvent
- List<Event> snapshotResults =
MySqSourceTestUtils.fetchResults(iterator, 2);
- RecordData snapshotRecord = ((DataChangeEvent)
snapshotResults.get(1)).after();
+ List<Event> snapshotResults =
+ MySqSourceTestUtils.fetchResultsAndCreateTableEvent(iterator,
1).f0;
+ RecordData snapshotRecord = ((DataChangeEvent)
snapshotResults.get(0)).after();
Assertions.assertThat(RecordDataTestUtils.recordFields(snapshotRecord,
COMMON_TYPES))
.isEqualTo(expectedSnapshot);
@@ -412,7 +415,8 @@ public class MySqlFullTypesITCase extends
MySqlSourceTestBase {
expectedSnapshot[44] =
BinaryStringData.fromString("{\"key1\":\"value1\"}");
Object[] expectedStreamRecord = expectedSnapshot;
- List<Event> streamResults = MySqSourceTestUtils.fetchResults(iterator,
1);
+ List<Event> streamResults =
+ MySqSourceTestUtils.fetchResultsAndCreateTableEvent(iterator,
1).f0;
RecordData streamRecord = ((DataChangeEvent)
streamResults.get(0)).after();
Assertions.assertThat(RecordDataTestUtils.recordFields(streamRecord,
COMMON_TYPES))
.isEqualTo(expectedStreamRecord);
@@ -437,9 +441,10 @@ public class MySqlFullTypesITCase extends
MySqlSourceTestBase {
"Event-Source")
.executeAndCollect();
- // skip CreateTableEvent
- List<Event> snapshotResults =
MySqSourceTestUtils.fetchResults(iterator, 2);
- RecordData snapshotRecord = ((DataChangeEvent)
snapshotResults.get(1)).after();
+ // skip CreateTableEvents
+ List<Event> snapshotResults =
+ MySqSourceTestUtils.fetchResultsAndCreateTableEvent(iterator,
1).f0;
+ RecordData snapshotRecord = ((DataChangeEvent)
snapshotResults.get(0)).after();
Assertions.assertThat(RecordDataTestUtils.recordFields(snapshotRecord,
recordType))
.isEqualTo(expectedSnapshot);
@@ -450,7 +455,8 @@ public class MySqlFullTypesITCase extends
MySqlSourceTestBase {
"UPDATE time_types SET time_6_c = null, timestamp_def_c =
default WHERE id = 1;");
}
- List<Event> streamResults = MySqSourceTestUtils.fetchResults(iterator,
1);
+ List<Event> streamResults =
+ MySqSourceTestUtils.fetchResultsAndCreateTableEvent(iterator,
1).f0;
RecordData streamRecord = ((DataChangeEvent)
streamResults.get(0)).after();
Assertions.assertThat(RecordDataTestUtils.recordFields(streamRecord,
recordType))
.isEqualTo(expectedStreamRecord);
diff --git
a/flink-cdc-connect/flink-cdc-pipeline-connectors/flink-cdc-pipeline-connector-mysql/src/test/java/org/apache/flink/cdc/connectors/mysql/source/MySqlPipelineITCase.java
b/flink-cdc-connect/flink-cdc-pipeline-connectors/flink-cdc-pipeline-connector-mysql/src/test/java/org/apache/flink/cdc/connectors/mysql/source/MySqlPipelineITCase.java
index 9aadcbef2..108a3d36d 100644
---
a/flink-cdc-connect/flink-cdc-pipeline-connectors/flink-cdc-pipeline-connector-mysql/src/test/java/org/apache/flink/cdc/connectors/mysql/source/MySqlPipelineITCase.java
+++
b/flink-cdc-connect/flink-cdc-pipeline-connectors/flink-cdc-pipeline-connector-mysql/src/test/java/org/apache/flink/cdc/connectors/mysql/source/MySqlPipelineITCase.java
@@ -62,6 +62,7 @@ import java.sql.Statement;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collections;
+import java.util.Iterator;
import java.util.List;
import java.util.stream.Collectors;
import java.util.stream.Stream;
@@ -226,12 +227,33 @@ public class MySqlPipelineITCase extends
MySqlSourceTestBase {
BinaryStringData.fromString("c-21")
})));
}
+ // In this configuration, several subtasks might emit their
corresponding CreateTableEvent
+ // to downstream. Since it is not possible to predict how many
CreateTableEvents should we
+ // expect, we simply filter them out from expected sets, and assert
there's at least one.
List<Event> actual =
- fetchResults(events, 1 + expectedSnapshot.size() +
expectedBinlog.size());
- assertThat(actual.get(0)).isEqualTo(createTableEvent);
- assertThat(actual.subList(1, 10))
+ fetchResultsExcept(
+ events, expectedSnapshot.size() +
expectedBinlog.size(), createTableEvent);
+ assertThat(actual.subList(0, expectedSnapshot.size()))
.containsExactlyInAnyOrder(expectedSnapshot.toArray(new
Event[0]));
- assertThat(actual.subList(10,
actual.size())).isEqualTo(expectedBinlog);
+ assertThat(actual.subList(expectedSnapshot.size(), actual.size()))
+ .isEqualTo(expectedBinlog);
+ }
+
+ private static <T> List<T> fetchResultsExcept(Iterator<T> iter, int size,
T sideEvent) {
+ List<T> result = new ArrayList<>(size);
+ List<T> sideResults = new ArrayList<>();
+ while (size > 0 && iter.hasNext()) {
+ T event = iter.next();
+ if (!event.equals(sideEvent)) {
+ result.add(event);
+ size--;
+ } else {
+ sideResults.add(sideEvent);
+ }
+ }
+ // Also ensure we've received at least one or many side events.
+ assertThat(sideResults).isNotEmpty();
+ return result;
}
@Test
diff --git
a/flink-cdc-connect/flink-cdc-pipeline-connectors/flink-cdc-pipeline-connector-mysql/src/test/java/org/apache/flink/cdc/connectors/mysql/testutils/MySqSourceTestUtils.java
b/flink-cdc-connect/flink-cdc-pipeline-connectors/flink-cdc-pipeline-connector-mysql/src/test/java/org/apache/flink/cdc/connectors/mysql/testutils/MySqSourceTestUtils.java
index 0708cd8cd..a76838d66 100644
---
a/flink-cdc-connect/flink-cdc-pipeline-connectors/flink-cdc-pipeline-connector-mysql/src/test/java/org/apache/flink/cdc/connectors/mysql/testutils/MySqSourceTestUtils.java
+++
b/flink-cdc-connect/flink-cdc-pipeline-connectors/flink-cdc-pipeline-connector-mysql/src/test/java/org/apache/flink/cdc/connectors/mysql/testutils/MySqSourceTestUtils.java
@@ -17,6 +17,9 @@
package org.apache.flink.cdc.connectors.mysql.testutils;
+import org.apache.flink.api.java.tuple.Tuple2;
+import org.apache.flink.cdc.common.event.CreateTableEvent;
+
import java.util.ArrayList;
import java.util.Iterator;
import java.util.List;
@@ -38,6 +41,22 @@ public class MySqSourceTestUtils {
return result;
}
+ public static <T> Tuple2<List<T>, List<CreateTableEvent>>
fetchResultsAndCreateTableEvent(
+ Iterator<T> iter, int size) {
+ List<T> result = new ArrayList<>(size);
+ List<CreateTableEvent> createTableEvents = new ArrayList<>();
+ while (size > 0 && iter.hasNext()) {
+ T event = iter.next();
+ if (event instanceof CreateTableEvent) {
+ createTableEvents.add((CreateTableEvent) event);
+ } else {
+ result.add(event);
+ size--;
+ }
+ }
+ return Tuple2.of(result, createTableEvents);
+ }
+
public static String getServerId(int parallelism) {
final Random random = new Random();
int serverId = random.nextInt(100) + 5400;
diff --git
a/flink-cdc-e2e-tests/flink-cdc-pipeline-e2e-tests/src/test/java/org/apache/flink/cdc/pipeline/tests/MySqlToDorisE2eITCase.java
b/flink-cdc-e2e-tests/flink-cdc-pipeline-e2e-tests/src/test/java/org/apache/flink/cdc/pipeline/tests/MySqlToDorisE2eITCase.java
index 0e58c5ed2..508eebdb7 100644
---
a/flink-cdc-e2e-tests/flink-cdc-pipeline-e2e-tests/src/test/java/org/apache/flink/cdc/pipeline/tests/MySqlToDorisE2eITCase.java
+++
b/flink-cdc-e2e-tests/flink-cdc-pipeline-e2e-tests/src/test/java/org/apache/flink/cdc/pipeline/tests/MySqlToDorisE2eITCase.java
@@ -194,12 +194,13 @@ public class MySqlToDorisE2eITCase extends
PipelineTestEnvironment {
+ " table.create.properties.replication_num:
1\n"
+ "\n"
+ "pipeline:\n"
- + " parallelism: 1",
+ + " parallelism: %d",
MYSQL_TEST_USER,
MYSQL_TEST_PASSWORD,
mysqlInventoryDatabase.getDatabaseName(),
DORIS.getUsername(),
- DORIS.getPassword());
+ DORIS.getPassword(),
+ parallelism);
Path mysqlCdcJar =
TestUtils.getResource("mysql-cdc-pipeline-connector.jar");
Path dorisCdcConnector =
TestUtils.getResource("doris-cdc-pipeline-connector.jar");
Path mysqlDriverJar = TestUtils.getResource("mysql-driver.jar");
@@ -324,13 +325,14 @@ public class MySqlToDorisE2eITCase extends
PipelineTestEnvironment {
+ " projection: \\*, 'fine' AS FINE\n"
+ " filter: id <> 3 AND id <> 4\n"
+ "pipeline:\n"
- + " parallelism: 1",
+ + " parallelism: %d",
MYSQL_TEST_USER,
MYSQL_TEST_PASSWORD,
complexDataTypesDatabase.getDatabaseName(),
DORIS.getUsername(),
DORIS.getPassword(),
- complexDataTypesDatabase.getDatabaseName());
+ complexDataTypesDatabase.getDatabaseName(),
+ parallelism);
Path mysqlCdcJar =
TestUtils.getResource("mysql-cdc-pipeline-connector.jar");
Path dorisCdcConnector =
TestUtils.getResource("doris-cdc-pipeline-connector.jar");
Path mysqlDriverJar = TestUtils.getResource("mysql-driver.jar");
diff --git
a/flink-cdc-e2e-tests/flink-cdc-pipeline-e2e-tests/src/test/java/org/apache/flink/cdc/pipeline/tests/MysqlE2eITCase.java
b/flink-cdc-e2e-tests/flink-cdc-pipeline-e2e-tests/src/test/java/org/apache/flink/cdc/pipeline/tests/MysqlE2eITCase.java
index e340468f3..1f730be62 100644
---
a/flink-cdc-e2e-tests/flink-cdc-pipeline-e2e-tests/src/test/java/org/apache/flink/cdc/pipeline/tests/MysqlE2eITCase.java
+++
b/flink-cdc-e2e-tests/flink-cdc-pipeline-e2e-tests/src/test/java/org/apache/flink/cdc/pipeline/tests/MysqlE2eITCase.java
@@ -102,11 +102,12 @@ public class MysqlE2eITCase extends
PipelineTestEnvironment {
+ " type: values\n"
+ "\n"
+ "pipeline:\n"
- + " parallelism: 1",
+ + " parallelism: %d",
INTER_CONTAINER_MYSQL_ALIAS,
MYSQL_TEST_USER,
MYSQL_TEST_PASSWORD,
- mysqlInventoryDatabase.getDatabaseName());
+ mysqlInventoryDatabase.getDatabaseName(),
+ parallelism);
Path mysqlCdcJar =
TestUtils.getResource("mysql-cdc-pipeline-connector.jar");
Path valuesCdcJar =
TestUtils.getResource("values-cdc-pipeline-connector.jar");
Path mysqlDriverJar = TestUtils.getResource("mysql-driver.jar");
@@ -209,11 +210,12 @@ public class MysqlE2eITCase extends
PipelineTestEnvironment {
+ " type: values\n"
+ "\n"
+ "pipeline:\n"
- + " parallelism: 1",
+ + " parallelism: %d",
INTER_CONTAINER_MYSQL_ALIAS,
MYSQL_TEST_USER,
MYSQL_TEST_PASSWORD,
- mysqlInventoryDatabase.getDatabaseName());
+ mysqlInventoryDatabase.getDatabaseName(),
+ parallelism);
Path mysqlCdcJar =
TestUtils.getResource("mysql-cdc-pipeline-connector.jar");
Path valuesCdcJar =
TestUtils.getResource("values-cdc-pipeline-connector.jar");
Path mysqlDriverJar = TestUtils.getResource("mysql-driver.jar");
diff --git
a/flink-cdc-e2e-tests/flink-cdc-pipeline-e2e-tests/src/test/java/org/apache/flink/cdc/pipeline/tests/RouteE2eITCase.java
b/flink-cdc-e2e-tests/flink-cdc-pipeline-e2e-tests/src/test/java/org/apache/flink/cdc/pipeline/tests/RouteE2eITCase.java
index 0834ce6e7..d1af8560c 100644
---
a/flink-cdc-e2e-tests/flink-cdc-pipeline-e2e-tests/src/test/java/org/apache/flink/cdc/pipeline/tests/RouteE2eITCase.java
+++
b/flink-cdc-e2e-tests/flink-cdc-pipeline-e2e-tests/src/test/java/org/apache/flink/cdc/pipeline/tests/RouteE2eITCase.java
@@ -154,11 +154,12 @@ public class RouteE2eITCase extends
PipelineTestEnvironment {
+ " type: values\n"
+ "\n"
+ "pipeline:\n"
- + " parallelism: 1",
+ + " parallelism: %d",
INTER_CONTAINER_MYSQL_ALIAS,
MYSQL_TEST_USER,
MYSQL_TEST_PASSWORD,
- routeTestDatabase.getDatabaseName());
+ routeTestDatabase.getDatabaseName(),
+ parallelism);
Path mysqlCdcJar =
TestUtils.getResource("mysql-cdc-pipeline-connector.jar");
Path valuesCdcJar =
TestUtils.getResource("values-cdc-pipeline-connector.jar");
Path mysqlDriverJar = TestUtils.getResource("mysql-driver.jar");
@@ -248,13 +249,14 @@ public class RouteE2eITCase extends
PipelineTestEnvironment {
+ " sink-table: %s.ALL\n"
+ "\n"
+ "pipeline:\n"
- + " parallelism: 1",
+ + " parallelism: %d",
INTER_CONTAINER_MYSQL_ALIAS,
MYSQL_TEST_USER,
MYSQL_TEST_PASSWORD,
routeTestDatabase.getDatabaseName(),
routeTestDatabase.getDatabaseName(),
- routeTestDatabase.getDatabaseName());
+ routeTestDatabase.getDatabaseName(),
+ parallelism);
Path mysqlCdcJar =
TestUtils.getResource("mysql-cdc-pipeline-connector.jar");
Path valuesCdcJar =
TestUtils.getResource("values-cdc-pipeline-connector.jar");
Path mysqlDriverJar = TestUtils.getResource("mysql-driver.jar");
@@ -331,13 +333,14 @@ public class RouteE2eITCase extends
PipelineTestEnvironment {
+ " sink-table: NEW_%s.ALPHABET\n"
+ "\n"
+ "pipeline:\n"
- + " parallelism: 1",
+ + " parallelism: %d",
INTER_CONTAINER_MYSQL_ALIAS,
MYSQL_TEST_USER,
MYSQL_TEST_PASSWORD,
routeTestDatabase.getDatabaseName(),
routeTestDatabase.getDatabaseName(),
- routeTestDatabase.getDatabaseName());
+ routeTestDatabase.getDatabaseName(),
+ parallelism);
Path mysqlCdcJar =
TestUtils.getResource("mysql-cdc-pipeline-connector.jar");
Path valuesCdcJar =
TestUtils.getResource("values-cdc-pipeline-connector.jar");
Path mysqlDriverJar = TestUtils.getResource("mysql-driver.jar");
@@ -428,7 +431,7 @@ public class RouteE2eITCase extends PipelineTestEnvironment
{
+ " sink-table: NEW_%s.BETAGAMM\n"
+ "\n"
+ "pipeline:\n"
- + " parallelism: 1",
+ + " parallelism: %d",
INTER_CONTAINER_MYSQL_ALIAS,
MYSQL_TEST_USER,
MYSQL_TEST_PASSWORD,
@@ -436,7 +439,8 @@ public class RouteE2eITCase extends PipelineTestEnvironment
{
routeTestDatabase.getDatabaseName(),
routeTestDatabase.getDatabaseName(),
routeTestDatabase.getDatabaseName(),
- routeTestDatabase.getDatabaseName());
+ routeTestDatabase.getDatabaseName(),
+ parallelism);
Path mysqlCdcJar =
TestUtils.getResource("mysql-cdc-pipeline-connector.jar");
Path valuesCdcJar =
TestUtils.getResource("values-cdc-pipeline-connector.jar");
Path mysqlDriverJar = TestUtils.getResource("mysql-driver.jar");
@@ -535,7 +539,7 @@ public class RouteE2eITCase extends PipelineTestEnvironment
{
+ " sink-table: NEW_%s.TABLEC\n"
+ "\n"
+ "pipeline:\n"
- + " parallelism: 1",
+ + " parallelism: %d",
INTER_CONTAINER_MYSQL_ALIAS,
MYSQL_TEST_USER,
MYSQL_TEST_PASSWORD,
@@ -545,7 +549,8 @@ public class RouteE2eITCase extends PipelineTestEnvironment
{
routeTestDatabase.getDatabaseName(),
routeTestDatabase.getDatabaseName(),
routeTestDatabase.getDatabaseName(),
- routeTestDatabase.getDatabaseName());
+ routeTestDatabase.getDatabaseName(),
+ parallelism);
Path mysqlCdcJar =
TestUtils.getResource("mysql-cdc-pipeline-connector.jar");
Path valuesCdcJar =
TestUtils.getResource("values-cdc-pipeline-connector.jar");
Path mysqlDriverJar = TestUtils.getResource("mysql-driver.jar");
@@ -647,14 +652,15 @@ public class RouteE2eITCase extends
PipelineTestEnvironment {
+ " sink-table: %s.ALL\n"
+ "\n"
+ "pipeline:\n"
- + " parallelism: 1",
+ + " parallelism: %d",
INTER_CONTAINER_MYSQL_ALIAS,
MYSQL_TEST_USER,
MYSQL_TEST_PASSWORD,
routeTestDatabase.getDatabaseName(),
routeTestDatabase.getDatabaseName(),
routeTestDatabase.getDatabaseName(),
- routeTestDatabase.getDatabaseName());
+ routeTestDatabase.getDatabaseName(),
+ parallelism);
Path mysqlCdcJar =
TestUtils.getResource("mysql-cdc-pipeline-connector.jar");
Path valuesCdcJar =
TestUtils.getResource("values-cdc-pipeline-connector.jar");
Path mysqlDriverJar = TestUtils.getResource("mysql-driver.jar");
@@ -733,13 +739,14 @@ public class RouteE2eITCase extends
PipelineTestEnvironment {
+ " replace-symbol: <>\n"
+ "\n"
+ "pipeline:\n"
- + " parallelism: 1",
+ + " parallelism: %d",
INTER_CONTAINER_MYSQL_ALIAS,
MYSQL_TEST_USER,
MYSQL_TEST_PASSWORD,
routeTestDatabase.getDatabaseName(),
routeTestDatabase.getDatabaseName(),
- routeTestDatabase.getDatabaseName());
+ routeTestDatabase.getDatabaseName(),
+ parallelism);
Path mysqlCdcJar =
TestUtils.getResource("mysql-cdc-pipeline-connector.jar");
Path valuesCdcJar =
TestUtils.getResource("values-cdc-pipeline-connector.jar");
Path mysqlDriverJar = TestUtils.getResource("mysql-driver.jar");
diff --git
a/flink-cdc-e2e-tests/flink-cdc-pipeline-e2e-tests/src/test/java/org/apache/flink/cdc/pipeline/tests/SchemaEvolveE2eITCase.java
b/flink-cdc-e2e-tests/flink-cdc-pipeline-e2e-tests/src/test/java/org/apache/flink/cdc/pipeline/tests/SchemaEvolveE2eITCase.java
index 14c6a5912..92c2622c9 100644
---
a/flink-cdc-e2e-tests/flink-cdc-pipeline-e2e-tests/src/test/java/org/apache/flink/cdc/pipeline/tests/SchemaEvolveE2eITCase.java
+++
b/flink-cdc-e2e-tests/flink-cdc-pipeline-e2e-tests/src/test/java/org/apache/flink/cdc/pipeline/tests/SchemaEvolveE2eITCase.java
@@ -240,11 +240,12 @@ public class SchemaEvolveE2eITCase extends
PipelineTestEnvironment {
+ "\n"
+ "pipeline:\n"
+ " schema.change.behavior: unexpected\n"
- + " parallelism: 1",
+ + " parallelism: %d",
INTER_CONTAINER_MYSQL_ALIAS,
MYSQL_TEST_USER,
MYSQL_TEST_PASSWORD,
- schemaEvolveDatabase.getDatabaseName());
+ schemaEvolveDatabase.getDatabaseName(),
+ parallelism);
Path mysqlCdcJar =
TestUtils.getResource("mysql-cdc-pipeline-connector.jar");
Path valuesCdcJar =
TestUtils.getResource("values-cdc-pipeline-connector.jar");
Path mysqlDriverJar = TestUtils.getResource("mysql-driver.jar");
@@ -310,13 +311,14 @@ public class SchemaEvolveE2eITCase extends
PipelineTestEnvironment {
+ "\n"
+ "pipeline:\n"
+ " schema.change.behavior: %s\n"
- + " parallelism: 1",
+ + " parallelism: %d",
INTER_CONTAINER_MYSQL_ALIAS,
MYSQL_TEST_USER,
MYSQL_TEST_PASSWORD,
dbName,
mergeTable ? "(members|new_members)" : "members",
- behavior);
+ behavior,
+ parallelism);
Path mysqlCdcJar =
TestUtils.getResource("mysql-cdc-pipeline-connector.jar");
Path valuesCdcJar =
TestUtils.getResource("values-cdc-pipeline-connector.jar");
Path mysqlDriverJar = TestUtils.getResource("mysql-driver.jar");
diff --git
a/flink-cdc-e2e-tests/flink-cdc-pipeline-e2e-tests/src/test/java/org/apache/flink/cdc/pipeline/tests/TransformE2eITCase.java
b/flink-cdc-e2e-tests/flink-cdc-pipeline-e2e-tests/src/test/java/org/apache/flink/cdc/pipeline/tests/TransformE2eITCase.java
index 6f612ca95..ae516fc06 100644
---
a/flink-cdc-e2e-tests/flink-cdc-pipeline-e2e-tests/src/test/java/org/apache/flink/cdc/pipeline/tests/TransformE2eITCase.java
+++
b/flink-cdc-e2e-tests/flink-cdc-pipeline-e2e-tests/src/test/java/org/apache/flink/cdc/pipeline/tests/TransformE2eITCase.java
@@ -118,7 +118,7 @@ public class TransformE2eITCase extends
PipelineTestEnvironment {
+ " - source-table: %s.TABLEBETA\n"
+ " projection: ID, VERSION\n"
+ "pipeline:\n"
- + " parallelism: 1",
+ + " parallelism: %d",
INTER_CONTAINER_MYSQL_ALIAS,
MYSQL_TEST_USER,
MYSQL_TEST_PASSWORD,
@@ -126,7 +126,8 @@ public class TransformE2eITCase extends
PipelineTestEnvironment {
transformTestDatabase.getDatabaseName(),
transformTestDatabase.getDatabaseName(),
transformTestDatabase.getDatabaseName(),
- transformTestDatabase.getDatabaseName());
+ transformTestDatabase.getDatabaseName(),
+ parallelism);
Path mysqlCdcJar =
TestUtils.getResource("mysql-cdc-pipeline-connector.jar");
Path valuesCdcJar =
TestUtils.getResource("values-cdc-pipeline-connector.jar");
Path mysqlDriverJar = TestUtils.getResource("mysql-driver.jar");
@@ -200,13 +201,14 @@ public class TransformE2eITCase extends
PipelineTestEnvironment {
+ " filter: ID <= 1008\n"
+ "\n"
+ "pipeline:\n"
- + " parallelism: 1",
+ + " parallelism: %d",
INTER_CONTAINER_MYSQL_ALIAS,
MYSQL_TEST_USER,
MYSQL_TEST_PASSWORD,
transformTestDatabase.getDatabaseName(),
transformTestDatabase.getDatabaseName(),
- transformTestDatabase.getDatabaseName());
+ transformTestDatabase.getDatabaseName(),
+ parallelism);
Path mysqlCdcJar =
TestUtils.getResource("mysql-cdc-pipeline-connector.jar");
Path valuesCdcJar =
TestUtils.getResource("values-cdc-pipeline-connector.jar");
Path mysqlDriverJar = TestUtils.getResource("mysql-driver.jar");
@@ -287,7 +289,7 @@ public class TransformE2eITCase extends
PipelineTestEnvironment {
+ " - source-table: %s.TABLEBETA\n"
+ " projection: ID, CONCAT('v', VERSION) AS
VERSION, LOWER(NAMEBETA) AS NAME\n"
+ "pipeline:\n"
- + " parallelism: 1",
+ + " parallelism: %d",
INTER_CONTAINER_MYSQL_ALIAS,
MYSQL_TEST_USER,
MYSQL_TEST_PASSWORD,
@@ -295,7 +297,8 @@ public class TransformE2eITCase extends
PipelineTestEnvironment {
transformTestDatabase.getDatabaseName(),
transformTestDatabase.getDatabaseName(),
transformTestDatabase.getDatabaseName(),
- transformTestDatabase.getDatabaseName());
+ transformTestDatabase.getDatabaseName(),
+ parallelism);
Path mysqlCdcJar =
TestUtils.getResource("mysql-cdc-pipeline-connector.jar");
Path valuesCdcJar =
TestUtils.getResource("values-cdc-pipeline-connector.jar");
Path mysqlDriverJar = TestUtils.getResource("mysql-driver.jar");
@@ -365,13 +368,14 @@ public class TransformE2eITCase extends
PipelineTestEnvironment {
+ " - source-table: %s.TABLEBETA\n"
+ " projection: \\*, CONCAT('v', VERSION)
AS VERSION, LOWER(NAMEBETA) AS NAME\n"
+ "pipeline:\n"
- + " parallelism: 1",
+ + " parallelism: %d",
INTER_CONTAINER_MYSQL_ALIAS,
MYSQL_TEST_USER,
MYSQL_TEST_PASSWORD,
transformTestDatabase.getDatabaseName(),
transformTestDatabase.getDatabaseName(),
- transformTestDatabase.getDatabaseName());
+ transformTestDatabase.getDatabaseName(),
+ parallelism);
Path mysqlCdcJar =
TestUtils.getResource("mysql-cdc-pipeline-connector.jar");
Path valuesCdcJar =
TestUtils.getResource("values-cdc-pipeline-connector.jar");
Path mysqlDriverJar = TestUtils.getResource("mysql-driver.jar");
@@ -446,13 +450,14 @@ public class TransformE2eITCase extends
PipelineTestEnvironment {
+ " - source-table: %s.TABLEBETA\n"
+ " projection: \\*, __namespace_name__ ||
'.' || __schema_name__ || '.' || __table_name__ AS identifier_name,
__data_event_type__ AS type\n"
+ "pipeline:\n"
- + " parallelism: 1",
+ + " parallelism: %d",
INTER_CONTAINER_MYSQL_ALIAS,
MYSQL_TEST_USER,
MYSQL_TEST_PASSWORD,
transformTestDatabase.getDatabaseName(),
transformTestDatabase.getDatabaseName(),
- transformTestDatabase.getDatabaseName());
+ transformTestDatabase.getDatabaseName(),
+ parallelism);
Path mysqlCdcJar =
TestUtils.getResource("mysql-cdc-pipeline-connector.jar");
Path valuesCdcJar =
TestUtils.getResource("values-cdc-pipeline-connector.jar");
Path mysqlDriverJar = TestUtils.getResource("mysql-driver.jar");
@@ -530,12 +535,13 @@ public class TransformE2eITCase extends
PipelineTestEnvironment {
+ " - source-table: %s.TABLE\\.*\n"
+ " projection: \\*, ID + 1000 as UID,
VERSION AS NEWVERSION\n"
+ "pipeline:\n"
- + " parallelism: 1",
+ + " parallelism: %d",
INTER_CONTAINER_MYSQL_ALIAS,
MYSQL_TEST_USER,
MYSQL_TEST_PASSWORD,
transformTestDatabase.getDatabaseName(),
- transformTestDatabase.getDatabaseName());
+ transformTestDatabase.getDatabaseName(),
+ parallelism);
Path mysqlCdcJar =
TestUtils.getResource("mysql-cdc-pipeline-connector.jar");
Path valuesCdcJar =
TestUtils.getResource("values-cdc-pipeline-connector.jar");
Path mysqlDriverJar = TestUtils.getResource("mysql-driver.jar");
@@ -613,13 +619,14 @@ public class TransformE2eITCase extends
PipelineTestEnvironment {
+ " - source-table: %s.TABLEBETA\n"
+ " projection: ID, CAST(VERSION AS DOUBLE)
+ 100 AS VERSION, CAST(AGEBETA AS VARCHAR) || ' - ' || NAMEBETA AS IDENTIFIER\n"
+ "pipeline:\n"
- + " parallelism: 1",
+ + " parallelism: %d",
INTER_CONTAINER_MYSQL_ALIAS,
MYSQL_TEST_USER,
MYSQL_TEST_PASSWORD,
transformTestDatabase.getDatabaseName(),
transformTestDatabase.getDatabaseName(),
- transformTestDatabase.getDatabaseName());
+ transformTestDatabase.getDatabaseName(),
+ parallelism);
Path mysqlCdcJar =
TestUtils.getResource("mysql-cdc-pipeline-connector.jar");
Path valuesCdcJar =
TestUtils.getResource("values-cdc-pipeline-connector.jar");
Path mysqlDriverJar = TestUtils.getResource("mysql-driver.jar");
@@ -708,13 +715,14 @@ public class TransformE2eITCase extends
PipelineTestEnvironment {
+ " projection: ID, LOCALTIME as lcl_t,
CURRENT_TIME as cur_t, CAST(CURRENT_TIMESTAMP AS TIMESTAMP) as cur_ts,
CAST(NOW() AS TIMESTAMP) as now_ts, LOCALTIMESTAMP as lcl_ts, CURRENT_DATE as
cur_dt\n"
+ "\n"
+ "pipeline:\n"
- + " parallelism: 1\n"
+ + " parallelism: %d\n"
+ " local-time-zone: America/Los_Angeles",
INTER_CONTAINER_MYSQL_ALIAS,
MYSQL_TEST_USER,
MYSQL_TEST_PASSWORD,
transformTestDatabase.getDatabaseName(),
- transformTestDatabase.getDatabaseName());
+ transformTestDatabase.getDatabaseName(),
+ parallelism);
Path mysqlCdcJar =
TestUtils.getResource("mysql-cdc-pipeline-connector.jar");
Path valuesCdcJar =
TestUtils.getResource("values-cdc-pipeline-connector.jar");
Path mysqlDriverJar = TestUtils.getResource("mysql-driver.jar");
@@ -781,7 +789,9 @@ public class TransformE2eITCase extends
PipelineTestEnvironment {
}
boolean extractDataLines(String line) {
- if (!line.startsWith("DataChangeEvent{")) {
+        // In multiple parallelism mode, a prefix with subTaskId (like '1> ') will be appended.
+ // Should trim it before extracting data fields.
+ if (!line.startsWith("DataChangeEvent{", 3)) {
return false;
}
Stream.of("before", "after")
diff --git
a/flink-cdc-e2e-tests/flink-cdc-pipeline-e2e-tests/src/test/java/org/apache/flink/cdc/pipeline/tests/UdfE2eITCase.java
b/flink-cdc-e2e-tests/flink-cdc-pipeline-e2e-tests/src/test/java/org/apache/flink/cdc/pipeline/tests/UdfE2eITCase.java
index 938e2d98e..9f5b98298 100644
---
a/flink-cdc-e2e-tests/flink-cdc-pipeline-e2e-tests/src/test/java/org/apache/flink/cdc/pipeline/tests/UdfE2eITCase.java
+++
b/flink-cdc-e2e-tests/flink-cdc-pipeline-e2e-tests/src/test/java/org/apache/flink/cdc/pipeline/tests/UdfE2eITCase.java
@@ -128,7 +128,7 @@ public class UdfE2eITCase extends PipelineTestEnvironment {
+ " projection: ID, VERSION, answer() AS
ANS, typeof(ID) AS TYP\n"
+ "\n"
+ "pipeline:\n"
- + " parallelism: 1\n"
+ + " parallelism: %d\n"
+ " user-defined-function:\n"
+ " - name: addone\n"
+ " classpath:
org.apache.flink.cdc.udf.examples.%s.AddOneFunctionClass\n"
@@ -146,6 +146,7 @@ public class UdfE2eITCase extends PipelineTestEnvironment {
transformRenameDatabase.getDatabaseName(),
transformRenameDatabase.getDatabaseName(),
transformRenameDatabase.getDatabaseName(),
+ parallelism,
language,
language,
language,
@@ -267,7 +268,7 @@ public class UdfE2eITCase extends PipelineTestEnvironment {
+ " projection: ID, VERSION, typeof(ID) AS
TYP\n"
+ "\n"
+ "pipeline:\n"
- + " parallelism: 1\n"
+ + " parallelism: %d\n"
+ " user-defined-function:\n"
+ " - name: addone\n"
+ " classpath:
org.apache.flink.udf.examples.%s.AddOneFunctionClass\n"
@@ -281,6 +282,7 @@ public class UdfE2eITCase extends PipelineTestEnvironment {
transformRenameDatabase.getDatabaseName(),
transformRenameDatabase.getDatabaseName(),
transformRenameDatabase.getDatabaseName(),
+ parallelism,
language,
language,
language);
diff --git
a/flink-cdc-e2e-tests/flink-cdc-pipeline-e2e-tests/src/test/java/org/apache/flink/cdc/pipeline/tests/utils/PipelineTestEnvironment.java
b/flink-cdc-e2e-tests/flink-cdc-pipeline-e2e-tests/src/test/java/org/apache/flink/cdc/pipeline/tests/utils/PipelineTestEnvironment.java
index 72576bf17..048e23bd7 100644
---
a/flink-cdc-e2e-tests/flink-cdc-pipeline-e2e-tests/src/test/java/org/apache/flink/cdc/pipeline/tests/utils/PipelineTestEnvironment.java
+++
b/flink-cdc-e2e-tests/flink-cdc-pipeline-e2e-tests/src/test/java/org/apache/flink/cdc/pipeline/tests/utils/PipelineTestEnvironment.java
@@ -67,6 +67,8 @@ public abstract class PipelineTestEnvironment extends
TestLogger {
@Parameterized.Parameter public String flinkVersion;
+ public Integer parallelism = 4;
+
//
------------------------------------------------------------------------------------------
// Flink Variables
//
------------------------------------------------------------------------------------------
diff --git
a/flink-cdc-runtime/src/main/java/org/apache/flink/cdc/runtime/operators/schema/SchemaOperator.java
b/flink-cdc-runtime/src/main/java/org/apache/flink/cdc/runtime/operators/schema/SchemaOperator.java
index a700fd39c..1b4f50e89 100644
---
a/flink-cdc-runtime/src/main/java/org/apache/flink/cdc/runtime/operators/schema/SchemaOperator.java
+++
b/flink-cdc-runtime/src/main/java/org/apache/flink/cdc/runtime/operators/schema/SchemaOperator.java
@@ -39,20 +39,17 @@ import org.apache.flink.cdc.common.types.DataTypeRoot;
import org.apache.flink.cdc.common.utils.ChangeEventUtils;
import
org.apache.flink.cdc.runtime.operators.schema.coordinator.SchemaRegistry;
import
org.apache.flink.cdc.runtime.operators.schema.event.CoordinationResponseUtils;
-import
org.apache.flink.cdc.runtime.operators.schema.event.RefreshPendingListsRequest;
-import
org.apache.flink.cdc.runtime.operators.schema.event.ReleaseUpstreamRequest;
-import
org.apache.flink.cdc.runtime.operators.schema.event.ReleaseUpstreamResponse;
import
org.apache.flink.cdc.runtime.operators.schema.event.SchemaChangeProcessingResponse;
import org.apache.flink.cdc.runtime.operators.schema.event.SchemaChangeRequest;
import
org.apache.flink.cdc.runtime.operators.schema.event.SchemaChangeResponse;
import
org.apache.flink.cdc.runtime.operators.schema.event.SchemaChangeResultRequest;
+import
org.apache.flink.cdc.runtime.operators.schema.event.SchemaChangeResultResponse;
import
org.apache.flink.cdc.runtime.operators.schema.metrics.SchemaOperatorMetrics;
import org.apache.flink.cdc.runtime.operators.sink.SchemaEvolutionClient;
import org.apache.flink.cdc.runtime.typeutils.BinaryRecordDataGenerator;
import org.apache.flink.runtime.jobgraph.tasks.TaskOperatorEventGateway;
import org.apache.flink.runtime.operators.coordination.CoordinationRequest;
import org.apache.flink.runtime.operators.coordination.CoordinationResponse;
-import org.apache.flink.runtime.state.StateInitializationContext;
import org.apache.flink.streaming.api.graph.StreamConfig;
import org.apache.flink.streaming.api.operators.AbstractStreamOperator;
import org.apache.flink.streaming.api.operators.ChainingStrategy;
@@ -120,6 +117,7 @@ public class SchemaOperator extends
AbstractStreamOperator<Event>
private final SchemaChangeBehavior schemaChangeBehavior;
private transient SchemaOperatorMetrics schemaOperatorMetrics;
+ private transient int subTaskId;
@VisibleForTesting
public SchemaOperator(List<RouteRule> routingRules) {
@@ -153,6 +151,7 @@ public class SchemaOperator extends
AbstractStreamOperator<Event>
schemaOperatorMetrics =
new SchemaOperatorMetrics(
getRuntimeContext().getMetricGroup(),
schemaChangeBehavior);
+ subTaskId = getRuntimeContext().getIndexOfThisSubtask();
}
@Override
@@ -218,17 +217,6 @@ public class SchemaOperator extends
AbstractStreamOperator<Event>
});
}
- @Override
- public void initializeState(StateInitializationContext context) throws
Exception {
- if (context.isRestored()) {
- // Multiple operators may appear during a restart process,
- // only clear the pendingSchemaChanges when the first operator
starts.
- if (getRuntimeContext().getIndexOfThisSubtask() == 0) {
- sendRequestToCoordinator(new RefreshPendingListsRequest());
- }
- }
- }
-
/**
* This method is guaranteed to not be called concurrently with other
methods of the operator.
*/
@@ -248,7 +236,11 @@ public class SchemaOperator extends
AbstractStreamOperator<Event>
private void processSchemaChangeEvents(SchemaChangeEvent event)
throws InterruptedException, TimeoutException, ExecutionException {
TableId tableId = event.tableId();
-        LOG.info("Table {} received SchemaChangeEvent and start to be blocked.", tableId);
+ LOG.info(
+ "{}> Table {} received SchemaChangeEvent {} and start to be
blocked.",
+ subTaskId,
+ tableId,
+ event);
handleSchemaChangeEvent(tableId, event);
// Update caches
originalSchema.put(tableId, getLatestOriginalSchema(tableId));
@@ -412,36 +404,62 @@ public class SchemaOperator extends
AbstractStreamOperator<Event>
schemaChangeEvent));
}
- // The request will need to send a FlushEvent or block until flushing
finished
+ // The request will block if another schema change event is being
handled
SchemaChangeResponse response = requestSchemaChange(tableId,
schemaChangeEvent);
- if (!response.getSchemaChangeEvents().isEmpty()) {
- LOG.info(
- "Sending the FlushEvent for table {} in subtask {}.",
- tableId,
- getRuntimeContext().getIndexOfThisSubtask());
+ if (response.isAccepted()) {
+            LOG.info("{}> Sending the FlushEvent for table {}.", subTaskId, tableId);
output.collect(new StreamRecord<>(new FlushEvent(tableId)));
List<SchemaChangeEvent> expectedSchemaChangeEvents =
response.getSchemaChangeEvents();
schemaOperatorMetrics.increaseSchemaChangeEvents(expectedSchemaChangeEvents.size());
// The request will block until flushing finished in each sink
writer
- ReleaseUpstreamResponse schemaEvolveResponse =
requestReleaseUpstream();
+ SchemaChangeResultResponse schemaEvolveResponse =
requestSchemaChangeResult();
List<SchemaChangeEvent> finishedSchemaChangeEvents =
schemaEvolveResponse.getFinishedSchemaChangeEvents();
// Update evolved schema changes based on apply results
finishedSchemaChangeEvents.forEach(e -> output.collect(new
StreamRecord<>(e)));
+ } else if (response.isDuplicate()) {
+ LOG.info(
+ "{}> Schema change event {} has been handled in another
subTask already.",
+ subTaskId,
+ schemaChangeEvent);
+ } else if (response.isIgnored()) {
+ LOG.info(
+ "{}> Schema change event {} has been ignored. No schema
evolution needed.",
+ subTaskId,
+ schemaChangeEvent);
+ } else {
+ throw new IllegalStateException("Unexpected response status " +
response);
}
}
private SchemaChangeResponse requestSchemaChange(
- TableId tableId, SchemaChangeEvent schemaChangeEvent) {
- return sendRequestToCoordinator(new SchemaChangeRequest(tableId,
schemaChangeEvent));
+ TableId tableId, SchemaChangeEvent schemaChangeEvent)
+ throws InterruptedException, TimeoutException {
+ long schemaEvolveTimeOutMillis = System.currentTimeMillis() +
rpcTimeOutInMillis;
+ while (true) {
+ SchemaChangeResponse response =
+ sendRequestToCoordinator(new SchemaChangeRequest(tableId,
schemaChangeEvent));
+ if (response.isRegistryBusy()) {
+ if (System.currentTimeMillis() < schemaEvolveTimeOutMillis) {
+ LOG.info(
+ "{}> Schema Registry is busy now, waiting for next
request...",
+ subTaskId);
+ Thread.sleep(1000);
+ } else {
+                    throw new TimeoutException("TimeOut when requesting schema change");
+ }
+ } else {
+ return response;
+ }
+ }
}
- private ReleaseUpstreamResponse requestReleaseUpstream()
+ private SchemaChangeResultResponse requestSchemaChangeResult()
throws InterruptedException, TimeoutException {
CoordinationResponse coordinationResponse =
- sendRequestToCoordinator(new ReleaseUpstreamRequest());
+ sendRequestToCoordinator(new SchemaChangeResultRequest());
long nextRpcTimeOutMillis = System.currentTimeMillis() +
rpcTimeOutInMillis;
while (coordinationResponse instanceof SchemaChangeProcessingResponse)
{
if (System.currentTimeMillis() < nextRpcTimeOutMillis) {
@@ -451,7 +469,7 @@ public class SchemaOperator extends
AbstractStreamOperator<Event>
throw new TimeoutException("TimeOut when requesting release
upstream");
}
}
- return ((ReleaseUpstreamResponse) coordinationResponse);
+ return ((SchemaChangeResultResponse) coordinationResponse);
}
private <REQUEST extends CoordinationRequest, RESPONSE extends
CoordinationResponse>
diff --git
a/flink-cdc-runtime/src/main/java/org/apache/flink/cdc/runtime/operators/schema/coordinator/SchemaManager.java
b/flink-cdc-runtime/src/main/java/org/apache/flink/cdc/runtime/operators/schema/coordinator/SchemaManager.java
index b87ef5b12..60e70b7ed 100644
---
a/flink-cdc-runtime/src/main/java/org/apache/flink/cdc/runtime/operators/schema/coordinator/SchemaManager.java
+++
b/flink-cdc-runtime/src/main/java/org/apache/flink/cdc/runtime/operators/schema/coordinator/SchemaManager.java
@@ -18,11 +18,15 @@
package org.apache.flink.cdc.runtime.operators.schema.coordinator;
import org.apache.flink.cdc.common.annotation.Internal;
+import org.apache.flink.cdc.common.event.AddColumnEvent;
import org.apache.flink.cdc.common.event.CreateTableEvent;
import org.apache.flink.cdc.common.event.SchemaChangeEvent;
import org.apache.flink.cdc.common.event.TableId;
+import org.apache.flink.cdc.common.event.visitor.SchemaChangeEventVisitor;
import org.apache.flink.cdc.common.pipeline.SchemaChangeBehavior;
+import org.apache.flink.cdc.common.schema.Column;
import org.apache.flink.cdc.common.schema.Schema;
+import org.apache.flink.cdc.common.types.DataType;
import org.apache.flink.cdc.common.utils.SchemaUtils;
import org.apache.flink.cdc.runtime.serializer.TableIdSerializer;
import org.apache.flink.cdc.runtime.serializer.schema.SchemaSerializer;
@@ -39,6 +43,7 @@ import java.io.DataInputStream;
import java.io.DataOutputStream;
import java.io.IOException;
import java.util.HashMap;
+import java.util.List;
import java.util.Map;
import java.util.NoSuchElementException;
import java.util.Objects;
@@ -93,6 +98,97 @@ public class SchemaManager {
return behavior;
}
+ /**
+ * This function checks if the given schema change event has been applied
already. If so, it
+ * will be ignored to avoid sending duplicate evolved schema change events
to sink metadata
+ * applier.
+ */
+ public final boolean
isOriginalSchemaChangeEventRedundant(SchemaChangeEvent event) {
+ TableId tableId = event.tableId();
+ Optional<Schema> latestSchema = getLatestOriginalSchema(tableId);
+ return Boolean.TRUE.equals(
+ SchemaChangeEventVisitor.visit(
+ event,
+ addColumnEvent -> {
+ // It has not been applied if schema does not even
exist
+ if (!latestSchema.isPresent()) {
+ return false;
+ }
+ List<Column> existedColumns =
latestSchema.get().getColumns();
+
+ // It has been applied only if all columns are
present in existedColumns
+ for (AddColumnEvent.ColumnWithPosition column :
+ addColumnEvent.getAddedColumns()) {
+ if
(!existedColumns.contains(column.getAddColumn())) {
+ return false;
+ }
+ }
+ return true;
+ },
+ alterColumnTypeEvent -> {
+ // It has not been applied if schema does not even
exist
+ if (!latestSchema.isPresent()) {
+ return false;
+ }
+ Schema schema = latestSchema.get();
+
+ // It has been applied only if all column types
are set as expected
+ for (Map.Entry<String, DataType> entry :
+
alterColumnTypeEvent.getTypeMapping().entrySet()) {
+ if
(!schema.getColumn(entry.getKey()).isPresent()
+ || !schema.getColumn(entry.getKey())
+ .get()
+ .getType()
+ .equals(entry.getValue())) {
+ return false;
+ }
+ }
+ return true;
+ },
+ createTableEvent -> {
+ // It has been applied if such table already exists
+ return latestSchema.isPresent();
+ },
+ dropColumnEvent -> {
+ // It has not been applied if schema does not even
exist
+ if (!latestSchema.isPresent()) {
+ return false;
+ }
+ List<String> existedColumnNames =
latestSchema.get().getColumnNames();
+
+ // It has been applied only if corresponding
column types do not exist
+ return
dropColumnEvent.getDroppedColumnNames().stream()
+ .noneMatch(existedColumnNames::contains);
+ },
+ dropTableEvent -> {
+ // It has been applied if such table does not exist
+ return !latestSchema.isPresent();
+ },
+ renameColumnEvent -> {
+ // It has been applied if such table already exists
+ if (!latestSchema.isPresent()) {
+ return false;
+ }
+ List<String> existedColumnNames =
latestSchema.get().getColumnNames();
+
+ // It has been applied only if all previous names
do not exist, and all
+ // new names already exist
+ for (Map.Entry<String, String> entry :
+
renameColumnEvent.getNameMapping().entrySet()) {
+ if (existedColumnNames.contains(entry.getKey())
+ ||
!existedColumnNames.contains(entry.getValue())) {
+ return false;
+ }
+ }
+ return true;
+ },
+ truncateTableEvent -> {
+ // We have no way to ensure if a
TruncateTableEvent has been applied
+ // before. Just assume it's not.
+ return false;
+ }));
+ }
+
public final boolean schemaExists(
Map<TableId, SortedMap<Integer, Schema>> schemaMap, TableId
tableId) {
return schemaMap.containsKey(tableId) &&
!schemaMap.get(tableId).isEmpty();
diff --git
a/flink-cdc-runtime/src/main/java/org/apache/flink/cdc/runtime/operators/schema/coordinator/SchemaRegistry.java
b/flink-cdc-runtime/src/main/java/org/apache/flink/cdc/runtime/operators/schema/coordinator/SchemaRegistry.java
index 9087ae4b3..8ea3a1f93 100644
---
a/flink-cdc-runtime/src/main/java/org/apache/flink/cdc/runtime/operators/schema/coordinator/SchemaRegistry.java
+++
b/flink-cdc-runtime/src/main/java/org/apache/flink/cdc/runtime/operators/schema/coordinator/SchemaRegistry.java
@@ -29,8 +29,6 @@ import
org.apache.flink.cdc.runtime.operators.schema.event.GetEvolvedSchemaReque
import
org.apache.flink.cdc.runtime.operators.schema.event.GetEvolvedSchemaResponse;
import
org.apache.flink.cdc.runtime.operators.schema.event.GetOriginalSchemaRequest;
import
org.apache.flink.cdc.runtime.operators.schema.event.GetOriginalSchemaResponse;
-import
org.apache.flink.cdc.runtime.operators.schema.event.RefreshPendingListsRequest;
-import
org.apache.flink.cdc.runtime.operators.schema.event.ReleaseUpstreamRequest;
import org.apache.flink.cdc.runtime.operators.schema.event.SchemaChangeRequest;
import
org.apache.flink.cdc.runtime.operators.schema.event.SchemaChangeResultRequest;
import
org.apache.flink.cdc.runtime.operators.schema.event.SinkWriterRegisterEvent;
@@ -201,18 +199,14 @@ public class SchemaRegistry implements
OperatorCoordinator, CoordinationRequestH
if (request instanceof SchemaChangeRequest) {
SchemaChangeRequest schemaChangeRequest =
(SchemaChangeRequest) request;
return
requestHandler.handleSchemaChangeRequest(schemaChangeRequest);
- } else if (request instanceof ReleaseUpstreamRequest) {
- return requestHandler.handleReleaseUpstreamRequest();
+ } else if (request instanceof SchemaChangeResultRequest) {
+ return requestHandler.getSchemaChangeResult();
} else if (request instanceof GetEvolvedSchemaRequest) {
return CompletableFuture.completedFuture(
wrap(handleGetEvolvedSchemaRequest(((GetEvolvedSchemaRequest) request))));
} else if (request instanceof GetOriginalSchemaRequest) {
return CompletableFuture.completedFuture(
wrap(handleGetOriginalSchemaRequest((GetOriginalSchemaRequest) request)));
- } else if (request instanceof SchemaChangeResultRequest) {
- return requestHandler.getSchemaChangeResult();
- } else if (request instanceof RefreshPendingListsRequest) {
- return requestHandler.refreshPendingLists();
} else {
throw new IllegalArgumentException(
"Unrecognized CoordinationRequest type: " + request);
diff --git
a/flink-cdc-runtime/src/main/java/org/apache/flink/cdc/runtime/operators/schema/coordinator/SchemaRegistryRequestHandler.java
b/flink-cdc-runtime/src/main/java/org/apache/flink/cdc/runtime/operators/schema/coordinator/SchemaRegistryRequestHandler.java
index a84ee8d63..99019a6b4 100644
---
a/flink-cdc-runtime/src/main/java/org/apache/flink/cdc/runtime/operators/schema/coordinator/SchemaRegistryRequestHandler.java
+++
b/flink-cdc-runtime/src/main/java/org/apache/flink/cdc/runtime/operators/schema/coordinator/SchemaRegistryRequestHandler.java
@@ -33,41 +33,36 @@ import org.apache.flink.cdc.common.schema.Column;
import org.apache.flink.cdc.common.schema.Schema;
import org.apache.flink.cdc.common.sink.MetadataApplier;
import org.apache.flink.cdc.common.types.DataType;
-import
org.apache.flink.cdc.runtime.operators.schema.event.RefreshPendingListsResponse;
-import
org.apache.flink.cdc.runtime.operators.schema.event.ReleaseUpstreamRequest;
-import
org.apache.flink.cdc.runtime.operators.schema.event.ReleaseUpstreamResponse;
+import org.apache.flink.cdc.common.utils.Preconditions;
import
org.apache.flink.cdc.runtime.operators.schema.event.SchemaChangeProcessingResponse;
import org.apache.flink.cdc.runtime.operators.schema.event.SchemaChangeRequest;
import
org.apache.flink.cdc.runtime.operators.schema.event.SchemaChangeResponse;
+import
org.apache.flink.cdc.runtime.operators.schema.event.SchemaChangeResultResponse;
import org.apache.flink.runtime.operators.coordination.CoordinationResponse;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
-import javax.annotation.concurrent.NotThreadSafe;
-
import java.io.Closeable;
import java.io.IOException;
import java.util.ArrayList;
import java.util.Collections;
import java.util.HashMap;
import java.util.HashSet;
-import java.util.LinkedList;
import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
+import java.util.concurrent.atomic.AtomicReference;
import java.util.stream.Collectors;
import java.util.stream.Stream;
-import static
org.apache.flink.cdc.runtime.operators.schema.coordinator.SchemaRegistryRequestHandler.RequestStatus.RECEIVED_RELEASE_REQUEST;
import static
org.apache.flink.cdc.runtime.operators.schema.event.CoordinationResponseUtils.wrap;
/** A handler to deal with all requests and events for {@link SchemaRegistry}.
*/
@Internal
-@NotThreadSafe
public class SchemaRegistryRequestHandler implements Closeable {
private static final Logger LOG =
LoggerFactory.getLogger(SchemaRegistryRequestHandler.class);
@@ -81,21 +76,18 @@ public class SchemaRegistryRequestHandler implements
Closeable {
private final SchemaDerivation schemaDerivation;
/**
- * Not applied SchemaChangeRequest before receiving all flush success
events for its table from
- * sink writers.
+ * Atomic flag indicating if current RequestHandler could accept more
schema changes for now.
*/
- private final List<PendingSchemaChange> pendingSchemaChanges;
+ private final AtomicReference<RequestStatus> schemaChangeStatus;
- private final List<SchemaChangeEvent> finishedSchemaChanges;
- private final List<SchemaChangeEvent> ignoredSchemaChanges;
+ private volatile Throwable currentChangeException;
+ private volatile List<SchemaChangeEvent> currentDerivedSchemaChangeEvents;
+ private volatile List<SchemaChangeEvent> currentFinishedSchemaChanges;
+ private volatile List<SchemaChangeEvent> currentIgnoredSchemaChanges;
/** Sink writers which have sent flush success events for the request. */
private final Set<Integer> flushedSinkWriters;
- /** Status of the execution of current schema change request. */
- private volatile boolean isSchemaChangeApplying;
- /** Actual exception if failed to apply schema change. */
- private volatile Throwable schemaChangeException;
/** Executor service to execute schema change. */
private final ExecutorService schemaChangeThreadPool;
@@ -107,16 +99,81 @@ public class SchemaRegistryRequestHandler implements
Closeable {
SchemaDerivation schemaDerivation,
SchemaChangeBehavior schemaChangeBehavior) {
this.metadataApplier = metadataApplier;
- this.activeSinkWriters = new HashSet<>();
- this.flushedSinkWriters = new HashSet<>();
- this.pendingSchemaChanges = new LinkedList<>();
- this.finishedSchemaChanges = new LinkedList<>();
- this.ignoredSchemaChanges = new LinkedList<>();
this.schemaManager = schemaManager;
this.schemaDerivation = schemaDerivation;
- this.schemaChangeThreadPool = Executors.newSingleThreadExecutor();
- this.isSchemaChangeApplying = false;
this.schemaChangeBehavior = schemaChangeBehavior;
+
+ this.activeSinkWriters = new HashSet<>();
+ this.flushedSinkWriters = new HashSet<>();
+ this.schemaChangeThreadPool = Executors.newSingleThreadExecutor();
+
+ this.currentDerivedSchemaChangeEvents = new ArrayList<>();
+ this.currentFinishedSchemaChanges = new ArrayList<>();
+ this.currentIgnoredSchemaChanges = new ArrayList<>();
+ this.schemaChangeStatus = new AtomicReference<>(RequestStatus.IDLE);
+ }
+
+ /**
+ * Handle the {@link SchemaChangeRequest} and wait for all sink subtasks
flushing.
+ *
+ * @param request the received SchemaChangeRequest
+ */
+ public CompletableFuture<CoordinationResponse> handleSchemaChangeRequest(
+ SchemaChangeRequest request) {
+ if (schemaChangeStatus.compareAndSet(RequestStatus.IDLE,
RequestStatus.WAITING_FOR_FLUSH)) {
+ LOG.info(
+ "Received schema change event request {} from table {}.
Start to buffer requests for others.",
+ request.getSchemaChangeEvent(),
+ request.getTableId().toString());
+ SchemaChangeEvent event = request.getSchemaChangeEvent();
+
+ // If this schema change event has been requested by another
subTask, ignore it.
+ if (schemaManager.isOriginalSchemaChangeEventRedundant(event)) {
+ LOG.info("Event {} has been addressed before, ignoring it.",
event);
+ clearCurrentSchemaChangeRequest();
+ Preconditions.checkState(
+ schemaChangeStatus.compareAndSet(
+ RequestStatus.WAITING_FOR_FLUSH,
RequestStatus.IDLE),
+ "Illegal schemaChangeStatus state: should still in
WAITING_FOR_FLUSH state if event was duplicated.");
+ return
CompletableFuture.completedFuture(wrap(SchemaChangeResponse.duplicate()));
+ }
+ schemaManager.applyOriginalSchemaChange(event);
+ List<SchemaChangeEvent> derivedSchemaChangeEvents =
+
calculateDerivedSchemaChangeEvents(request.getSchemaChangeEvent());
+
+ // If this schema change event is filtered out by LENIENT mode or
merging table route
+ // strategies, ignore it.
+ if (derivedSchemaChangeEvents.isEmpty()) {
+ LOG.info("Event {} is omitted from sending to downstream,
ignoring it.", event);
+ clearCurrentSchemaChangeRequest();
+ Preconditions.checkState(
+ schemaChangeStatus.compareAndSet(
+ RequestStatus.WAITING_FOR_FLUSH,
RequestStatus.IDLE),
+ "Illegal schemaChangeStatus state: should still in
WAITING_FOR_FLUSH state if event was ignored.");
+ return
CompletableFuture.completedFuture(wrap(SchemaChangeResponse.ignored()));
+ }
+
+ // Backfill pre-schema info for sink applying
+ derivedSchemaChangeEvents.forEach(
+ e -> {
+ if (e instanceof SchemaChangeEventWithPreSchema) {
+ SchemaChangeEventWithPreSchema pe =
(SchemaChangeEventWithPreSchema) e;
+ if (!pe.hasPreSchema()) {
+ schemaManager
+ .getLatestEvolvedSchema(pe.tableId())
+ .ifPresent(pe::fillPreSchema);
+ }
+ }
+ });
+ currentDerivedSchemaChangeEvents = new
ArrayList<>(derivedSchemaChangeEvents);
+ return CompletableFuture.completedFuture(
+
wrap(SchemaChangeResponse.accepted(derivedSchemaChangeEvents)));
+ } else {
+ LOG.info(
+ "Schema Registry is busy processing a schema change
request, could not handle request {} for now.",
+ request);
+ return
CompletableFuture.completedFuture(wrap(SchemaChangeResponse.busy()));
+ }
}
/**
@@ -127,27 +184,22 @@ public class SchemaRegistryRequestHandler implements
Closeable {
*/
private void applySchemaChange(
TableId tableId, List<SchemaChangeEvent>
derivedSchemaChangeEvents) {
- isSchemaChangeApplying = true;
- schemaChangeException = null;
- finishedSchemaChanges.clear();
- ignoredSchemaChanges.clear();
-
for (SchemaChangeEvent changeEvent : derivedSchemaChangeEvents) {
if (changeEvent.getType() != SchemaChangeEventType.CREATE_TABLE) {
if (schemaChangeBehavior == SchemaChangeBehavior.IGNORE) {
- ignoredSchemaChanges.add(changeEvent);
+ currentIgnoredSchemaChanges.add(changeEvent);
continue;
}
}
if
(!metadataApplier.acceptsSchemaEvolutionType(changeEvent.getType())) {
LOG.info("Ignored schema change {} to table {}.", changeEvent,
tableId);
- ignoredSchemaChanges.add(changeEvent);
+ currentIgnoredSchemaChanges.add(changeEvent);
} else {
try {
metadataApplier.applySchemaChange(changeEvent);
- LOG.debug("Applied schema change {} to table {}.",
changeEvent, tableId);
+ LOG.info("Applied schema change {} to table {}.",
changeEvent, tableId);
schemaManager.applyEvolvedSchemaChange(changeEvent);
- finishedSchemaChanges.add(changeEvent);
+ currentFinishedSchemaChanges.add(changeEvent);
} catch (Throwable t) {
LOG.error(
"Failed to apply schema change {} to table {}.
Caused by: {}",
@@ -155,7 +207,7 @@ public class SchemaRegistryRequestHandler implements
Closeable {
tableId,
t);
if (!shouldIgnoreException(t)) {
- schemaChangeException = t;
+ currentChangeException = t;
break;
} else {
LOG.warn(
@@ -166,74 +218,9 @@ public class SchemaRegistryRequestHandler implements
Closeable {
}
}
}
-
- PendingSchemaChange waitFlushSuccess = pendingSchemaChanges.get(0);
- if (RECEIVED_RELEASE_REQUEST.equals(waitFlushSuccess.getStatus())) {
- startNextSchemaChangeRequest();
- }
- isSchemaChangeApplying = false;
- }
-
- /**
- * Handle the {@link SchemaChangeRequest} and wait for all sink subtasks
flushing.
- *
- * @param request the received SchemaChangeRequest
- */
- public CompletableFuture<CoordinationResponse> handleSchemaChangeRequest(
- SchemaChangeRequest request) {
- if (pendingSchemaChanges.isEmpty()) {
- LOG.info(
- "Received schema change event request from table {}. Start
to buffer requests for others.",
- request.getTableId().toString());
- SchemaChangeEvent event = request.getSchemaChangeEvent();
- if (event instanceof CreateTableEvent
- &&
schemaManager.originalSchemaExists(request.getTableId())) {
- return CompletableFuture.completedFuture(
- wrap(new
SchemaChangeResponse(Collections.emptyList())));
- }
- schemaManager.applyOriginalSchemaChange(event);
- List<SchemaChangeEvent> derivedSchemaChangeEvents =
-
calculateDerivedSchemaChangeEvents(request.getSchemaChangeEvent());
- derivedSchemaChangeEvents.forEach(
- e -> {
- if (e instanceof SchemaChangeEventWithPreSchema) {
- SchemaChangeEventWithPreSchema pe =
(SchemaChangeEventWithPreSchema) e;
- if (!pe.hasPreSchema()) {
- schemaManager
- .getLatestEvolvedSchema(pe.tableId())
- .ifPresent(pe::fillPreSchema);
- }
- }
- });
- CompletableFuture<CoordinationResponse> response =
- CompletableFuture.completedFuture(
- wrap(new
SchemaChangeResponse(derivedSchemaChangeEvents)));
- if (!derivedSchemaChangeEvents.isEmpty()) {
- PendingSchemaChange pendingSchemaChange =
- new PendingSchemaChange(request, response);
- pendingSchemaChange.derivedSchemaChangeEvents =
derivedSchemaChangeEvents;
- pendingSchemaChanges.add(pendingSchemaChange);
- pendingSchemaChanges.get(0).startToWaitForReleaseRequest();
- }
- return response;
- } else {
- LOG.info("There are already processing requests. Wait for
processing.");
- CompletableFuture<CoordinationResponse> response = new
CompletableFuture<>();
- pendingSchemaChanges.add(new PendingSchemaChange(request,
response));
- return response;
- }
- }
-
- /** Handle the {@link ReleaseUpstreamRequest} and wait for all sink
subtasks flushing. */
- public CompletableFuture<CoordinationResponse>
handleReleaseUpstreamRequest() {
- CompletableFuture<CoordinationResponse> response =
- pendingSchemaChanges.get(0).getResponseFuture();
- if (response.isDone() && !isSchemaChangeApplying) {
- startNextSchemaChangeRequest();
- } else {
- pendingSchemaChanges.get(0).receiveReleaseRequest();
- }
- return response;
+ Preconditions.checkState(
+ schemaChangeStatus.compareAndSet(RequestStatus.APPLYING,
RequestStatus.FINISHED),
+ "Illegal schemaChangeStatus state: should be APPLYING before
applySchemaChange finishes");
}
/**
@@ -252,76 +239,33 @@ public class SchemaRegistryRequestHandler implements
Closeable {
* @param tableId the subtask in SchemaOperator and table that the
FlushEvent is about
* @param sinkSubtask the sink subtask succeed flushing
*/
- public void flushSuccess(TableId tableId, int sinkSubtask) throws
InterruptedException {
+ public void flushSuccess(TableId tableId, int sinkSubtask) {
flushedSinkWriters.add(sinkSubtask);
if (flushedSinkWriters.equals(activeSinkWriters)) {
+ Preconditions.checkState(
+ schemaChangeStatus.compareAndSet(
+ RequestStatus.WAITING_FOR_FLUSH,
RequestStatus.APPLYING),
+ "Illegal schemaChangeStatus state: should be
WAITING_FOR_FLUSH before collecting enough FlushEvents");
LOG.info(
"All sink subtask have flushed for table {}. Start to
apply schema change.",
tableId.toString());
- PendingSchemaChange waitFlushSuccess = pendingSchemaChanges.get(0);
schemaChangeThreadPool.submit(
- () -> applySchemaChange(tableId,
waitFlushSuccess.derivedSchemaChangeEvents));
- Thread.sleep(1000);
-
- if (schemaChangeException != null) {
- throw new RuntimeException("Failed to apply schema change.",
schemaChangeException);
- }
-
- if (isSchemaChangeApplying) {
- waitFlushSuccess
- .getResponseFuture()
- .complete(wrap(new SchemaChangeProcessingResponse()));
- } else {
- waitFlushSuccess
- .getResponseFuture()
- .complete(wrap(new
ReleaseUpstreamResponse(finishedSchemaChanges)));
- }
+ () -> applySchemaChange(tableId,
currentDerivedSchemaChangeEvents));
}
}
- private void startNextSchemaChangeRequest() {
- pendingSchemaChanges.remove(0);
- flushedSinkWriters.clear();
- while (!pendingSchemaChanges.isEmpty()) {
- PendingSchemaChange pendingSchemaChange =
pendingSchemaChanges.get(0);
- SchemaChangeRequest request = pendingSchemaChange.changeRequest;
- if (request.getSchemaChangeEvent() instanceof CreateTableEvent
- &&
schemaManager.evolvedSchemaExists(request.getTableId())) {
- pendingSchemaChange
- .getResponseFuture()
- .complete(wrap(new
SchemaChangeResponse(Collections.emptyList())));
- pendingSchemaChanges.remove(0);
- } else {
- List<SchemaChangeEvent> derivedSchemaChangeEvents =
-
calculateDerivedSchemaChangeEvents(request.getSchemaChangeEvent());
- pendingSchemaChange
- .getResponseFuture()
- .complete(wrap(new
SchemaChangeResponse(derivedSchemaChangeEvents)));
- if (!derivedSchemaChangeEvents.isEmpty()) {
- pendingSchemaChange.derivedSchemaChangeEvents =
derivedSchemaChangeEvents;
- pendingSchemaChange.startToWaitForReleaseRequest();
- break;
- }
- }
- }
- }
-
- public CompletableFuture<CoordinationResponse> refreshPendingLists() {
- pendingSchemaChanges.clear();
- flushedSinkWriters.clear();
- return CompletableFuture.completedFuture(wrap(new
RefreshPendingListsResponse()));
- }
-
public CompletableFuture<CoordinationResponse> getSchemaChangeResult() {
- if (schemaChangeException != null) {
- throw new RuntimeException("Failed to apply schema change.",
schemaChangeException);
- }
-
- if (isSchemaChangeApplying) {
- return CompletableFuture.supplyAsync(() -> wrap(new
SchemaChangeProcessingResponse()));
- } else {
+ Preconditions.checkState(
+ !schemaChangeStatus.get().equals(RequestStatus.IDLE),
+ "Illegal schemaChangeStatus: should not be IDLE before getting
schema change request results.");
+ if (schemaChangeStatus.compareAndSet(RequestStatus.FINISHED,
RequestStatus.IDLE)) {
+ // This request has been finished, return it and prepare for the
next request
+ List<SchemaChangeEvent> finishedEvents =
clearCurrentSchemaChangeRequest();
return CompletableFuture.supplyAsync(
- () -> wrap(new
ReleaseUpstreamResponse(finishedSchemaChanges)));
+ () -> wrap(new
SchemaChangeResultResponse(finishedEvents)));
+ } else {
+            // Still working on schema change request, waiting for it
+ return CompletableFuture.supplyAsync(() -> wrap(new
SchemaChangeProcessingResponse()));
}
}
@@ -445,57 +389,54 @@ public class SchemaRegistryRequestHandler implements
Closeable {
}
private boolean shouldIgnoreException(Throwable throwable) {
-
// In IGNORE mode, will never try to apply schema change events
- // In EVOLVE and and LENIENT mode, such failure will not be tolerated
+ // In EVOLVE and LENIENT mode, such failure will not be tolerated
// In EXCEPTION mode, an exception will be thrown once captured
return (throwable instanceof UnsupportedSchemaChangeEventException)
&& (schemaChangeBehavior == SchemaChangeBehavior.TRY_EVOLVE);
}
- private static class PendingSchemaChange {
- private final SchemaChangeRequest changeRequest;
- private List<SchemaChangeEvent> derivedSchemaChangeEvents;
- private CompletableFuture<CoordinationResponse> responseFuture;
- private RequestStatus status;
-
- public PendingSchemaChange(
- SchemaChangeRequest changeRequest,
- CompletableFuture<CoordinationResponse> responseFuture) {
- this.changeRequest = changeRequest;
- this.responseFuture = responseFuture;
- this.status = RequestStatus.PENDING;
- }
-
- public SchemaChangeRequest getChangeRequest() {
- return changeRequest;
- }
-
- public CompletableFuture<CoordinationResponse> getResponseFuture() {
- return responseFuture;
- }
-
- public RequestStatus getStatus() {
- return status;
- }
-
- public void startToWaitForReleaseRequest() {
- if (!responseFuture.isDone()) {
- throw new IllegalStateException(
- "Cannot start to wait for flush success before the
SchemaChangeRequest is done.");
- }
- this.responseFuture = new CompletableFuture<>();
- this.status = RequestStatus.WAIT_RELEASE_REQUEST;
- }
-
- public void receiveReleaseRequest() {
- this.status = RECEIVED_RELEASE_REQUEST;
+ private List<SchemaChangeEvent> clearCurrentSchemaChangeRequest() {
+ if (currentChangeException != null) {
+ throw new RuntimeException("Failed to apply schema change.",
currentChangeException);
}
+ List<SchemaChangeEvent> finishedSchemaChanges =
+ new ArrayList<>(currentFinishedSchemaChanges);
+ flushedSinkWriters.clear();
+ currentDerivedSchemaChangeEvents.clear();
+ currentFinishedSchemaChanges.clear();
+ currentIgnoredSchemaChanges.clear();
+ currentChangeException = null;
+ return finishedSchemaChanges;
}
- enum RequestStatus {
- PENDING,
- WAIT_RELEASE_REQUEST,
- RECEIVED_RELEASE_REQUEST
+ // Schema change event state could transfer in the following way:
+ //
+ // -------- B --------
+ // | |
+ // v |
+ // -------- ---------------------
+ // | IDLE | --- A --> | WAITING_FOR_FLUSH |
+ // -------- ---------------------
+ // ^ |
+ // E C
+ // \ v
+ // ------------ ------------
+ // | FINISHED | <-- D -- | APPLYING |
+ // ------------ ------------
+ //
+    // A: When a request comes to an idle request handler.
+ // B: When current request is duplicate or ignored by LENIENT / routed
table merging
+ // strategies.
+ // C: When schema registry collected enough flush success events, and
actually started to apply
+ // schema changes.
+    // D: When schema change application finishes (successfully or with exceptions).
+ // E: When current schema change request result has been retrieved by
SchemaOperator, and ready
+ // for the next request.
+ private enum RequestStatus {
+ IDLE,
+ WAITING_FOR_FLUSH,
+ APPLYING,
+ FINISHED
}
}
diff --git
a/flink-cdc-runtime/src/main/java/org/apache/flink/cdc/runtime/operators/schema/event/RefreshPendingListsRequest.java
b/flink-cdc-runtime/src/main/java/org/apache/flink/cdc/runtime/operators/schema/event/RefreshPendingListsRequest.java
deleted file mode 100644
index a0496c935..000000000
---
a/flink-cdc-runtime/src/main/java/org/apache/flink/cdc/runtime/operators/schema/event/RefreshPendingListsRequest.java
+++ /dev/null
@@ -1,27 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.flink.cdc.runtime.operators.schema.event;
-
-import
org.apache.flink.cdc.runtime.operators.schema.coordinator.SchemaRegistryRequestHandler;
-import org.apache.flink.runtime.operators.coordination.CoordinationRequest;
-
-/** Request to refresh the pendingSchemaChanges of {@link
SchemaRegistryRequestHandler}. */
-public class RefreshPendingListsRequest implements CoordinationRequest {
-
- private static final long serialVersionUID = 1L;
-}
diff --git
a/flink-cdc-runtime/src/main/java/org/apache/flink/cdc/runtime/operators/schema/event/RefreshPendingListsResponse.java
b/flink-cdc-runtime/src/main/java/org/apache/flink/cdc/runtime/operators/schema/event/RefreshPendingListsResponse.java
deleted file mode 100644
index ff0221deb..000000000
---
a/flink-cdc-runtime/src/main/java/org/apache/flink/cdc/runtime/operators/schema/event/RefreshPendingListsResponse.java
+++ /dev/null
@@ -1,26 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.flink.cdc.runtime.operators.schema.event;
-
-import org.apache.flink.runtime.operators.coordination.CoordinationResponse;
-
-/** Response to refresh the pendingSchemaChanges of {@link
RefreshPendingListsRequest}. */
-public class RefreshPendingListsResponse implements CoordinationResponse {
-
- private static final long serialVersionUID = 1L;
-}
diff --git
a/flink-cdc-runtime/src/main/java/org/apache/flink/cdc/runtime/operators/schema/event/ReleaseUpstreamRequest.java
b/flink-cdc-runtime/src/main/java/org/apache/flink/cdc/runtime/operators/schema/event/ReleaseUpstreamRequest.java
deleted file mode 100644
index 3da85c184..000000000
---
a/flink-cdc-runtime/src/main/java/org/apache/flink/cdc/runtime/operators/schema/event/ReleaseUpstreamRequest.java
+++ /dev/null
@@ -1,32 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.flink.cdc.runtime.operators.schema.event;
-
-import org.apache.flink.cdc.common.event.FlushEvent;
-import org.apache.flink.cdc.runtime.operators.schema.SchemaOperator;
-import
org.apache.flink.cdc.runtime.operators.schema.coordinator.SchemaRegistry;
-import org.apache.flink.runtime.operators.coordination.CoordinationRequest;
-
-/**
- * The request from {@link SchemaOperator} to {@link SchemaRegistry} to
request to release upstream
- * after sending {@link FlushEvent}.
- */
-public class ReleaseUpstreamRequest implements CoordinationRequest {
-
- private static final long serialVersionUID = 1L;
-}
diff --git
a/flink-cdc-runtime/src/main/java/org/apache/flink/cdc/runtime/operators/schema/event/SchemaChangeProcessingResponse.java
b/flink-cdc-runtime/src/main/java/org/apache/flink/cdc/runtime/operators/schema/event/SchemaChangeProcessingResponse.java
index b679ab9e5..bf8d84f35 100644
---
a/flink-cdc-runtime/src/main/java/org/apache/flink/cdc/runtime/operators/schema/event/SchemaChangeProcessingResponse.java
+++
b/flink-cdc-runtime/src/main/java/org/apache/flink/cdc/runtime/operators/schema/event/SchemaChangeProcessingResponse.java
@@ -23,8 +23,8 @@ import
org.apache.flink.cdc.runtime.operators.schema.coordinator.SchemaRegistry;
import org.apache.flink.runtime.operators.coordination.CoordinationResponse;
/**
- * The response for {@link SchemaChangeResultRequest} or {@link
ReleaseUpstreamRequest} from {@link
- * SchemaRegistry} to {@link SchemaOperator} if not apply {@link
SchemaChangeEvent} in time.
+ * The response for {@link SchemaChangeResultRequest} from {@link SchemaRegistry} to {@link
+ * SchemaOperator} when the {@link SchemaChangeEvent} has not been applied in time.
*/
public class SchemaChangeProcessingResponse implements CoordinationResponse {
diff --git
a/flink-cdc-runtime/src/main/java/org/apache/flink/cdc/runtime/operators/schema/event/SchemaChangeResponse.java
b/flink-cdc-runtime/src/main/java/org/apache/flink/cdc/runtime/operators/schema/event/SchemaChangeResponse.java
index 142de431e..63d57139b 100644
---
a/flink-cdc-runtime/src/main/java/org/apache/flink/cdc/runtime/operators/schema/event/SchemaChangeResponse.java
+++
b/flink-cdc-runtime/src/main/java/org/apache/flink/cdc/runtime/operators/schema/event/SchemaChangeResponse.java
@@ -22,6 +22,7 @@ import
org.apache.flink.cdc.runtime.operators.schema.SchemaOperator;
import
org.apache.flink.cdc.runtime.operators.schema.coordinator.SchemaRegistry;
import org.apache.flink.runtime.operators.coordination.CoordinationResponse;
+import java.util.Collections;
import java.util.List;
import java.util.Objects;
@@ -38,8 +39,44 @@ public class SchemaChangeResponse implements
CoordinationResponse {
*/
private final List<SchemaChangeEvent> schemaChangeEvents;
- public SchemaChangeResponse(List<SchemaChangeEvent> schemaChangeEvents) {
+ private final ResponseCode responseCode;
+
+ public static SchemaChangeResponse accepted(List<SchemaChangeEvent>
schemaChangeEvents) {
+ return new SchemaChangeResponse(schemaChangeEvents,
ResponseCode.ACCEPTED);
+ }
+
+ public static SchemaChangeResponse busy() {
+ return new SchemaChangeResponse(Collections.emptyList(),
ResponseCode.BUSY);
+ }
+
+ public static SchemaChangeResponse duplicate() {
+ return new SchemaChangeResponse(Collections.emptyList(),
ResponseCode.DUPLICATE);
+ }
+
+ public static SchemaChangeResponse ignored() {
+ return new SchemaChangeResponse(Collections.emptyList(),
ResponseCode.IGNORED);
+ }
+
+ private SchemaChangeResponse(
+ List<SchemaChangeEvent> schemaChangeEvents, ResponseCode
responseCode) {
this.schemaChangeEvents = schemaChangeEvents;
+ this.responseCode = responseCode;
+ }
+
+ public boolean isAccepted() {
+ return ResponseCode.ACCEPTED.equals(responseCode);
+ }
+
+ public boolean isRegistryBusy() {
+ return ResponseCode.BUSY.equals(responseCode);
+ }
+
+ public boolean isDuplicate() {
+ return ResponseCode.DUPLICATE.equals(responseCode);
+ }
+
+ public boolean isIgnored() {
+ return ResponseCode.IGNORED.equals(responseCode);
}
public List<SchemaChangeEvent> getSchemaChangeEvents() {
@@ -55,11 +92,43 @@ public class SchemaChangeResponse implements
CoordinationResponse {
return false;
}
SchemaChangeResponse response = (SchemaChangeResponse) o;
- return schemaChangeEvents.equals(response.schemaChangeEvents);
+ return Objects.equals(schemaChangeEvents, response.schemaChangeEvents)
+ && responseCode == response.responseCode;
}
@Override
public int hashCode() {
- return Objects.hash(schemaChangeEvents);
+ return Objects.hash(schemaChangeEvents, responseCode);
+ }
+
+ @Override
+ public String toString() {
+ return "SchemaChangeResponse{"
+ + "schemaChangeEvents="
+ + schemaChangeEvents
+ + ", responseCode="
+ + responseCode
+ + '}';
+ }
+
+ /**
+ * Schema Change Response status code.
+ *
+ * <p>- Accepted: This schema change request has been accepted exclusively. Any other
+ * schema change requests will be blocked.
+ *
+ * <p>- Busy: Schema registry is currently busy processing another schema
change request.
+ *
+ * <p>- Duplicate: This schema change request has been submitted before,
possibly by another
+ * parallel subtask.
+ *
+ * <p>- Ignored: This schema change request has been assessed, but no
actual evolution is
+ * required. Possibly caused by LENIENT mode or merging table strategies.
+ */
+ public enum ResponseCode {
+ ACCEPTED,
+ BUSY,
+ DUPLICATE,
+ IGNORED
}
}
diff --git
a/flink-cdc-runtime/src/main/java/org/apache/flink/cdc/runtime/operators/schema/event/ReleaseUpstreamResponse.java
b/flink-cdc-runtime/src/main/java/org/apache/flink/cdc/runtime/operators/schema/event/SchemaChangeResultResponse.java
similarity index 87%
rename from
flink-cdc-runtime/src/main/java/org/apache/flink/cdc/runtime/operators/schema/event/ReleaseUpstreamResponse.java
rename to
flink-cdc-runtime/src/main/java/org/apache/flink/cdc/runtime/operators/schema/event/SchemaChangeResultResponse.java
index bea880962..7039ef086 100644
---
a/flink-cdc-runtime/src/main/java/org/apache/flink/cdc/runtime/operators/schema/event/ReleaseUpstreamResponse.java
+++
b/flink-cdc-runtime/src/main/java/org/apache/flink/cdc/runtime/operators/schema/event/SchemaChangeResultResponse.java
@@ -26,10 +26,10 @@ import java.util.List;
import java.util.Objects;
/**
- * The response for {@link ReleaseUpstreamRequest} from {@link SchemaRegistry}
to {@link
+ * The response for {@link SchemaChangeResultRequest} from {@link
SchemaRegistry} to {@link
* SchemaOperator}.
*/
-public class ReleaseUpstreamResponse implements CoordinationResponse {
+public class SchemaChangeResultResponse implements CoordinationResponse {
private static final long serialVersionUID = 1L;
@@ -39,7 +39,7 @@ public class ReleaseUpstreamResponse implements
CoordinationResponse {
*/
private final List<SchemaChangeEvent> finishedSchemaChangeEvents;
- public ReleaseUpstreamResponse(List<SchemaChangeEvent>
finishedSchemaChangeEvents) {
+ public SchemaChangeResultResponse(List<SchemaChangeEvent>
finishedSchemaChangeEvents) {
this.finishedSchemaChangeEvents = finishedSchemaChangeEvents;
}
@@ -63,7 +63,7 @@ public class ReleaseUpstreamResponse implements
CoordinationResponse {
if (object == null || getClass() != object.getClass()) {
return false;
}
- ReleaseUpstreamResponse that = (ReleaseUpstreamResponse) object;
+ SchemaChangeResultResponse that = (SchemaChangeResultResponse) object;
return Objects.equals(finishedSchemaChangeEvents,
that.finishedSchemaChangeEvents);
}
diff --git
a/flink-cdc-runtime/src/test/java/org/apache/flink/cdc/runtime/testutils/operators/EventOperatorTestHarness.java
b/flink-cdc-runtime/src/test/java/org/apache/flink/cdc/runtime/testutils/operators/EventOperatorTestHarness.java
index 354304599..60952b424 100644
---
a/flink-cdc-runtime/src/test/java/org/apache/flink/cdc/runtime/testutils/operators/EventOperatorTestHarness.java
+++
b/flink-cdc-runtime/src/test/java/org/apache/flink/cdc/runtime/testutils/operators/EventOperatorTestHarness.java
@@ -163,7 +163,6 @@ public class EventOperatorTestHarness<OP extends
AbstractStreamOperator<E>, E ex
}
public void registerTableSchema(TableId tableId, Schema schema) {
- schemaRegistry.handleApplyOriginalSchemaChangeEvent(new
CreateTableEvent(tableId, schema));
schemaRegistry.handleCoordinationRequest(
new SchemaChangeRequest(tableId, new CreateTableEvent(tableId,
schema)));
schemaRegistry.handleApplyEvolvedSchemaChangeRequest(new
CreateTableEvent(tableId, schema));