lvyanquan commented on code in PR #3254: URL: https://github.com/apache/flink-cdc/pull/3254#discussion_r1650032276
########## flink-cdc-connect/flink-cdc-pipeline-connectors/flink-cdc-pipeline-connector-maxcompute/src/main/java/org/apache/flink/cdc/connectors/maxcompute/utils/SessionCommitCoordinator.java: ########## @@ -0,0 +1,158 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.cdc.connectors.maxcompute.utils; + +import org.apache.flink.cdc.common.utils.Preconditions; +import org.apache.flink.cdc.connectors.maxcompute.common.Constant; +import org.apache.flink.cdc.connectors.maxcompute.coordinator.message.CommitSessionResponse; +import org.apache.flink.cdc.runtime.operators.schema.event.CoordinationResponseUtils; +import org.apache.flink.runtime.operators.coordination.CoordinationResponse; + +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import java.util.ArrayDeque; +import java.util.Comparator; +import java.util.HashMap; +import java.util.Map; +import java.util.Queue; +import java.util.concurrent.CompletableFuture; + +/** + * The SessionCommitCoordinator class is responsible for coordinating and controlling the order of + * session submissions by multiple concurrent executors in a distributed processing environment. It + * ensures that: 1. Each executor must submit sessions in ascending order by session ID. 2. Each + * executor must submit a Constant.END_OF_SESSION as a terminator after completing its session + * submissions. + * + * <p>Working Principle: - Maintains an array of queues (toCommitSessionIds), with each queue + * corresponding to an executor, to isolate the session submissions of different executors. This + * maintains the independence and submission order of each executor. - Executors submit session IDs + * sequentially by invoking the commit() method. The commit operation simply enqueues the session ID + * into the corresponding executor's queue. - The getToCommitSessionId() method is tasked with + * selecting the smallest session ID across all executors that has been "submitted" or is "no longer + * required" for submission, allowing for further processing. "Submitted" means that the session ID + * has been submitted by all executors; "no longer required" assumes that any subsequent session IDs + * that are yet to be submitted will always be greater than the currently chosen ID. - Once a + * session ID is selected by the getToCommitSessionId() method, it's removed from all executors' + * queues, indicating that the session ID has been processed. This process ensures ordered + * processing of the sessions and allows the system to efficiently progress. - Each processing step + * of the session IDs is based on a key assumption: that any subsequent session ID submissions will + * always be greater than the current processed session ID. 
This is guaranteed by the fact that each + * executor commits to submitting session IDs in order and submits a special terminator + * (Constant.END_OF_SESSION) at the end. + * + * <p>Note: - The class presupposes that all session IDs are comparable, and each executor strictly + * adheres to the submission order of session IDs in ascending order. Any behavior that deviates + * from this principle may lead to unpredictable outcomes, as it contravenes the fundamental + * assumption of the class's design. - The introduction of Constant.END_OF_SESSION as a terminator + * is a key aspect of this coordination strategy, as it provides a clear signal for recognizing the + * completion status of an executor, allowing the system to determine whether all relevant sessions + * have been processed. + */ +public class SessionCommitCoordinator { Review Comment: Would it be better to rename this to `Manager` or `Helper`, to distinguish it from Flink's `OperatorCoordinator`? ########## flink-cdc-connect/flink-cdc-pipeline-connectors/flink-cdc-pipeline-connector-maxcompute/src/main/java/org/apache/flink/cdc/connectors/maxcompute/coordinator/SessionManageOperator.java: ########## @@ -0,0 +1,297 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License.
+ */ + +package org.apache.flink.cdc.connectors.maxcompute.coordinator; + +import org.apache.flink.cdc.common.data.RecordData; +import org.apache.flink.cdc.common.event.CreateTableEvent; +import org.apache.flink.cdc.common.event.DataChangeEvent; +import org.apache.flink.cdc.common.event.Event; +import org.apache.flink.cdc.common.event.FlushEvent; +import org.apache.flink.cdc.common.event.OperationType; +import org.apache.flink.cdc.common.event.SchemaChangeEvent; +import org.apache.flink.cdc.common.event.TableId; +import org.apache.flink.cdc.common.schema.Schema; +import org.apache.flink.cdc.common.utils.SchemaUtils; +import org.apache.flink.cdc.connectors.maxcompute.common.Constant; +import org.apache.flink.cdc.connectors.maxcompute.common.SessionIdentifier; +import org.apache.flink.cdc.connectors.maxcompute.coordinator.message.CreateSessionRequest; +import org.apache.flink.cdc.connectors.maxcompute.coordinator.message.CreateSessionResponse; +import org.apache.flink.cdc.connectors.maxcompute.coordinator.message.WaitForFlushSuccessRequest; +import org.apache.flink.cdc.connectors.maxcompute.options.MaxComputeOptions; +import org.apache.flink.cdc.connectors.maxcompute.utils.MaxComputeUtils; +import org.apache.flink.cdc.connectors.maxcompute.utils.TypeConvertUtils; +import org.apache.flink.cdc.runtime.operators.schema.event.CoordinationResponseUtils; +import org.apache.flink.cdc.runtime.operators.sink.SchemaEvolutionClient; +import org.apache.flink.runtime.jobgraph.OperatorID; +import org.apache.flink.runtime.jobgraph.tasks.TaskOperatorEventGateway; +import org.apache.flink.runtime.operators.coordination.CoordinationRequest; +import org.apache.flink.runtime.operators.coordination.CoordinationResponse; +import org.apache.flink.runtime.operators.coordination.OperatorEvent; +import org.apache.flink.runtime.operators.coordination.OperatorEventHandler; +import org.apache.flink.runtime.state.StateSnapshotContext; +import org.apache.flink.streaming.api.graph.StreamConfig; +import org.apache.flink.streaming.api.operators.AbstractStreamOperator; +import org.apache.flink.streaming.api.operators.BoundedOneInput; +import org.apache.flink.streaming.api.operators.ChainingStrategy; +import org.apache.flink.streaming.api.operators.OneInputStreamOperator; +import org.apache.flink.streaming.api.operators.Output; +import org.apache.flink.streaming.runtime.streamrecord.StreamRecord; +import org.apache.flink.streaming.runtime.tasks.StreamTask; +import org.apache.flink.util.SerializedValue; + +import com.aliyun.odps.PartitionSpec; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import java.io.IOException; +import java.util.HashMap; +import java.util.List; +import java.util.Map; +import java.util.Objects; +import java.util.Optional; +import java.util.concurrent.CompletableFuture; +import java.util.concurrent.ExecutionException; +import java.util.concurrent.Future; + +/** + * Processes a {@link DataChangeEvent}, extracting data and encapsulating it into a {@link + * SessionIdentifier}, and then sends a {@link CreateSessionRequest} to the {@link + * SessionManageCoordinator} to create a writing session. Subsequently, it incorporates the + * SessionId into the metadata of the {@link DataChangeEvent} for downstream processing. 
+ */ +public class SessionManageOperator extends AbstractStreamOperator<Event> + implements OneInputStreamOperator<Event, Event>, OperatorEventHandler, BoundedOneInput { + private static final long serialVersionUID = 1L; + private static final Logger LOG = LoggerFactory.getLogger(SessionManageOperator.class); + + /** A tricky way to get this operator from the sink. */ + public static SessionManageOperator instance; + + private final MaxComputeOptions options; + private final OperatorID schemaOperatorUid; + + private transient TaskOperatorEventGateway taskOperatorEventGateway; + private transient Map<SessionIdentifier, String> sessionCache; + private transient Map<TableId, Schema> schemaMaps; + private transient Map<TableId, List<RecordData.FieldGetter>> fieldGetterMaps; + private transient SchemaEvolutionClient schemaEvolutionClient; + + private transient Future<CoordinationResponse> snapshotFlushSuccess; + private transient int indexOfThisSubtask; + /** + * endOfInput is triggered ahead of prepareSnapshotPreBarrier, so we need this flag to send the + * WaitForFlushSuccessRequest in advance when endOfInput occurs. + */ + private transient boolean endOfInput; + + public SessionManageOperator(MaxComputeOptions options, OperatorID schemaOperatorUid) { + this.chainingStrategy = ChainingStrategy.ALWAYS; + this.options = options; + this.schemaOperatorUid = schemaOperatorUid; + } + + @Override + public void open() throws Exception { + this.sessionCache = new HashMap<>(); + this.schemaMaps = new HashMap<>(); + this.fieldGetterMaps = new HashMap<>(); + SessionManageOperator.instance = this; + } + + @Override + public void setup( + StreamTask<?, ?> containingTask, + StreamConfig config, + Output<StreamRecord<Event>> output) { + super.setup(containingTask, config, output); + schemaEvolutionClient = + new SchemaEvolutionClient( + containingTask.getEnvironment().getOperatorCoordinatorEventGateway(), + schemaOperatorUid); + indexOfThisSubtask = getRuntimeContext().getIndexOfThisSubtask(); + } + + @Override + public void processElement(StreamRecord<Event> element) throws Exception { + if (element.getValue() instanceof DataChangeEvent) { + DataChangeEvent dataChangeEvent = (DataChangeEvent) element.getValue(); + TableId tableId = dataChangeEvent.tableId(); + // Because this operator sits between SchemaOperator and DataSinkWriterOperator, no + // schema is available here when the CreateTableEvent has been lost. + if (!schemaMaps.containsKey(tableId)) { + emitLatestSchema(tableId); + } + String partitionName = + extractPartition( + dataChangeEvent.op() == OperationType.DELETE + ?
dataChangeEvent.before() + : dataChangeEvent.after(), + tableId); + SessionIdentifier sessionIdentifier = + SessionIdentifier.of( + options.getProject(), + MaxComputeUtils.getSchema(options, tableId), + tableId.getTableName(), + partitionName); + if (!sessionCache.containsKey(sessionIdentifier)) { + CreateSessionResponse response = + (CreateSessionResponse) + sendRequestToOperator(new CreateSessionRequest(sessionIdentifier)); + sessionCache.put(sessionIdentifier, response.getSessionId()); + } + dataChangeEvent + .meta() + .put(Constant.TUNNEL_SESSION_ID, sessionCache.get(sessionIdentifier)); + dataChangeEvent.meta().put(Constant.MAXCOMPUTE_PARTITION_NAME, partitionName); + output.collect(new StreamRecord<>(dataChangeEvent)); + } else if (element.getValue() instanceof FlushEvent) { + LOG.info( + "operator {} handle FlushEvent begin, wait for sink writers flush success", + indexOfThisSubtask); + sessionCache.clear(); + Future<CoordinationResponse> waitForSuccess = + submitRequestToOperator(new WaitForFlushSuccessRequest(indexOfThisSubtask)); + output.collect(element); + // wait for sink writers flush success + waitForSuccess.get(); + LOG.info( + "operator {} handle FlushEvent end, all sink writers flush success", + indexOfThisSubtask); + } else if (element.getValue() instanceof CreateTableEvent) { + TableId tableId = ((CreateTableEvent) element.getValue()).tableId(); + Schema schema = ((CreateTableEvent) element.getValue()).getSchema(); + schemaMaps.put(tableId, schema); + fieldGetterMaps.put(tableId, TypeConvertUtils.createFieldGetters(schema)); + output.collect(element); + } else if (element.getValue() instanceof SchemaChangeEvent) { + SchemaChangeEvent schemaChangeEvent = (SchemaChangeEvent) element.getValue(); + TableId tableId = schemaChangeEvent.tableId(); + Schema newSchema = + SchemaUtils.applySchemaChangeEvent(schemaMaps.get(tableId), schemaChangeEvent); + schemaMaps.put(tableId, newSchema); + fieldGetterMaps.put(tableId, TypeConvertUtils.createFieldGetters(newSchema)); + output.collect(element); + } else { + output.collect(element); + LOG.warn("unknown element {}", element.getValue()); + } + } + + private void emitLatestSchema(TableId tableId) throws Exception { + Optional<Schema> schema = schemaEvolutionClient.getLatestSchema(tableId); + if (schema.isPresent()) { + Schema latestSchema = schema.get(); + schemaMaps.put(tableId, latestSchema); + fieldGetterMaps.put(tableId, TypeConvertUtils.createFieldGetters(latestSchema)); + output.collect(new StreamRecord<>(new CreateTableEvent(tableId, latestSchema))); + } else { + throw new RuntimeException( + "Could not find schema message from SchemaRegistry for " + tableId); + } + } + + @Override + public void prepareSnapshotPreBarrier(long checkpointId) throws Exception { + super.prepareSnapshotPreBarrier(checkpointId); + if (endOfInput) { + return; + } + LOG.info( + "operator {} prepare snapshot, wait for sink writers flush success", + indexOfThisSubtask); + // wait for sink writers flush success + waitLastSnapshotFlushSuccess(); + snapshotFlushSuccess = + submitRequestToOperator( + new WaitForFlushSuccessRequest( + getRuntimeContext().getIndexOfThisSubtask())); + } + + @Override + public void snapshotState(StateSnapshotContext context) throws Exception { + super.snapshotState(context); + sessionCache.clear(); Review Comment: So, do we need to request a new session ID after each checkpoint, which may have a performance impact? That would mean the checkpoint interval needs to be fairly large, right?
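To make the concern above concrete, here is a minimal, self-contained sketch of the behavior being asked about. It is illustrative only, not code from this PR: the class `SessionCacheSketch` and the `requestNewSession`/`sessionFor`/`onCheckpoint` names are hypothetical stand-ins for the `CreateSessionRequest` round-trip to the coordinator, the cache lookup in `processElement`, and the `sessionCache.clear()` in `snapshotState`.

```java
import java.util.HashMap;
import java.util.Map;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.function.Function;

/** Hypothetical model of a session cache that is cleared on every checkpoint. */
public class SessionCacheSketch {

    private final Map<String, String> sessionCache = new HashMap<>();
    private final AtomicInteger coordinatorRoundTrips = new AtomicInteger();
    // Stand-in for sending a CreateSessionRequest to the coordinator and waiting for the response.
    private final Function<String, String> requestNewSession;

    SessionCacheSketch(Function<String, String> requestNewSession) {
        this.requestNewSession = requestNewSession;
    }

    /** Mirrors the lookup in processElement: a cache miss costs one coordinator round-trip. */
    String sessionFor(String tablePartitionKey) {
        return sessionCache.computeIfAbsent(
                tablePartitionKey,
                key -> {
                    coordinatorRoundTrips.incrementAndGet();
                    return requestNewSession.apply(key);
                });
    }

    /** Mirrors snapshotState: clearing the cache forces new sessions after the checkpoint. */
    void onCheckpoint() {
        sessionCache.clear();
    }

    public static void main(String[] args) {
        SessionCacheSketch sketch = new SessionCacheSketch(key -> "session-for-" + key);
        // Two checkpoints over the same two (table, partition) targets.
        for (int checkpoint = 0; checkpoint < 2; checkpoint++) {
            sketch.sessionFor("orders/pt=20240601");
            sketch.sessionFor("orders/pt=20240601"); // cached within the same checkpoint
            sketch.sessionFor("users/pt=20240601");
            sketch.onCheckpoint();
        }
        // Prints 4: 2 checkpoints x 2 distinct targets, even though only 2 logical targets exist.
        System.out.println("coordinator round-trips: " + sketch.coordinatorRoundTrips.get());
    }
}
```

Under this model the number of `CreateSessionRequest` round-trips grows with (distinct table/partition targets per checkpoint) × (number of checkpoints), so a longer checkpoint interval amortizes the session-creation cost, which appears to be the trade-off the question is pointing at.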
-- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: [email protected] For queries about this service, please contact Infrastructure at: [email protected]
