This is an automated email from the ASF dual-hosted git repository.
dockerzhang pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/inlong.git
The following commit(s) were added to refs/heads/master by this push:
new 3db440b0c6 [INLONG-11805][Sort] Restored Checkpoint Id as part of Tube
Connector Session Key (#11806)
3db440b0c6 is described below
commit 3db440b0c64eca4ac9ac1c929cad70a23b235ca3
Author: vernedeng <[email protected]>
AuthorDate: Mon Mar 17 16:15:09 2025 +0800
[INLONG-11805][Sort] Restored Checkpoint Id as part of Tube Connector
Session Key (#11806)
---
.../org/apache/inlong/sort/tubemq/FlinkTubeMQConsumer.java | 10 ++++++++--
1 file changed, 8 insertions(+), 2 deletions(-)
diff --git
a/inlong-sort/sort-flink/sort-flink-v1.15/sort-connectors/tubemq/src/main/java/org/apache/inlong/sort/tubemq/FlinkTubeMQConsumer.java
b/inlong-sort/sort-flink/sort-flink-v1.15/sort-connectors/tubemq/src/main/java/org/apache/inlong/sort/tubemq/FlinkTubeMQConsumer.java
index 47f17eb95d..ab46b972aa 100644
---
a/inlong-sort/sort-flink/sort-flink-v1.15/sort-connectors/tubemq/src/main/java/org/apache/inlong/sort/tubemq/FlinkTubeMQConsumer.java
+++
b/inlong-sort/sort-flink/sort-flink-v1.15/sort-connectors/tubemq/src/main/java/org/apache/inlong/sort/tubemq/FlinkTubeMQConsumer.java
@@ -144,6 +144,10 @@ public class FlinkTubeMQConsumer<T> extends
RichParallelSourceFunction<T>
* The TubeMQ pull consumer.
*/
private transient PullMessageConsumer messagePullConsumer;
+ /**
+ * The restored checkpoint id, or -1 if the job was not restored from a checkpoint.
+ */
+ private transient Long restoredCheckpointId;
/**
* Build a TubeMQ source function
@@ -190,11 +194,12 @@ public class FlinkTubeMQConsumer<T> extends
RichParallelSourceFunction<T>
@Override
public void initializeState(FunctionInitializationContext context) throws
Exception {
+ restoredCheckpointId = context.getRestoredCheckpointId().orElse(-1L);
+
TypeInformation<Tuple2<String, Long>> typeInformation =
new TupleTypeInfo<>(STRING_TYPE_INFO, LONG_TYPE_INFO);
ListStateDescriptor<Tuple2<String, Long>> stateDescriptor =
new ListStateDescriptor<>(TUBE_OFFSET_STATE, typeInformation);
-
OperatorStateStore stateStore = context.getOperatorStateStore();
offsetsState = stateStore.getListState(stateDescriptor);
currentOffsets = new HashMap<>();
@@ -224,7 +229,8 @@ public class FlinkTubeMQConsumer<T> extends
RichParallelSourceFunction<T>
messagePullConsumer =
messageSessionFactory.createPullConsumer(consumerConfig);
messagePullConsumer.subscribe(topic, streamIdSet);
String jobId = getRuntimeContext().getJobId().toString();
- messagePullConsumer.completeSubscribe(sessionKey.concat(jobId),
numTasks, true, currentOffsets);
+ String realSessionKey = sessionKey + jobId + restoredCheckpointId;
+ messagePullConsumer.completeSubscribe(realSessionKey, numTasks, true,
currentOffsets);
running = true;
}