lvyanquan commented on code in PR #3812:
URL: https://github.com/apache/flink-cdc/pull/3812#discussion_r1976998390
##########
flink-cdc-common/src/main/java/org/apache/flink/cdc/common/pipeline/PipelineOptions.java:
##########
@@ -45,6 +45,12 @@ public class PipelineOptions {
.defaultValue(1)
.withDescription("Parallelism of the pipeline");
+ public static final ConfigOption<RunTimeMode> PIPELINE_RUNTIME_MODE =
+ ConfigOptions.key("runtime-mode")
Review Comment:
What about making this a Boolean option, such as `batch-mode.enabled` or
`snapshot-only.enabled`, to simplify user configuration?
##########
flink-cdc-runtime/src/main/java/org/apache/flink/cdc/runtime/operators/sink/DataBatchSinkFunctionOperator.java:
##########
@@ -0,0 +1,121 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.cdc.runtime.operators.sink;
+
+import org.apache.flink.cdc.common.annotation.Internal;
+import org.apache.flink.cdc.common.event.ChangeEvent;
+import org.apache.flink.cdc.common.event.CreateTableEvent;
+import org.apache.flink.cdc.common.event.Event;
+import org.apache.flink.cdc.common.event.FlushEvent;
+import org.apache.flink.cdc.common.event.TableId;
+import org.apache.flink.cdc.common.schema.Schema;
+import org.apache.flink.runtime.jobgraph.OperatorID;
+import org.apache.flink.runtime.state.StateInitializationContext;
+import org.apache.flink.streaming.api.functions.sink.SinkFunction;
+import org.apache.flink.streaming.api.graph.StreamConfig;
+import org.apache.flink.streaming.api.operators.ChainingStrategy;
+import org.apache.flink.streaming.api.operators.Output;
+import org.apache.flink.streaming.api.operators.StreamSink;
+import org.apache.flink.streaming.runtime.streamrecord.StreamRecord;
+import org.apache.flink.streaming.runtime.tasks.StreamTask;
+
+import java.util.HashMap;
+import java.util.HashSet;
+import java.util.Map;
+import java.util.Optional;
+import java.util.Set;
+
+/**
+ * An operator that processes records to be written into a {@link
SinkFunction} in batch mode.
+ *
+ * <p>The operator is a proxy of {@link StreamSink} in Flink.
+ *
+ * <p>The operator is always part of a sink pipeline and is the first operator.
+ */
+@Internal
+public class DataBatchSinkFunctionOperator extends StreamSink<Event> {
+
+ /** A set of {@link TableId} that already processed {@link
CreateTableEvent}. */
+ private final Set<TableId> processedTableIds;
+
+ private transient volatile Map<TableId, Schema> originalSchemaMap;
+
+ public DataBatchSinkFunctionOperator(
+ SinkFunction<Event> userFunction, OperatorID schemaOperatorID) {
+ super(userFunction);
+ processedTableIds = new HashSet<>();
+ this.chainingStrategy = ChainingStrategy.ALWAYS;
+ }
+
+ @Override
+ public void setup(
+ StreamTask<?, ?> containingTask,
+ StreamConfig config,
+ Output<StreamRecord<Object>> output) {
+ super.setup(containingTask, config, output);
+ originalSchemaMap = new HashMap<>();
+ }
+
+ @Override
+ public void initializeState(StateInitializationContext context) throws
Exception {
+ super.initializeState(context);
+ }
+
+ @Override
+ public void processElement(StreamRecord<Event> element) throws Exception {
+ Event event = element.getValue();
+
+ // FlushEvent triggers flush
+ if (event instanceof FlushEvent) {
+ return;
+ }
+
+ // CreateTableEvent marks the table as processed directly
+ if (event instanceof CreateTableEvent) {
+ CreateTableEvent createTableEvent = (CreateTableEvent) event;
+ processedTableIds.add(createTableEvent.tableId());
+ originalSchemaMap.put(createTableEvent.tableId(),
createTableEvent.getSchema());
+ super.processElement(element);
+ return;
+ }
+
+ // Check if the table is processed before emitting all other events,
because we have to make
+ // sure that sink have a view of the full schema before processing any
change events,
+ // including schema changes.
+ ChangeEvent changeEvent = (ChangeEvent) event;
+ if (!processedTableIds.contains(changeEvent.tableId())) {
Review Comment:
We don't need to consider the case of task failover in batch mode, as the
events should be re-emitted from the Source.
##########
flink-cdc-runtime/src/main/java/org/apache/flink/cdc/runtime/partitioning/RegularPrePartitionBatchOperator.java:
##########
@@ -0,0 +1,154 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.cdc.runtime.partitioning;
+
+import org.apache.flink.cdc.common.annotation.Internal;
+import org.apache.flink.cdc.common.event.CreateTableEvent;
+import org.apache.flink.cdc.common.event.DataChangeEvent;
+import org.apache.flink.cdc.common.event.Event;
+import org.apache.flink.cdc.common.event.TableId;
+import org.apache.flink.cdc.common.function.HashFunction;
+import org.apache.flink.cdc.common.function.HashFunctionProvider;
+import org.apache.flink.cdc.common.schema.Schema;
+import org.apache.flink.cdc.runtime.operators.schema.regular.SchemaOperator;
+import org.apache.flink.cdc.runtime.serializer.event.EventSerializer;
+import org.apache.flink.runtime.jobgraph.OperatorID;
+import org.apache.flink.runtime.state.StateSnapshotContext;
+import org.apache.flink.streaming.api.operators.AbstractStreamOperator;
+import org.apache.flink.streaming.api.operators.ChainingStrategy;
+import org.apache.flink.streaming.api.operators.OneInputStreamOperator;
+import org.apache.flink.streaming.runtime.streamrecord.StreamRecord;
+
+import org.apache.flink.shaded.guava31.com.google.common.cache.CacheBuilder;
+import org.apache.flink.shaded.guava31.com.google.common.cache.CacheLoader;
+import org.apache.flink.shaded.guava31.com.google.common.cache.LoadingCache;
+
+import java.io.Serializable;
+import java.time.Duration;
+import java.util.HashMap;
+import java.util.Map;
+import java.util.Optional;
+
+/**
+ * Operator for processing events from {@link SchemaOperator} before {@link
EventPartitioner} with
+ * regular topology in batch mode.
+ */
+@Internal
+public class RegularPrePartitionBatchOperator extends
AbstractStreamOperator<PartitioningEvent>
+ implements OneInputStreamOperator<Event, PartitioningEvent>,
Serializable {
+
+ private static final long serialVersionUID = 1L;
+ private static final Duration CACHE_EXPIRE_DURATION = Duration.ofDays(1);
+
+ private final int downstreamParallelism;
+ private final HashFunctionProvider<DataChangeEvent> hashFunctionProvider;
+
+ private transient LoadingCache<TableId, HashFunction<DataChangeEvent>>
cachedHashFunctions;
Review Comment:
We can simply use a Map here, as the job will finish.
##########
flink-cdc-connect/flink-cdc-pipeline-connectors/flink-cdc-pipeline-connector-mysql/src/main/java/org/apache/flink/cdc/connectors/mysql/source/reader/MySqlPipelineRecordEmitter.java:
##########
@@ -69,27 +71,44 @@ public class MySqlPipelineRecordEmitter extends
MySqlRecordEmitter<Event> {
private Set<TableId> alreadySendCreateTableTables;
// Used when startup mode is not initial
+ private boolean isBatchMode = false;
private boolean alreadySendCreateTableForBinlogSplit = false;
private List<CreateTableEvent> createTableEventCache;
+ private boolean alreadySendAllCreateTable = false;
public MySqlPipelineRecordEmitter(
DebeziumDeserializationSchema<Event> debeziumDeserializationSchema,
MySqlSourceReaderMetrics sourceReaderMetrics,
- MySqlSourceConfig sourceConfig) {
+ MySqlSourceConfig sourceConfig,
+ boolean isBatchMode) {
super(
debeziumDeserializationSchema,
sourceReaderMetrics,
sourceConfig.isIncludeSchemaChanges());
this.sourceConfig = sourceConfig;
this.alreadySendCreateTableTables = new HashSet<>();
this.createTableEventCache = generateCreateTableEvent(sourceConfig);
+ this.isBatchMode = isBatchMode;
}
@Override
protected void processElement(
SourceRecord element, SourceOutput<Event> output, MySqlSplitState
splitState)
throws Exception {
- if (isLowWatermarkEvent(element) && splitState.isSnapshotSplitState())
{
+ if (isBatchMode
+ &&
StartupOptions.snapshot().equals(sourceConfig.getStartupOptions())
Review Comment:
I would prefer not to pass `isBatchMode` to the source again; `StartupOptions.snapshot`
is enough for us to confirm that all CreateTableEvents can be emitted.
##########
flink-cdc-runtime/src/main/java/org/apache/flink/cdc/runtime/operators/schema/regular/SchemaBatchOperator.java:
##########
@@ -0,0 +1,180 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.cdc.runtime.operators.schema.regular;
+
+import org.apache.flink.cdc.common.annotation.Internal;
+import org.apache.flink.cdc.common.event.CreateTableCompletedEvent;
+import org.apache.flink.cdc.common.event.CreateTableEvent;
+import org.apache.flink.cdc.common.event.DataChangeEvent;
+import org.apache.flink.cdc.common.event.Event;
+import org.apache.flink.cdc.common.event.SchemaChangeEvent;
+import org.apache.flink.cdc.common.event.TableId;
+import org.apache.flink.cdc.common.pipeline.SchemaChangeBehavior;
+import org.apache.flink.cdc.common.route.RouteRule;
+import org.apache.flink.cdc.common.schema.Schema;
+import org.apache.flink.cdc.common.sink.MetadataApplier;
+import org.apache.flink.cdc.runtime.operators.schema.common.SchemaDerivator;
+import org.apache.flink.cdc.runtime.operators.schema.common.SchemaManager;
+import org.apache.flink.cdc.runtime.operators.schema.common.TableIdRouter;
+import org.apache.flink.streaming.api.graph.StreamConfig;
+import org.apache.flink.streaming.api.operators.AbstractStreamOperator;
+import org.apache.flink.streaming.api.operators.ChainingStrategy;
+import org.apache.flink.streaming.api.operators.OneInputStreamOperator;
+import org.apache.flink.streaming.api.operators.Output;
+import org.apache.flink.streaming.runtime.streamrecord.StreamRecord;
+import org.apache.flink.streaming.runtime.tasks.StreamTask;
+
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.io.Serializable;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.stream.Collectors;
+
+/** The operator will apply create table event and router mapper in batch
mode. */
+@Internal
+public class SchemaBatchOperator extends AbstractStreamOperator<Event>
+ implements OneInputStreamOperator<Event, Event>, Serializable {
+
+ private static final long serialVersionUID = 1L;
+ private static final Logger LOG =
LoggerFactory.getLogger(SchemaBatchOperator.class);
+
+ // Final fields that are set in constructor
+ private final String timezone;
+ private final List<RouteRule> routingRules;
+
+ // Transient fields that are set during open()
+ private transient volatile Map<TableId, Schema> originalSchemaMap;
+ private transient volatile Map<TableId, Schema> evolvedSchemaMap;
+ private transient TableIdRouter router;
+ private transient SchemaDerivator derivator;
+ protected transient SchemaManager schemaManager;
+ protected MetadataApplier metadataApplier;
+ private boolean alreadyMergedCreateTableTables = false;
+
+ public SchemaBatchOperator(
+ List<RouteRule> routingRules, MetadataApplier metadataApplier,
String timezone) {
+ this.chainingStrategy = ChainingStrategy.ALWAYS;
+ this.timezone = timezone;
+ this.routingRules = routingRules;
+ this.metadataApplier = metadataApplier;
+ }
+
+ @Override
+ public void setup(
+ StreamTask<?, ?> containingTask,
+ StreamConfig config,
+ Output<StreamRecord<Event>> output) {
+ super.setup(containingTask, config, output);
+ }
+
+ @Override
+ public void open() throws Exception {
+ super.open();
+ this.originalSchemaMap = new HashMap<>();
+ this.evolvedSchemaMap = new HashMap<>();
+ this.router = new TableIdRouter(routingRules);
+ this.derivator = new SchemaDerivator();
+ this.schemaManager = new SchemaManager(SchemaChangeBehavior.IGNORE);
+ }
+
+ /**
+ * This method is guaranteed to not be called concurrently with other
methods of the operator.
+ */
+ @Override
+ public void processElement(StreamRecord<Event> streamRecord) throws
Exception {
+ Event event = streamRecord.getValue();
+ // System.out.println("Print Event:" + event.toString());
+ // Only catch create table event and data change event in batch mode
+ if (event instanceof CreateTableEvent) {
+ handleCreateTableEvent((CreateTableEvent) event);
+ } else if (event instanceof CreateTableCompletedEvent) {
Review Comment:
I don't think we need to add an additional event.
Instead, when we receive a DataChangeEvent for the first time, we can assume
that all CreateTableEvents have been sent.
##########
flink-cdc-runtime/src/main/java/org/apache/flink/cdc/runtime/operators/sink/DataBatchSinkWriterOperator.java:
##########
@@ -0,0 +1,258 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.cdc.runtime.operators.sink;
+
+import org.apache.flink.api.common.operators.MailboxExecutor;
+import org.apache.flink.api.connector.sink2.Sink;
+import org.apache.flink.api.connector.sink2.SinkWriter;
+import org.apache.flink.cdc.common.annotation.Internal;
+import org.apache.flink.cdc.common.event.ChangeEvent;
+import org.apache.flink.cdc.common.event.CreateTableEvent;
+import org.apache.flink.cdc.common.event.Event;
+import org.apache.flink.cdc.common.event.FlushEvent;
+import org.apache.flink.cdc.common.event.TableId;
+import org.apache.flink.cdc.common.schema.Schema;
+import org.apache.flink.runtime.state.StateInitializationContext;
+import org.apache.flink.runtime.state.StateSnapshotContext;
+import org.apache.flink.streaming.api.connector.sink2.CommittableMessage;
+import org.apache.flink.streaming.api.graph.StreamConfig;
+import org.apache.flink.streaming.api.operators.AbstractStreamOperator;
+import org.apache.flink.streaming.api.operators.BoundedOneInput;
+import org.apache.flink.streaming.api.operators.ChainingStrategy;
+import org.apache.flink.streaming.api.operators.OneInputStreamOperator;
+import org.apache.flink.streaming.api.operators.Output;
+import org.apache.flink.streaming.api.watermark.Watermark;
+import org.apache.flink.streaming.runtime.streamrecord.StreamRecord;
+import org.apache.flink.streaming.runtime.tasks.ProcessingTimeService;
+import org.apache.flink.streaming.runtime.tasks.StreamTask;
+import org.apache.flink.streaming.runtime.watermarkstatus.WatermarkStatus;
+
+import java.lang.reflect.Constructor;
+import java.lang.reflect.Field;
+import java.util.HashMap;
+import java.util.HashSet;
+import java.util.Map;
+import java.util.Optional;
+import java.util.Set;
+
+/**
+ * An operator that processes records to be written into a {@link Sink} in
batch mode.
+ *
+ * <p>The operator is a proxy of SinkWriterOperator in Flink.
+ *
+ * <p>The operator is always part of a sink pipeline and is the first operator.
+ *
+ * @param <CommT> the type of the committable (to send to downstream operators)
+ */
+@Internal
+public class DataBatchSinkWriterOperator<CommT>
+ extends AbstractStreamOperator<CommittableMessage<CommT>>
+ implements OneInputStreamOperator<Event, CommittableMessage<CommT>>,
BoundedOneInput {
+
+ private final Sink<Event> sink;
+
+ private final ProcessingTimeService processingTimeService;
+
+ private final MailboxExecutor mailboxExecutor;
+
+ /** Operator that actually execute sink logic. */
+ private Object flinkWriterOperator;
+
+ /**
+ * The internal {@link SinkWriter} of flinkWriterOperator, obtained it
through reflection to
+ * deal with {@link FlushEvent}.
+ */
+ private SinkWriter<Event> copySinkWriter;
+
+ /** A set of {@link TableId} that already processed {@link
CreateTableEvent}. */
+ private final Set<TableId> processedTableIds;
+
+ private transient volatile Map<TableId, Schema> originalSchemaMap;
+
+ public DataBatchSinkWriterOperator(
+ Sink<Event> sink,
+ ProcessingTimeService processingTimeService,
+ MailboxExecutor mailboxExecutor) {
+ this.sink = sink;
+ this.processingTimeService = processingTimeService;
+ this.mailboxExecutor = mailboxExecutor;
+ this.processedTableIds = new HashSet<>();
+ this.chainingStrategy = ChainingStrategy.ALWAYS;
+ }
+
+ @Override
+ public void setup(
+ StreamTask<?, ?> containingTask,
+ StreamConfig config,
+ Output<StreamRecord<CommittableMessage<CommT>>> output) {
+ super.setup(containingTask, config, output);
+ flinkWriterOperator = createFlinkWriterOperator();
+
this.<AbstractStreamOperator<CommittableMessage<CommT>>>getFlinkWriterOperator()
+ .setup(containingTask, config, output);
+ originalSchemaMap = new HashMap<>();
+ }
+
+ @Override
+ public void open() throws Exception {
+
this.<AbstractStreamOperator<CommittableMessage<CommT>>>getFlinkWriterOperator().open();
+ copySinkWriter = getFieldValue("sinkWriter");
+ }
+
+ @Override
+ public void initializeState(StateInitializationContext context) throws
Exception {
+
this.<AbstractStreamOperator<CommittableMessage<CommT>>>getFlinkWriterOperator()
+ .initializeState(context);
+ }
+
+ @Override
+ public void snapshotState(StateSnapshotContext context) throws Exception {
+
this.<AbstractStreamOperator<CommittableMessage<CommT>>>getFlinkWriterOperator()
+ .snapshotState(context);
+ }
+
+ @Override
+ public void processWatermark(Watermark mark) throws Exception {
+ super.processWatermark(mark);
+
this.<AbstractStreamOperator<CommittableMessage<CommT>>>getFlinkWriterOperator()
+ .processWatermark(mark);
+ }
+
+ @Override
+ public void processWatermarkStatus(WatermarkStatus watermarkStatus) throws
Exception {
+ super.processWatermarkStatus(watermarkStatus);
+
this.<AbstractStreamOperator<CommittableMessage<CommT>>>getFlinkWriterOperator()
+ .processWatermarkStatus(watermarkStatus);
+ }
+
+ @Override
+ public void processElement(StreamRecord<Event> element) throws Exception {
+ Event event = element.getValue();
+
+ // FlushEvent triggers flush
+ if (event instanceof FlushEvent) {
+ handleFlushEvent(((FlushEvent) event));
+ return;
+ }
+
+ // CreateTableEvent marks the table as processed directly
+ if (event instanceof CreateTableEvent) {
+ CreateTableEvent createTableEvent = (CreateTableEvent) event;
+ processedTableIds.add(createTableEvent.tableId());
+ originalSchemaMap.put(createTableEvent.tableId(),
createTableEvent.getSchema());
+ this.<OneInputStreamOperator<Event,
CommittableMessage<CommT>>>getFlinkWriterOperator()
+ .processElement(element);
+ return;
+ }
+
+ // Check if the table is processed before emitting all other events,
because we have to make
+ // sure that sink have a view of the full schema before processing any
change events,
+ // including schema changes.
+ ChangeEvent changeEvent = (ChangeEvent) event;
+ if (!processedTableIds.contains(changeEvent.tableId())) {
Review Comment:
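Ditto: as with the same check in DataBatchSinkFunctionOperator, we don't need
to consider the case of task failover in batch mode here.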
##########
flink-cdc-runtime/src/main/java/org/apache/flink/cdc/runtime/operators/schema/regular/SchemaBatchOperator.java:
##########
@@ -0,0 +1,180 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.cdc.runtime.operators.schema.regular;
+
+import org.apache.flink.cdc.common.annotation.Internal;
+import org.apache.flink.cdc.common.event.CreateTableCompletedEvent;
+import org.apache.flink.cdc.common.event.CreateTableEvent;
+import org.apache.flink.cdc.common.event.DataChangeEvent;
+import org.apache.flink.cdc.common.event.Event;
+import org.apache.flink.cdc.common.event.SchemaChangeEvent;
+import org.apache.flink.cdc.common.event.TableId;
+import org.apache.flink.cdc.common.pipeline.SchemaChangeBehavior;
+import org.apache.flink.cdc.common.route.RouteRule;
+import org.apache.flink.cdc.common.schema.Schema;
+import org.apache.flink.cdc.common.sink.MetadataApplier;
+import org.apache.flink.cdc.runtime.operators.schema.common.SchemaDerivator;
+import org.apache.flink.cdc.runtime.operators.schema.common.SchemaManager;
+import org.apache.flink.cdc.runtime.operators.schema.common.TableIdRouter;
+import org.apache.flink.streaming.api.graph.StreamConfig;
+import org.apache.flink.streaming.api.operators.AbstractStreamOperator;
+import org.apache.flink.streaming.api.operators.ChainingStrategy;
+import org.apache.flink.streaming.api.operators.OneInputStreamOperator;
+import org.apache.flink.streaming.api.operators.Output;
+import org.apache.flink.streaming.runtime.streamrecord.StreamRecord;
+import org.apache.flink.streaming.runtime.tasks.StreamTask;
+
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.io.Serializable;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.stream.Collectors;
+
+/** The operator will apply create table event and router mapper in batch
mode. */
+@Internal
+public class SchemaBatchOperator extends AbstractStreamOperator<Event>
+ implements OneInputStreamOperator<Event, Event>, Serializable {
+
+ private static final long serialVersionUID = 1L;
+ private static final Logger LOG =
LoggerFactory.getLogger(SchemaBatchOperator.class);
+
+ // Final fields that are set in constructor
+ private final String timezone;
+ private final List<RouteRule> routingRules;
+
+ // Transient fields that are set during open()
+ private transient volatile Map<TableId, Schema> originalSchemaMap;
+ private transient volatile Map<TableId, Schema> evolvedSchemaMap;
+ private transient TableIdRouter router;
+ private transient SchemaDerivator derivator;
+ protected transient SchemaManager schemaManager;
+ protected MetadataApplier metadataApplier;
+ private boolean alreadyMergedCreateTableTables = false;
+
+ public SchemaBatchOperator(
+ List<RouteRule> routingRules, MetadataApplier metadataApplier,
String timezone) {
+ this.chainingStrategy = ChainingStrategy.ALWAYS;
+ this.timezone = timezone;
+ this.routingRules = routingRules;
+ this.metadataApplier = metadataApplier;
+ }
+
+ @Override
+ public void setup(
+ StreamTask<?, ?> containingTask,
+ StreamConfig config,
+ Output<StreamRecord<Event>> output) {
+ super.setup(containingTask, config, output);
+ }
+
+ @Override
+ public void open() throws Exception {
+ super.open();
+ this.originalSchemaMap = new HashMap<>();
+ this.evolvedSchemaMap = new HashMap<>();
+ this.router = new TableIdRouter(routingRules);
+ this.derivator = new SchemaDerivator();
+ this.schemaManager = new SchemaManager(SchemaChangeBehavior.IGNORE);
+ }
+
+ /**
+ * This method is guaranteed to not be called concurrently with other
methods of the operator.
+ */
+ @Override
+ public void processElement(StreamRecord<Event> streamRecord) throws
Exception {
+ Event event = streamRecord.getValue();
+ // System.out.println("Print Event:" + event.toString());
Review Comment:
This commented-out debug print can be removed.
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]