lvyanquan commented on code in PR #3812: URL: https://github.com/apache/flink-cdc/pull/3812#discussion_r1991296352
########## flink-cdc-runtime/src/main/java/org/apache/flink/cdc/runtime/operators/sink/DataBatchSinkFunctionOperator.java: ########## @@ -0,0 +1,54 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.cdc.runtime.operators.sink; + +import org.apache.flink.cdc.common.annotation.Internal; +import org.apache.flink.cdc.common.event.Event; +import org.apache.flink.cdc.common.event.FlushEvent; +import org.apache.flink.streaming.api.functions.sink.SinkFunction; +import org.apache.flink.streaming.api.operators.ChainingStrategy; +import org.apache.flink.streaming.api.operators.StreamSink; +import org.apache.flink.streaming.runtime.streamrecord.StreamRecord; + +/** + * An operator that processes records to be written into a {@link SinkFunction} in batch mode. + * + * <p>The operator is a proxy of {@link StreamSink} in Flink. + * + * <p>The operator is always part of a sink pipeline and is the first operator. 
+ */ +@Internal +public class DataBatchSinkFunctionOperator extends StreamSink<Event> { + + public DataBatchSinkFunctionOperator(SinkFunction<Event> userFunction) { + super(userFunction); + this.chainingStrategy = ChainingStrategy.ALWAYS; + } + + @Override + public void processElement(StreamRecord<Event> element) throws Exception { + Event event = element.getValue(); + + // FlushEvent triggers flush + if (event instanceof FlushEvent) { Review Comment: Actually, there is no FlushEvent here because SchemaBatchOperator will not emit FlushEvent to downstream. ########## flink-cdc-runtime/src/main/java/org/apache/flink/cdc/runtime/operators/sink/DataBatchSinkWriterOperator.java: ########## @@ -0,0 +1,208 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.flink.cdc.runtime.operators.sink; + +import org.apache.flink.api.common.operators.MailboxExecutor; +import org.apache.flink.api.connector.sink2.Sink; +import org.apache.flink.api.connector.sink2.SinkWriter; +import org.apache.flink.cdc.common.annotation.Internal; +import org.apache.flink.cdc.common.event.Event; +import org.apache.flink.cdc.common.event.FlushEvent; +import org.apache.flink.runtime.state.StateInitializationContext; +import org.apache.flink.runtime.state.StateSnapshotContext; +import org.apache.flink.streaming.api.connector.sink2.CommittableMessage; +import org.apache.flink.streaming.api.graph.StreamConfig; +import org.apache.flink.streaming.api.operators.AbstractStreamOperator; +import org.apache.flink.streaming.api.operators.BoundedOneInput; +import org.apache.flink.streaming.api.operators.ChainingStrategy; +import org.apache.flink.streaming.api.operators.OneInputStreamOperator; +import org.apache.flink.streaming.api.operators.Output; +import org.apache.flink.streaming.api.watermark.Watermark; +import org.apache.flink.streaming.runtime.streamrecord.StreamRecord; +import org.apache.flink.streaming.runtime.tasks.ProcessingTimeService; +import org.apache.flink.streaming.runtime.tasks.StreamTask; +import org.apache.flink.streaming.runtime.watermarkstatus.WatermarkStatus; + +import java.lang.reflect.Constructor; +import java.lang.reflect.Field; + +/** + * An operator that processes records to be written into a {@link Sink} in batch mode. + * + * <p>The operator is a proxy of SinkWriterOperator in Flink. + * + * <p>The operator is always part of a sink pipeline and is the first operator. 
+ * + * @param <CommT> the type of the committable (to send to downstream operators) + */ +@Internal +public class DataBatchSinkWriterOperator<CommT> + extends AbstractStreamOperator<CommittableMessage<CommT>> + implements OneInputStreamOperator<Event, CommittableMessage<CommT>>, BoundedOneInput { + + private final Sink<Event> sink; + + private final ProcessingTimeService processingTimeService; + + private final MailboxExecutor mailboxExecutor; + + /** Operator that actually execute sink logic. */ + private Object flinkWriterOperator; + + /** + * The internal {@link SinkWriter} of flinkWriterOperator, obtained it through reflection to + * deal with {@link FlushEvent}. + */ + private SinkWriter<Event> copySinkWriter; + + public DataBatchSinkWriterOperator( + Sink<Event> sink, + ProcessingTimeService processingTimeService, + MailboxExecutor mailboxExecutor) { + this.sink = sink; + this.processingTimeService = processingTimeService; + this.mailboxExecutor = mailboxExecutor; + this.chainingStrategy = ChainingStrategy.ALWAYS; + } + + @Override + public void setup( + StreamTask<?, ?> containingTask, + StreamConfig config, + Output<StreamRecord<CommittableMessage<CommT>>> output) { + super.setup(containingTask, config, output); + flinkWriterOperator = createFlinkWriterOperator(); + this.<AbstractStreamOperator<CommittableMessage<CommT>>>getFlinkWriterOperator() + .setup(containingTask, config, output); + } + + @Override + public void open() throws Exception { + this.<AbstractStreamOperator<CommittableMessage<CommT>>>getFlinkWriterOperator().open(); + copySinkWriter = getFieldValue("sinkWriter"); + } + + @Override + public void initializeState(StateInitializationContext context) throws Exception { + this.<AbstractStreamOperator<CommittableMessage<CommT>>>getFlinkWriterOperator() + .initializeState(context); + } + + @Override + public void snapshotState(StateSnapshotContext context) throws Exception { + 
this.<AbstractStreamOperator<CommittableMessage<CommT>>>getFlinkWriterOperator() + .snapshotState(context); + } + + @Override + public void processWatermark(Watermark mark) throws Exception { + super.processWatermark(mark); + this.<AbstractStreamOperator<CommittableMessage<CommT>>>getFlinkWriterOperator() + .processWatermark(mark); + } + + @Override + public void processWatermarkStatus(WatermarkStatus watermarkStatus) throws Exception { + super.processWatermarkStatus(watermarkStatus); + this.<AbstractStreamOperator<CommittableMessage<CommT>>>getFlinkWriterOperator() + .processWatermarkStatus(watermarkStatus); + } + + @Override + public void processElement(StreamRecord<Event> element) throws Exception { + Event event = element.getValue(); + + // FlushEvent triggers flush + if (event instanceof FlushEvent) { Review Comment: Ditto. ########## flink-cdc-runtime/src/main/java/org/apache/flink/cdc/runtime/operators/transform/PreBatchTransformOperator.java: ########## @@ -0,0 +1,389 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.flink.cdc.runtime.operators.transform; + +import org.apache.flink.api.java.tuple.Tuple2; +import org.apache.flink.api.java.tuple.Tuple3; +import org.apache.flink.cdc.common.data.binary.BinaryRecordData; +import org.apache.flink.cdc.common.event.CreateTableEvent; +import org.apache.flink.cdc.common.event.DataChangeEvent; +import org.apache.flink.cdc.common.event.DropTableEvent; +import org.apache.flink.cdc.common.event.Event; +import org.apache.flink.cdc.common.event.SchemaChangeEvent; +import org.apache.flink.cdc.common.event.TableId; +import org.apache.flink.cdc.common.event.TruncateTableEvent; +import org.apache.flink.cdc.common.schema.Column; +import org.apache.flink.cdc.common.schema.Schema; +import org.apache.flink.cdc.common.schema.Selectors; +import org.apache.flink.cdc.common.source.SupportedMetadataColumn; +import org.apache.flink.cdc.common.utils.Preconditions; +import org.apache.flink.cdc.common.utils.SchemaUtils; +import org.apache.flink.cdc.runtime.parser.TransformParser; +import org.apache.flink.streaming.api.graph.StreamConfig; +import org.apache.flink.streaming.api.operators.AbstractStreamOperator; +import org.apache.flink.streaming.api.operators.ChainingStrategy; +import org.apache.flink.streaming.api.operators.OneInputStreamOperator; +import org.apache.flink.streaming.api.operators.Output; +import org.apache.flink.streaming.runtime.streamrecord.StreamRecord; +import org.apache.flink.streaming.runtime.tasks.StreamTask; + +import javax.annotation.Nullable; + +import java.io.Serializable; +import java.util.ArrayList; +import java.util.LinkedHashSet; +import java.util.List; +import java.util.Map; +import java.util.Optional; +import java.util.concurrent.ConcurrentHashMap; +import java.util.stream.Collectors; + +/** + * A data process function that filters out columns which aren't (directly & indirectly) referenced + * in batch mode. 
There is no need to consider the task failover scenario in batch mode, as it + * should be re - emitted from the source. + */ +public class PreBatchTransformOperator extends AbstractStreamOperator<Event> + implements OneInputStreamOperator<Event, Event>, Serializable { + + private static final long serialVersionUID = 1L; + + private final List<TransformRule> transformRules; + private transient List<PreTransformer> transforms; + private final Map<TableId, PreTransformChangeInfo> preTransformChangeInfoMap; + private final List<Tuple2<Selectors, SchemaMetadataTransform>> schemaMetadataTransformers; + private final List<Tuple3<String, String, Map<String, String>>> udfFunctions; + private List<UserDefinedFunctionDescriptor> udfDescriptors; + private Map<TableId, PreTransformProcessor> preTransformProcessorMap; + private Map<TableId, Boolean> hasAsteriskMap; + + public static PreBatchTransformOperator.Builder newBuilder() { + return new PreBatchTransformOperator.Builder(); + } + + /** Builder of {@link PreBatchTransformOperator}. 
*/ + public static class Builder { + private final List<TransformRule> transformRules = new ArrayList<>(); + + private final List<Tuple3<String, String, Map<String, String>>> udfFunctions = + new ArrayList<>(); + + public PreBatchTransformOperator.Builder addTransform( + String tableInclusions, @Nullable String projection, @Nullable String filter) { + transformRules.add( + new TransformRule( + tableInclusions, + projection, + filter, + "", + "", + "", + null, + new SupportedMetadataColumn[0])); + return this; + } + + public PreBatchTransformOperator.Builder addTransform( + String tableInclusions, + @Nullable String projection, + @Nullable String filter, + String primaryKey, + String partitionKey, + String tableOption, + @Nullable String postTransformConverter, + SupportedMetadataColumn[] supportedMetadataColumns) { + transformRules.add( + new TransformRule( + tableInclusions, + projection, + filter, + primaryKey, + partitionKey, + tableOption, + postTransformConverter, + supportedMetadataColumns)); + return this; + } + + public PreBatchTransformOperator.Builder addUdfFunctions( + List<Tuple3<String, String, Map<String, String>>> udfFunctions) { + this.udfFunctions.addAll(udfFunctions); + return this; + } + + public PreBatchTransformOperator build() { + return new PreBatchTransformOperator(transformRules, udfFunctions); + } + } + + private PreBatchTransformOperator( + List<TransformRule> transformRules, + List<Tuple3<String, String, Map<String, String>>> udfFunctions) { + this.preTransformChangeInfoMap = new ConcurrentHashMap<>(); + this.preTransformProcessorMap = new ConcurrentHashMap<>(); + this.schemaMetadataTransformers = new ArrayList<>(); + this.chainingStrategy = ChainingStrategy.ALWAYS; + + this.transformRules = transformRules; + this.udfFunctions = udfFunctions; + } + + @Override + public void setup( + StreamTask<?, ?> containingTask, + StreamConfig config, + Output<StreamRecord<Event>> output) { + super.setup(containingTask, config, output); + 
this.udfDescriptors = + this.udfFunctions.stream() + .map(udf -> new UserDefinedFunctionDescriptor(udf.f0, udf.f1, udf.f2)) + .collect(Collectors.toList()); + + // Initialize data fields in advance because they might be accessed in + // `::initializeState` function when restoring from a previous state. + this.transforms = new ArrayList<>(); + for (TransformRule transformRule : transformRules) { + String tableInclusions = transformRule.getTableInclusions(); + String projection = transformRule.getProjection(); + String filter = transformRule.getFilter(); + String primaryKeys = transformRule.getPrimaryKey(); + String partitionKeys = transformRule.getPartitionKey(); + String tableOptions = transformRule.getTableOption(); + Selectors selectors = + new Selectors.SelectorsBuilder().includeTables(tableInclusions).build(); + transforms.add( + new PreTransformer( + selectors, + TransformProjection.of(projection).orElse(null), + TransformFilter.of(filter, udfDescriptors).orElse(null))); + schemaMetadataTransformers.add( + new Tuple2<>( + selectors, + new SchemaMetadataTransform(primaryKeys, partitionKeys, tableOptions))); + } + this.preTransformProcessorMap = new ConcurrentHashMap<>(); + this.hasAsteriskMap = new ConcurrentHashMap<>(); + } + + @Override + public void finish() throws Exception { + super.finish(); + clearOperator(); + } + + @Override + public void close() throws Exception { + super.close(); + clearOperator(); + } + + @Override + public void processElement(StreamRecord<Event> element) throws Exception { + Event event = element.getValue(); + if (event instanceof CreateTableEvent) { + CreateTableEvent createTableEvent = (CreateTableEvent) event; + preTransformProcessorMap.remove(createTableEvent.tableId()); + output.collect(new StreamRecord<>(cacheCreateTable(createTableEvent))); + } else if (event instanceof DropTableEvent) { + preTransformProcessorMap.remove(((DropTableEvent) event).tableId()); + output.collect(new StreamRecord<>(event)); + } else if (event 
instanceof TruncateTableEvent) { + output.collect(new StreamRecord<>(event)); + } else if (event instanceof SchemaChangeEvent) { + SchemaChangeEvent schemaChangeEvent = (SchemaChangeEvent) event; + preTransformProcessorMap.remove(schemaChangeEvent.tableId()); + cacheChangeSchema(schemaChangeEvent) + .ifPresent(e -> output.collect(new StreamRecord<>(e))); Review Comment: No DropTableEvent, TruncateTableEvent, or other SchemaChangeEvent will be sent here, so we only need to handle CreateTableEvent and DataChangeEvent. -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: [email protected] For queries about this service, please contact Infrastructure at: [email protected]
