lvyanquan commented on code in PR #3254:
URL: https://github.com/apache/flink-cdc/pull/3254#discussion_r1650032276


##########
flink-cdc-connect/flink-cdc-pipeline-connectors/flink-cdc-pipeline-connector-maxcompute/src/main/java/org/apache/flink/cdc/connectors/maxcompute/utils/SessionCommitCoordinator.java:
##########
@@ -0,0 +1,158 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.cdc.connectors.maxcompute.utils;
+
+import org.apache.flink.cdc.common.utils.Preconditions;
+import org.apache.flink.cdc.connectors.maxcompute.common.Constant;
+import org.apache.flink.cdc.connectors.maxcompute.coordinator.message.CommitSessionResponse;
+import org.apache.flink.cdc.runtime.operators.schema.event.CoordinationResponseUtils;
+import org.apache.flink.runtime.operators.coordination.CoordinationResponse;
+
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.util.ArrayDeque;
+import java.util.Comparator;
+import java.util.HashMap;
+import java.util.Map;
+import java.util.Queue;
+import java.util.concurrent.CompletableFuture;
+
+/**
+ * The SessionCommitCoordinator class is responsible for coordinating and controlling the order of
+ * session submissions by multiple concurrent executors in a distributed processing environment. It
+ * ensures that: 1. Each executor must submit sessions in ascending order by session ID. 2. Each
+ * executor must submit a Constant.END_OF_SESSION as a terminator after completing its session
+ * submissions.
+ *
+ * <p>Working Principle: - Maintains an array of queues (toCommitSessionIds), with each queue
+ * corresponding to an executor, to isolate the session submissions of different executors. This
+ * maintains the independence and submission order of each executor. - Executors submit session IDs
+ * sequentially by invoking the commit() method. The commit operation simply enqueues the session ID
+ * into the corresponding executor's queue. - The getToCommitSessionId() method is tasked with
+ * selecting the smallest session ID across all executors that has been "submitted" or is "no longer
+ * required" for submission, allowing for further processing. "Submitted" means that the session ID
+ * has been submitted by all executors; "no longer required" assumes that any subsequent session IDs
+ * that are yet to be submitted will always be greater than the currently chosen ID. - Once a
+ * session ID is selected by the getToCommitSessionId() method, it's removed from all executors'
+ * queues, indicating that the session ID has been processed. This process ensures ordered
+ * processing of the sessions and allows the system to efficiently progress. - Each processing step
+ * of the session IDs is based on a key assumption: that any subsequent session ID submissions will
+ * always be greater than the current processed session ID. This is guaranteed by the fact that each
+ * executor commits to submitting session IDs in order and submits a special terminator
+ * (Constant.END_OF_SESSION) at the end.
+ *
+ * <p>Note: - The class presupposes that all session IDs are comparable, and each executor strictly
+ * adheres to the submission order of session IDs in ascending order. Any behavior that deviates
+ * from this principle may lead to unpredictable outcomes, as it contravenes the fundamental
+ * assumption of the class's design. - The introduction of Constant.END_OF_SESSION as a terminator
+ * is a key aspect of this coordination strategy, as it provides a clear signal for recognizing the
+ * completion status of an executor, allowing the system to determine whether all relevant sessions
+ * have been processed.
+ */
+public class SessionCommitCoordinator {

Review Comment:
   Would it be better for us to change it to `Manager` or `Helper`, to distinguish it from Flink's `OperatorCoordinator`?
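   (For context, roughly how I read the coordination scheme described in the Javadoc above; this is a hypothetical, simplified sketch for discussion, not the code in this PR. Only `commit()`, `getToCommitSessionId()`, `toCommitSessionIds` and `Constant.END_OF_SESSION` are taken from the description, everything else is assumed.)

```java
import java.util.ArrayDeque;
import java.util.Queue;

// Simplified model of the described behaviour: one FIFO queue per executor,
// commits arrive in ascending order, END_OF_SESSION marks a finished executor.
class SessionCommitSketch {
    private static final String END_OF_SESSION = "END_OF_SESSION"; // stand-in for Constant.END_OF_SESSION
    private final Queue<String>[] toCommitSessionIds; // one queue per executor

    @SuppressWarnings("unchecked")
    SessionCommitSketch(int parallelism) {
        toCommitSessionIds = new Queue[parallelism];
        for (int i = 0; i < parallelism; i++) {
            toCommitSessionIds[i] = new ArrayDeque<>();
        }
    }

    /** Executor {@code subtask} submits its session ids in ascending order, END_OF_SESSION last. */
    void commit(int subtask, String sessionId) {
        toCommitSessionIds[subtask].add(sessionId);
    }

    /**
     * Returns the smallest head session id, which is safe to process because every other executor
     * has either committed the same id or will only ever submit larger ids; returns null if some
     * executor has not reported anything yet or nothing is left.
     */
    String getToCommitSessionId() {
        String smallest = null;
        for (Queue<String> queue : toCommitSessionIds) {
            String head = queue.peek();
            if (head == null) {
                return null; // this executor has not reported anything yet
            }
            if (END_OF_SESSION.equals(head)) {
                continue; // finished executor, never blocks smaller ids
            }
            if (smallest == null || head.compareTo(smallest) < 0) {
                smallest = head;
            }
        }
        if (smallest != null) {
            for (Queue<String> queue : toCommitSessionIds) {
                if (smallest.equals(queue.peek())) {
                    queue.poll(); // the selected id has been processed, drop it from every queue
                }
            }
        }
        return smallest;
    }
}
```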
   



##########
flink-cdc-connect/flink-cdc-pipeline-connectors/flink-cdc-pipeline-connector-maxcompute/src/main/java/org/apache/flink/cdc/connectors/maxcompute/coordinator/SessionManageOperator.java:
##########
@@ -0,0 +1,297 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.cdc.connectors.maxcompute.coordinator;
+
+import org.apache.flink.cdc.common.data.RecordData;
+import org.apache.flink.cdc.common.event.CreateTableEvent;
+import org.apache.flink.cdc.common.event.DataChangeEvent;
+import org.apache.flink.cdc.common.event.Event;
+import org.apache.flink.cdc.common.event.FlushEvent;
+import org.apache.flink.cdc.common.event.OperationType;
+import org.apache.flink.cdc.common.event.SchemaChangeEvent;
+import org.apache.flink.cdc.common.event.TableId;
+import org.apache.flink.cdc.common.schema.Schema;
+import org.apache.flink.cdc.common.utils.SchemaUtils;
+import org.apache.flink.cdc.connectors.maxcompute.common.Constant;
+import org.apache.flink.cdc.connectors.maxcompute.common.SessionIdentifier;
+import org.apache.flink.cdc.connectors.maxcompute.coordinator.message.CreateSessionRequest;
+import org.apache.flink.cdc.connectors.maxcompute.coordinator.message.CreateSessionResponse;
+import org.apache.flink.cdc.connectors.maxcompute.coordinator.message.WaitForFlushSuccessRequest;
+import org.apache.flink.cdc.connectors.maxcompute.options.MaxComputeOptions;
+import org.apache.flink.cdc.connectors.maxcompute.utils.MaxComputeUtils;
+import org.apache.flink.cdc.connectors.maxcompute.utils.TypeConvertUtils;
+import org.apache.flink.cdc.runtime.operators.schema.event.CoordinationResponseUtils;
+import org.apache.flink.cdc.runtime.operators.sink.SchemaEvolutionClient;
+import org.apache.flink.runtime.jobgraph.OperatorID;
+import org.apache.flink.runtime.jobgraph.tasks.TaskOperatorEventGateway;
+import org.apache.flink.runtime.operators.coordination.CoordinationRequest;
+import org.apache.flink.runtime.operators.coordination.CoordinationResponse;
+import org.apache.flink.runtime.operators.coordination.OperatorEvent;
+import org.apache.flink.runtime.operators.coordination.OperatorEventHandler;
+import org.apache.flink.runtime.state.StateSnapshotContext;
+import org.apache.flink.streaming.api.graph.StreamConfig;
+import org.apache.flink.streaming.api.operators.AbstractStreamOperator;
+import org.apache.flink.streaming.api.operators.BoundedOneInput;
+import org.apache.flink.streaming.api.operators.ChainingStrategy;
+import org.apache.flink.streaming.api.operators.OneInputStreamOperator;
+import org.apache.flink.streaming.api.operators.Output;
+import org.apache.flink.streaming.runtime.streamrecord.StreamRecord;
+import org.apache.flink.streaming.runtime.tasks.StreamTask;
+import org.apache.flink.util.SerializedValue;
+
+import com.aliyun.odps.PartitionSpec;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.io.IOException;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.Objects;
+import java.util.Optional;
+import java.util.concurrent.CompletableFuture;
+import java.util.concurrent.ExecutionException;
+import java.util.concurrent.Future;
+
+/**
+ * Processes a {@link DataChangeEvent}, extracting data and encapsulating it into a {@link
+ * SessionIdentifier}, and then sends a {@link CreateSessionRequest} to the {@link
+ * SessionManageCoordinator} to create a writing session. Subsequently, it incorporates the
+ * SessionId into the metadata of the {@link DataChangeEvent} for downstream processing.
+ */
+public class SessionManageOperator extends AbstractStreamOperator<Event>
+        implements OneInputStreamOperator<Event, Event>, OperatorEventHandler, BoundedOneInput {
+    private static final long serialVersionUID = 1L;
+    private static final Logger LOG = LoggerFactory.getLogger(SessionManageOperator.class);
+
+    /** a tricky way to get an Operator from sink. */
+    public static SessionManageOperator instance;
+
+    private final MaxComputeOptions options;
+    private final OperatorID schemaOperatorUid;
+
+    private transient TaskOperatorEventGateway taskOperatorEventGateway;
+    private transient Map<SessionIdentifier, String> sessionCache;
+    private transient Map<TableId, Schema> schemaMaps;
+    private transient Map<TableId, List<RecordData.FieldGetter>> fieldGetterMaps;
+    private transient SchemaEvolutionClient schemaEvolutionClient;
+
+    private transient Future<CoordinationResponse> snapshotFlushSuccess;
+    private transient int indexOfThisSubtask;
+    /**
+     * endOfInput is triggered before prepareSnapshotPreBarrier, so this flag is needed: once
+     * endOfInput has been reached, the WaitForFlushSuccessRequest has already been sent in advance.
+     */
+    private transient boolean endOfInput;
+
+    public SessionManageOperator(MaxComputeOptions options, OperatorID schemaOperatorUid) {
+        this.chainingStrategy = ChainingStrategy.ALWAYS;
+        this.options = options;
+        this.schemaOperatorUid = schemaOperatorUid;
+    }
+
+    @Override
+    public void open() throws Exception {
+        this.sessionCache = new HashMap<>();
+        this.schemaMaps = new HashMap<>();
+        this.fieldGetterMaps = new HashMap<>();
+        SessionManageOperator.instance = this;
+    }
+
+    @Override
+    public void setup(
+            StreamTask<?, ?> containingTask,
+            StreamConfig config,
+            Output<StreamRecord<Event>> output) {
+        super.setup(containingTask, config, output);
+        schemaEvolutionClient =
+                new SchemaEvolutionClient(
+                        containingTask.getEnvironment().getOperatorCoordinatorEventGateway(),
+                        schemaOperatorUid);
+        indexOfThisSubtask = getRuntimeContext().getIndexOfThisSubtask();
+    }
+
+    @Override
+    public void processElement(StreamRecord<Event> element) throws Exception {
+        if (element.getValue() instanceof DataChangeEvent) {
+            DataChangeEvent dataChangeEvent = (DataChangeEvent) element.getValue();
+            TableId tableId = dataChangeEvent.tableId();
+            // Because this operator sits between SchemaOperator and DataSinkWriterOperator, the
+            // schema will not be filled in here when the CreateTableEvent is missing.
+            if (!schemaMaps.containsKey(tableId)) {
+                emitLatestSchema(tableId);
+            }
+            String partitionName =
+                    extractPartition(
+                            dataChangeEvent.op() == OperationType.DELETE
+                                    ? dataChangeEvent.before()
+                                    : dataChangeEvent.after(),
+                            tableId);
+            SessionIdentifier sessionIdentifier =
+                    SessionIdentifier.of(
+                            options.getProject(),
+                            MaxComputeUtils.getSchema(options, tableId),
+                            tableId.getTableName(),
+                            partitionName);
+            if (!sessionCache.containsKey(sessionIdentifier)) {
+                CreateSessionResponse response =
+                        (CreateSessionResponse)
+                                sendRequestToOperator(new CreateSessionRequest(sessionIdentifier));
+                sessionCache.put(sessionIdentifier, response.getSessionId());
+            }
+            dataChangeEvent
+                    .meta()
+                    .put(Constant.TUNNEL_SESSION_ID, sessionCache.get(sessionIdentifier));
+            dataChangeEvent.meta().put(Constant.MAXCOMPUTE_PARTITION_NAME, partitionName);
+            output.collect(new StreamRecord<>(dataChangeEvent));
+        } else if (element.getValue() instanceof FlushEvent) {
+            LOG.info(
+                    "operator {} handle FlushEvent begin, wait for sink 
writers flush success",
+                    indexOfThisSubtask);
+            sessionCache.clear();
+            Future<CoordinationResponse> waitForSuccess =
+                    submitRequestToOperator(new WaitForFlushSuccessRequest(indexOfThisSubtask));
+            output.collect(element);
+            // wait for sink writers flush success
+            waitForSuccess.get();
+            LOG.info(
+                    "operator {} handle FlushEvent end, all sink writers flush 
success",
+                    indexOfThisSubtask);
+        } else if (element.getValue() instanceof CreateTableEvent) {
+            TableId tableId = ((CreateTableEvent) element.getValue()).tableId();
+            Schema schema = ((CreateTableEvent) element.getValue()).getSchema();
+            schemaMaps.put(tableId, schema);
+            fieldGetterMaps.put(tableId, TypeConvertUtils.createFieldGetters(schema));
+            output.collect(element);
+        } else if (element.getValue() instanceof SchemaChangeEvent) {
+            SchemaChangeEvent schemaChangeEvent = (SchemaChangeEvent) element.getValue();
+            TableId tableId = schemaChangeEvent.tableId();
+            Schema newSchema =
+                    SchemaUtils.applySchemaChangeEvent(schemaMaps.get(tableId), schemaChangeEvent);
+            schemaMaps.put(tableId, newSchema);
+            fieldGetterMaps.put(tableId, TypeConvertUtils.createFieldGetters(newSchema));
+            output.collect(element);
+        } else {
+            output.collect(element);
+            LOG.warn("unknown element {}", element.getValue());
+        }
+    }
+
+    private void emitLatestSchema(TableId tableId) throws Exception {
+        Optional<Schema> schema = schemaEvolutionClient.getLatestSchema(tableId);
+        if (schema.isPresent()) {
+            Schema latestSchema = schema.get();
+            schemaMaps.put(tableId, latestSchema);
+            fieldGetterMaps.put(tableId, TypeConvertUtils.createFieldGetters(latestSchema));
+            output.collect(new StreamRecord<>(new CreateTableEvent(tableId, latestSchema)));
+        } else {
+            throw new RuntimeException(
+                    "Could not find schema message from SchemaRegistry for " + 
tableId);
+        }
+    }
+
+    @Override
+    public void prepareSnapshotPreBarrier(long checkpointId) throws Exception {
+        super.prepareSnapshotPreBarrier(checkpointId);
+        if (endOfInput) {
+            return;
+        }
+        LOG.info(
+                "operator {} prepare snapshot, wait for sink writers flush 
success",
+                indexOfThisSubtask);
+        // wait for sink writers flush success
+        waitLastSnapshotFlushSuccess();
+        snapshotFlushSuccess =
+                submitRequestToOperator(
+                        new WaitForFlushSuccessRequest(
+                                getRuntimeContext().getIndexOfThisSubtask()));
+    }
+
+    @Override
+    public void snapshotState(StateSnapshotContext context) throws Exception {
+        super.snapshotState(context);
+        sessionCache.clear();

Review Comment:
   So, do we need to request a new session ID after each checkpoint? That may have a performance impact. It is expected that the checkpoint interval will need to be larger, right?
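   (To illustrate the concern, a toy model of the cache behaviour as I read it from the diff; this is my own assumption-laden sketch, not the PR code or any MaxCompute API: clearing the session cache at every checkpoint means the first record per session identifier after each checkpoint pays another coordinator round trip for a fresh session id.)

```java
import java.util.HashMap;
import java.util.Map;

// Toy model of "clear the session cache at every checkpoint" (assumed behaviour, not PR code).
class SessionCacheModel {
    private final Map<String, String> sessionCache = new HashMap<>();
    private int coordinatorCalls = 0;

    String sessionIdFor(String identifier) {
        // one CreateSessionRequest-like round trip per cache miss
        return sessionCache.computeIfAbsent(identifier, id -> "session-" + (++coordinatorCalls));
    }

    void onCheckpoint() {
        sessionCache.clear(); // mirrors sessionCache.clear() in snapshotState()
    }

    public static void main(String[] args) {
        SessionCacheModel model = new SessionCacheModel();
        model.sessionIdFor("project.table/pt=20240101"); // miss -> coordinator call #1
        model.sessionIdFor("project.table/pt=20240101"); // hit  -> no call
        model.onCheckpoint();
        model.sessionIdFor("project.table/pt=20240101"); // miss again -> coordinator call #2
        System.out.println("coordinator calls: " + model.coordinatorCalls); // prints 2
    }
}
```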
   



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]
