This is an automated email from the ASF dual-hosted git repository.
vernedeng pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/inlong.git
The following commit(s) were added to refs/heads/master by this push:
new e71daca928 [INLONG-11909][Feature][Sort] Optimize the generation of tube session key (#11910)
e71daca928 is described below
commit e71daca928f04ab015e82d4b1c99fecad25d1f3d
Author: vernedeng <[email protected]>
AuthorDate: Tue Jul 1 15:02:28 2025 +0800
[INLONG-11909][Feature][Sort] Optimize the generation of tube session key (#11910)
---
.../java/org/apache/inlong/sort/tubemq/FlinkTubeMQConsumer.java | 8 ++++++--
1 file changed, 6 insertions(+), 2 deletions(-)
diff --git a/inlong-sort/sort-flink/sort-flink-v1.15/sort-connectors/tubemq/src/main/java/org/apache/inlong/sort/tubemq/FlinkTubeMQConsumer.java b/inlong-sort/sort-flink/sort-flink-v1.15/sort-connectors/tubemq/src/main/java/org/apache/inlong/sort/tubemq/FlinkTubeMQConsumer.java
index ab46b972aa..cc0b6a7357 100644
--- a/inlong-sort/sort-flink/sort-flink-v1.15/sort-connectors/tubemq/src/main/java/org/apache/inlong/sort/tubemq/FlinkTubeMQConsumer.java
+++ b/inlong-sort/sort-flink/sort-flink-v1.15/sort-connectors/tubemq/src/main/java/org/apache/inlong/sort/tubemq/FlinkTubeMQConsumer.java
@@ -229,9 +229,13 @@ public class FlinkTubeMQConsumer<T> extends RichParallelSourceFunction<T>
messagePullConsumer = messageSessionFactory.createPullConsumer(consumerConfig);
messagePullConsumer.subscribe(topic, streamIdSet);
String jobId = getRuntimeContext().getJobId().toString();
- String realSessionKey = sessionKey + jobId + restoredCheckpointId;
+ int attempt = getRuntimeContext().getAttemptNumber();
+ String realSessionKey = sessionKey + "_" + jobId + "_" + restoredCheckpointId + "_" + attempt;
+ LOG.info("try to subscribe topic {} with sessionKey: {}, currentOffsets: {}.",
+ topic, realSessionKey, currentOffsets);
messagePullConsumer.completeSubscribe(realSessionKey, numTasks, true, currentOffsets);
-
+ LOG.info("Subscribe topic {} success, sessionKey: {}, currentOffsets: {}.",
+ topic, realSessionKey, currentOffsets);
running = true;
}