This is an automated email from the ASF dual-hosted git repository.
leonard pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/flink-cdc.git
The following commit(s) were added to refs/heads/master by this push:
new 0df63e26e [hotfix][cdc-runtime] Keep upstream pending requests in
order to avoid checkpoint hanging & state inconsistency in timestamp startup
mode
0df63e26e is described below
commit 0df63e26e84e7443576a0887cd27613bcf73e830
Author: yuxiqian <[email protected]>
AuthorDate: Mon Aug 26 23:51:28 2024 +0800
[hotfix][cdc-runtime] Keep upstream pending requests in order to avoid
checkpoint hanging & state inconsistency in timestamp startup mode
This closes #3576.
---
.../flink/cdc/pipeline/tests/MysqlE2eITCase.java | 58 +++++++
.../runtime/operators/schema/SchemaOperator.java | 19 ++-
.../coordinator/SchemaRegistryRequestHandler.java | 175 +++++++++++++--------
.../schema/event/SchemaChangeRequest.java | 15 +-
.../runtime/partitioning/PrePartitionOperator.java | 7 +
.../operators/EventOperatorTestHarness.java | 2 +-
6 files changed, 203 insertions(+), 73 deletions(-)
diff --git
a/flink-cdc-e2e-tests/flink-cdc-pipeline-e2e-tests/src/test/java/org/apache/flink/cdc/pipeline/tests/MysqlE2eITCase.java
b/flink-cdc-e2e-tests/flink-cdc-pipeline-e2e-tests/src/test/java/org/apache/flink/cdc/pipeline/tests/MysqlE2eITCase.java
index 29fe5db9d..ccdbbbad7 100644
---
a/flink-cdc-e2e-tests/flink-cdc-pipeline-e2e-tests/src/test/java/org/apache/flink/cdc/pipeline/tests/MysqlE2eITCase.java
+++
b/flink-cdc-e2e-tests/flink-cdc-pipeline-e2e-tests/src/test/java/org/apache/flink/cdc/pipeline/tests/MysqlE2eITCase.java
@@ -332,6 +332,64 @@ public class MysqlE2eITCase extends
PipelineTestEnvironment {
"DropTableEvent{tableId=%s.products}");
}
+ @Test
+ public void testDroppingTable() throws Exception {
+ Thread.sleep(5000);
+ LOG.info("Sleep 5 seconds to distinguish initial DDL events with
dropping table events...");
+ long ddlTimestamp = System.currentTimeMillis();
+ Thread.sleep(5000);
+ LOG.info("Going to drop tables after timestamp {}", ddlTimestamp);
+
+ try (Connection connection =
mysqlInventoryDatabase.getJdbcConnection();
+ Statement statement = connection.createStatement()) {
+ statement.execute("DROP TABLE products;");
+ }
+
+ String pipelineJob =
+ String.format(
+ "source:\n"
+ + " type: mysql\n"
+ + " hostname: %s\n"
+ + " port: 3306\n"
+ + " username: %s\n"
+ + " password: %s\n"
+ + " tables: %s.\\.*\n"
+ + " server-id: 5400-5404\n"
+ + " server-time-zone: UTC\n"
+ + " scan.startup.mode: timestamp\n"
+ + " scan.startup.timestamp-millis: %d\n"
+ + " scan.binlog.newly-added-table.enabled:
true\n"
+ + "\n"
+ + "sink:\n"
+ + " type: values\n"
+ + "\n"
+ + "pipeline:\n"
+ + " parallelism: %d\n"
+ + " schema.change.behavior: evolve",
+ INTER_CONTAINER_MYSQL_ALIAS,
+ MYSQL_TEST_USER,
+ MYSQL_TEST_PASSWORD,
+ mysqlInventoryDatabase.getDatabaseName(),
+ ddlTimestamp,
+ parallelism);
+ Path mysqlCdcJar =
TestUtils.getResource("mysql-cdc-pipeline-connector.jar");
+ Path valuesCdcJar =
TestUtils.getResource("values-cdc-pipeline-connector.jar");
+ Path mysqlDriverJar = TestUtils.getResource("mysql-driver.jar");
+ submitPipelineJob(pipelineJob, mysqlCdcJar, valuesCdcJar,
mysqlDriverJar);
+ waitUntilJobRunning(Duration.ofSeconds(30));
+ LOG.info("Pipeline job is running");
+ waitUntilSpecificEvent(
+ String.format(
+ "Table %s.products received SchemaChangeEvent
DropTableEvent{tableId=%s.products} and start to be blocked.",
+ mysqlInventoryDatabase.getDatabaseName(),
+ mysqlInventoryDatabase.getDatabaseName()));
+
+ waitUntilSpecificEvent(
+ String.format(
+ "Schema change event
DropTableEvent{tableId=%s.products} has been handled in another subTask
already.",
+ mysqlInventoryDatabase.getDatabaseName()));
+ }
+
private void validateResult(String... expectedEvents) throws Exception {
String dbName = mysqlInventoryDatabase.getDatabaseName();
for (String event : expectedEvents) {
diff --git
a/flink-cdc-runtime/src/main/java/org/apache/flink/cdc/runtime/operators/schema/SchemaOperator.java
b/flink-cdc-runtime/src/main/java/org/apache/flink/cdc/runtime/operators/schema/SchemaOperator.java
index 1b4f50e89..58f524a4b 100644
---
a/flink-cdc-runtime/src/main/java/org/apache/flink/cdc/runtime/operators/schema/SchemaOperator.java
+++
b/flink-cdc-runtime/src/main/java/org/apache/flink/cdc/runtime/operators/schema/SchemaOperator.java
@@ -23,6 +23,7 @@ import
org.apache.flink.cdc.common.annotation.VisibleForTesting;
import org.apache.flink.cdc.common.data.RecordData;
import org.apache.flink.cdc.common.data.StringData;
import org.apache.flink.cdc.common.event.DataChangeEvent;
+import org.apache.flink.cdc.common.event.DropTableEvent;
import org.apache.flink.cdc.common.event.Event;
import org.apache.flink.cdc.common.event.FlushEvent;
import org.apache.flink.cdc.common.event.SchemaChangeEvent;
@@ -50,6 +51,7 @@ import
org.apache.flink.cdc.runtime.typeutils.BinaryRecordDataGenerator;
import org.apache.flink.runtime.jobgraph.tasks.TaskOperatorEventGateway;
import org.apache.flink.runtime.operators.coordination.CoordinationRequest;
import org.apache.flink.runtime.operators.coordination.CoordinationResponse;
+import org.apache.flink.runtime.state.StateSnapshotContext;
import org.apache.flink.streaming.api.graph.StreamConfig;
import org.apache.flink.streaming.api.operators.AbstractStreamOperator;
import org.apache.flink.streaming.api.operators.ChainingStrategy;
@@ -242,7 +244,13 @@ public class SchemaOperator extends
AbstractStreamOperator<Event>
tableId,
event);
handleSchemaChangeEvent(tableId, event);
- // Update caches
+
+ if (event instanceof DropTableEvent) {
+ // Skip the cache update for a DropTableEvent: once the table is dropped, no schema will be
+ // available / necessary.
+ return;
+ }
+
originalSchema.put(tableId, getLatestOriginalSchema(tableId));
schemaDivergesMap.put(tableId, checkSchemaDiverges(tableId));
@@ -440,7 +448,8 @@ public class SchemaOperator extends
AbstractStreamOperator<Event>
long schemaEvolveTimeOutMillis = System.currentTimeMillis() +
rpcTimeOutInMillis;
while (true) {
SchemaChangeResponse response =
- sendRequestToCoordinator(new SchemaChangeRequest(tableId,
schemaChangeEvent));
+ sendRequestToCoordinator(
+ new SchemaChangeRequest(tableId,
schemaChangeEvent, subTaskId));
if (response.isRegistryBusy()) {
if (System.currentTimeMillis() < schemaEvolveTimeOutMillis) {
LOG.info(
@@ -609,4 +618,10 @@ public class SchemaOperator extends
AbstractStreamOperator<Event>
}
}
}
+
+ @Override
+ public void snapshotState(StateSnapshotContext context) throws Exception {
+ // Nothing to do here, since AbstractStreamOperator#snapshotState and #processElement
+ // are guaranteed not to be mixed together.
+ }
}
diff --git
a/flink-cdc-runtime/src/main/java/org/apache/flink/cdc/runtime/operators/schema/coordinator/SchemaRegistryRequestHandler.java
b/flink-cdc-runtime/src/main/java/org/apache/flink/cdc/runtime/operators/schema/coordinator/SchemaRegistryRequestHandler.java
index ae765bae2..1310fb6be 100644
---
a/flink-cdc-runtime/src/main/java/org/apache/flink/cdc/runtime/operators/schema/coordinator/SchemaRegistryRequestHandler.java
+++
b/flink-cdc-runtime/src/main/java/org/apache/flink/cdc/runtime/operators/schema/coordinator/SchemaRegistryRequestHandler.java
@@ -55,7 +55,6 @@ import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
-import java.util.concurrent.atomic.AtomicReference;
import java.util.stream.Collectors;
import java.util.stream.Stream;
@@ -78,7 +77,10 @@ public class SchemaRegistryRequestHandler implements
Closeable {
/**
* Atomic flag indicating if current RequestHandler could accept more
schema changes for now.
*/
- private final AtomicReference<RequestStatus> schemaChangeStatus;
+ private volatile RequestStatus schemaChangeStatus;
+
+ private final List<Integer> pendingSubTaskIds;
+ private final Object schemaChangeRequestLock;
private volatile Throwable currentChangeException;
private volatile List<SchemaChangeEvent> currentDerivedSchemaChangeEvents;
@@ -110,7 +112,10 @@ public class SchemaRegistryRequestHandler implements
Closeable {
this.currentDerivedSchemaChangeEvents = new ArrayList<>();
this.currentFinishedSchemaChanges = new ArrayList<>();
this.currentIgnoredSchemaChanges = new ArrayList<>();
- this.schemaChangeStatus = new AtomicReference<>(RequestStatus.IDLE);
+
+ this.schemaChangeStatus = RequestStatus.IDLE;
+ this.pendingSubTaskIds = new ArrayList<>();
+ this.schemaChangeRequestLock = new Object();
}
/**
@@ -120,67 +125,100 @@ public class SchemaRegistryRequestHandler implements
Closeable {
*/
public CompletableFuture<CoordinationResponse> handleSchemaChangeRequest(
SchemaChangeRequest request) {
- if (schemaChangeStatus.compareAndSet(RequestStatus.IDLE,
RequestStatus.WAITING_FOR_FLUSH)) {
- LOG.info(
- "Received schema change event request {} from table {}.
SchemaChangeStatus switched from IDLE to WAITING_FOR_FLUSH, other requests will
be blocked.",
- request.getSchemaChangeEvent(),
- request.getTableId().toString());
- SchemaChangeEvent event = request.getSchemaChangeEvent();
-
- // If this schema change event has been requested by another
subTask, ignore it.
- if (schemaManager.isOriginalSchemaChangeEventRedundant(event)) {
- LOG.info("Event {} has been addressed before, ignoring it.",
event);
- clearCurrentSchemaChangeRequest();
- Preconditions.checkState(
- schemaChangeStatus.compareAndSet(
- RequestStatus.WAITING_FOR_FLUSH,
RequestStatus.IDLE),
- "Illegal schemaChangeStatus state: should still in
WAITING_FOR_FLUSH state if event was duplicated, not "
- + schemaChangeStatus.get());
+
+ // We use the requester's subTask ID as the pending ticket, because there will be at most one
+ // schema change request from each subTask at any given time.
+ int requestSubTaskId = request.getSubTaskId();
+
+ synchronized (schemaChangeRequestLock) {
+ // Make sure we handle the first request in the pending list, to avoid out-of-order
+ // waiting, which would block the checkpointing mechanism.
+ if (schemaChangeStatus == RequestStatus.IDLE) {
+ if (pendingSubTaskIds.isEmpty()) {
+ LOG.info(
+ "Received schema change event request {} from
table {} from subTask {}. Pending list is empty, handling this.",
+ request.getSchemaChangeEvent(),
+ request.getTableId().toString(),
+ requestSubTaskId);
+ } else if (pendingSubTaskIds.get(0) == requestSubTaskId) {
+ LOG.info(
+ "Received schema change event request {} from
table {} from subTask {}. It is on the first of the pending list, handling
this.",
+ request.getSchemaChangeEvent(),
+ request.getTableId().toString(),
+ requestSubTaskId);
+ pendingSubTaskIds.remove(0);
+ } else {
+ LOG.info(
+ "Received schema change event request {} from
table {} from subTask {}. It is not the first of the pending list ({}).",
+ request.getSchemaChangeEvent(),
+ request.getTableId().toString(),
+ requestSubTaskId,
+ pendingSubTaskIds);
+ if (!pendingSubTaskIds.contains(requestSubTaskId)) {
+ pendingSubTaskIds.add(requestSubTaskId);
+ }
+ return
CompletableFuture.completedFuture(wrap(SchemaChangeResponse.busy()));
+ }
+
+ SchemaChangeEvent event = request.getSchemaChangeEvent();
+
+ // If this schema change event has been requested by another
subTask, ignore it.
+ if (schemaManager.isOriginalSchemaChangeEventRedundant(event))
{
+ LOG.info("Event {} has been addressed before, ignoring
it.", event);
+ clearCurrentSchemaChangeRequest();
+ LOG.info(
+ "SchemaChangeStatus switched from
WAITING_FOR_FLUSH to IDLE for request {} due to duplicated request.",
+ request);
+ return CompletableFuture.completedFuture(
+ wrap(SchemaChangeResponse.duplicate()));
+ }
+ schemaManager.applyOriginalSchemaChange(event);
+ List<SchemaChangeEvent> derivedSchemaChangeEvents =
+
calculateDerivedSchemaChangeEvents(request.getSchemaChangeEvent());
+
+ // If this schema change event is filtered out by LENIENT mode
or merging table
+ // route strategies, ignore it.
+ if (derivedSchemaChangeEvents.isEmpty()) {
+ LOG.info("Event {} is omitted from sending to downstream,
ignoring it.", event);
+ clearCurrentSchemaChangeRequest();
+ LOG.info(
+ "SchemaChangeStatus switched from
WAITING_FOR_FLUSH to IDLE for request {} due to ignored request.",
+ request);
+ return
CompletableFuture.completedFuture(wrap(SchemaChangeResponse.ignored()));
+ }
+
LOG.info(
- "SchemaChangeStatus switched from WAITING_FOR_FLUSH to
IDLE for request {} due to duplicated request.",
- request);
- return
CompletableFuture.completedFuture(wrap(SchemaChangeResponse.duplicate()));
- }
- schemaManager.applyOriginalSchemaChange(event);
- List<SchemaChangeEvent> derivedSchemaChangeEvents =
-
calculateDerivedSchemaChangeEvents(request.getSchemaChangeEvent());
-
- // If this schema change event is filtered out by LENIENT mode or
merging table route
- // strategies, ignore it.
- if (derivedSchemaChangeEvents.isEmpty()) {
- LOG.info("Event {} is omitted from sending to downstream,
ignoring it.", event);
- clearCurrentSchemaChangeRequest();
- Preconditions.checkState(
- schemaChangeStatus.compareAndSet(
- RequestStatus.WAITING_FOR_FLUSH,
RequestStatus.IDLE),
- "Illegal schemaChangeStatus state: should still in
WAITING_FOR_FLUSH state if event was ignored, not "
- + schemaChangeStatus.get());
+ "SchemaChangeStatus switched from IDLE to
WAITING_FOR_FLUSH, other requests will be blocked.");
+ // This request has been accepted.
+ schemaChangeStatus = RequestStatus.WAITING_FOR_FLUSH;
+
+ // Backfill pre-schema info for sink applying
+ derivedSchemaChangeEvents.forEach(
+ e -> {
+ if (e instanceof SchemaChangeEventWithPreSchema) {
+ SchemaChangeEventWithPreSchema pe =
+ (SchemaChangeEventWithPreSchema) e;
+ if (!pe.hasPreSchema()) {
+ schemaManager
+
.getLatestEvolvedSchema(pe.tableId())
+ .ifPresent(pe::fillPreSchema);
+ }
+ }
+ });
+ currentDerivedSchemaChangeEvents = new
ArrayList<>(derivedSchemaChangeEvents);
+ return CompletableFuture.completedFuture(
+
wrap(SchemaChangeResponse.accepted(derivedSchemaChangeEvents)));
+ } else {
LOG.info(
- "SchemaChangeStatus switched from WAITING_FOR_FLUSH to
IDLE for request {} due to ignored request.",
- request);
- return
CompletableFuture.completedFuture(wrap(SchemaChangeResponse.ignored()));
+ "Schema Registry is busy processing a schema change
request, could not handle request {} for now. Added {} to pending list ({}).",
+ request,
+ requestSubTaskId,
+ pendingSubTaskIds);
+ if (!pendingSubTaskIds.contains(requestSubTaskId)) {
+ pendingSubTaskIds.add(requestSubTaskId);
+ }
+ return
CompletableFuture.completedFuture(wrap(SchemaChangeResponse.busy()));
}
-
- // Backfill pre-schema info for sink applying
- derivedSchemaChangeEvents.forEach(
- e -> {
- if (e instanceof SchemaChangeEventWithPreSchema) {
- SchemaChangeEventWithPreSchema pe =
(SchemaChangeEventWithPreSchema) e;
- if (!pe.hasPreSchema()) {
- schemaManager
- .getLatestEvolvedSchema(pe.tableId())
- .ifPresent(pe::fillPreSchema);
- }
- }
- });
- currentDerivedSchemaChangeEvents = new
ArrayList<>(derivedSchemaChangeEvents);
- return CompletableFuture.completedFuture(
-
wrap(SchemaChangeResponse.accepted(derivedSchemaChangeEvents)));
- } else {
- LOG.info(
- "Schema Registry is busy processing a schema change
request, could not handle request {} for now.",
- request);
- return
CompletableFuture.completedFuture(wrap(SchemaChangeResponse.busy()));
}
}
@@ -227,9 +265,10 @@ public class SchemaRegistryRequestHandler implements
Closeable {
}
}
Preconditions.checkState(
- schemaChangeStatus.compareAndSet(RequestStatus.APPLYING,
RequestStatus.FINISHED),
+ schemaChangeStatus == RequestStatus.APPLYING,
"Illegal schemaChangeStatus state: should be APPLYING before
applySchemaChange finishes, not "
- + schemaChangeStatus.get());
+ + schemaChangeStatus);
+ schemaChangeStatus = RequestStatus.FINISHED;
LOG.info(
"SchemaChangeStatus switched from APPLYING to FINISHED for
request {}.",
currentDerivedSchemaChangeEvents);
@@ -262,10 +301,11 @@ public class SchemaRegistryRequestHandler implements
Closeable {
}
if (flushedSinkWriters.equals(activeSinkWriters)) {
Preconditions.checkState(
- schemaChangeStatus.compareAndSet(
- RequestStatus.WAITING_FOR_FLUSH,
RequestStatus.APPLYING),
+ schemaChangeStatus == RequestStatus.WAITING_FOR_FLUSH,
"Illegal schemaChangeStatus state: should be
WAITING_FOR_FLUSH before collecting enough FlushEvents, not "
+ schemaChangeStatus);
+
+ schemaChangeStatus = RequestStatus.APPLYING;
LOG.info(
"All sink subtask have flushed for table {}. Start to
apply schema change.",
tableId.toString());
@@ -276,9 +316,10 @@ public class SchemaRegistryRequestHandler implements
Closeable {
public CompletableFuture<CoordinationResponse> getSchemaChangeResult() {
Preconditions.checkState(
- !schemaChangeStatus.get().equals(RequestStatus.IDLE),
+ schemaChangeStatus != RequestStatus.IDLE,
"Illegal schemaChangeStatus: should not be IDLE before getting
schema change request results.");
- if (schemaChangeStatus.compareAndSet(RequestStatus.FINISHED,
RequestStatus.IDLE)) {
+ if (schemaChangeStatus == RequestStatus.FINISHED) {
+ schemaChangeStatus = RequestStatus.IDLE;
LOG.info(
"SchemaChangeStatus switched from FINISHED to IDLE for
request {}",
currentDerivedSchemaChangeEvents);
diff --git
a/flink-cdc-runtime/src/main/java/org/apache/flink/cdc/runtime/operators/schema/event/SchemaChangeRequest.java
b/flink-cdc-runtime/src/main/java/org/apache/flink/cdc/runtime/operators/schema/event/SchemaChangeRequest.java
index fda6b02f1..fbc5e9c40 100644
---
a/flink-cdc-runtime/src/main/java/org/apache/flink/cdc/runtime/operators/schema/event/SchemaChangeRequest.java
+++
b/flink-cdc-runtime/src/main/java/org/apache/flink/cdc/runtime/operators/schema/event/SchemaChangeRequest.java
@@ -36,10 +36,14 @@ public class SchemaChangeRequest implements
CoordinationRequest {
private final TableId tableId;
/** The schema changes. */
private final SchemaChangeEvent schemaChangeEvent;
+ /** The ID of subTask that initiated the request. */
+ private final int subTaskId;
- public SchemaChangeRequest(TableId tableId, SchemaChangeEvent
schemaChangeEvent) {
+ public SchemaChangeRequest(
+ TableId tableId, SchemaChangeEvent schemaChangeEvent, int
subTaskId) {
this.tableId = tableId;
this.schemaChangeEvent = schemaChangeEvent;
+ this.subTaskId = subTaskId;
}
public TableId getTableId() {
@@ -50,6 +54,10 @@ public class SchemaChangeRequest implements
CoordinationRequest {
return schemaChangeEvent;
}
+ public int getSubTaskId() {
+ return subTaskId;
+ }
+
@Override
public boolean equals(Object o) {
if (this == o) {
@@ -60,11 +68,12 @@ public class SchemaChangeRequest implements
CoordinationRequest {
}
SchemaChangeRequest that = (SchemaChangeRequest) o;
return Objects.equals(tableId, that.tableId)
- && Objects.equals(schemaChangeEvent, that.schemaChangeEvent);
+ && Objects.equals(schemaChangeEvent, that.schemaChangeEvent)
+ && subTaskId == that.subTaskId;
}
@Override
public int hashCode() {
- return Objects.hash(tableId, schemaChangeEvent);
+ return Objects.hash(tableId, schemaChangeEvent, subTaskId);
}
}
diff --git
a/flink-cdc-runtime/src/main/java/org/apache/flink/cdc/runtime/partitioning/PrePartitionOperator.java
b/flink-cdc-runtime/src/main/java/org/apache/flink/cdc/runtime/partitioning/PrePartitionOperator.java
index 1171ad07b..938e6950d 100644
---
a/flink-cdc-runtime/src/main/java/org/apache/flink/cdc/runtime/partitioning/PrePartitionOperator.java
+++
b/flink-cdc-runtime/src/main/java/org/apache/flink/cdc/runtime/partitioning/PrePartitionOperator.java
@@ -31,6 +31,7 @@ import
org.apache.flink.cdc.runtime.operators.sink.SchemaEvolutionClient;
import org.apache.flink.cdc.runtime.serializer.event.EventSerializer;
import org.apache.flink.runtime.jobgraph.OperatorID;
import org.apache.flink.runtime.jobgraph.tasks.TaskOperatorEventGateway;
+import org.apache.flink.runtime.state.StateSnapshotContext;
import org.apache.flink.streaming.api.operators.AbstractStreamOperator;
import org.apache.flink.streaming.api.operators.ChainingStrategy;
import org.apache.flink.streaming.api.operators.OneInputStreamOperator;
@@ -147,4 +148,10 @@ public class PrePartitionOperator extends
AbstractStreamOperator<PartitioningEve
}
});
}
+
+ @Override
+ public void snapshotState(StateSnapshotContext context) throws Exception {
+ // Nothing to do here, since AbstractStreamOperator#snapshotState and #processElement
+ // are guaranteed not to be mixed together.
+ }
}
diff --git
a/flink-cdc-runtime/src/test/java/org/apache/flink/cdc/runtime/testutils/operators/EventOperatorTestHarness.java
b/flink-cdc-runtime/src/test/java/org/apache/flink/cdc/runtime/testutils/operators/EventOperatorTestHarness.java
index 60952b424..a469a3a91 100644
---
a/flink-cdc-runtime/src/test/java/org/apache/flink/cdc/runtime/testutils/operators/EventOperatorTestHarness.java
+++
b/flink-cdc-runtime/src/test/java/org/apache/flink/cdc/runtime/testutils/operators/EventOperatorTestHarness.java
@@ -164,7 +164,7 @@ public class EventOperatorTestHarness<OP extends
AbstractStreamOperator<E>, E ex
public void registerTableSchema(TableId tableId, Schema schema) {
schemaRegistry.handleCoordinationRequest(
- new SchemaChangeRequest(tableId, new CreateTableEvent(tableId,
schema)));
+ new SchemaChangeRequest(tableId, new CreateTableEvent(tableId,
schema), 0));
schemaRegistry.handleApplyEvolvedSchemaChangeRequest(new
CreateTableEvent(tableId, schema));
}