This is an automated email from the ASF dual-hosted git repository.

vernedeng pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/inlong.git


The following commit(s) were added to refs/heads/master by this push:
     new e71daca928 [INLONG-11909][Feature][Sort] Optimize the generation of tube session key (#11910)
e71daca928 is described below

commit e71daca928f04ab015e82d4b1c99fecad25d1f3d
Author: vernedeng <[email protected]>
AuthorDate: Tue Jul 1 15:02:28 2025 +0800

    [INLONG-11909][Feature][Sort] Optimize the generation of tube session key (#11910)
---
 .../java/org/apache/inlong/sort/tubemq/FlinkTubeMQConsumer.java   | 8 ++++++--
 1 file changed, 6 insertions(+), 2 deletions(-)

diff --git a/inlong-sort/sort-flink/sort-flink-v1.15/sort-connectors/tubemq/src/main/java/org/apache/inlong/sort/tubemq/FlinkTubeMQConsumer.java b/inlong-sort/sort-flink/sort-flink-v1.15/sort-connectors/tubemq/src/main/java/org/apache/inlong/sort/tubemq/FlinkTubeMQConsumer.java
index ab46b972aa..cc0b6a7357 100644
--- a/inlong-sort/sort-flink/sort-flink-v1.15/sort-connectors/tubemq/src/main/java/org/apache/inlong/sort/tubemq/FlinkTubeMQConsumer.java
+++ b/inlong-sort/sort-flink/sort-flink-v1.15/sort-connectors/tubemq/src/main/java/org/apache/inlong/sort/tubemq/FlinkTubeMQConsumer.java
@@ -229,9 +229,13 @@ public class FlinkTubeMQConsumer<T> extends RichParallelSourceFunction<T>
         messagePullConsumer = 
messageSessionFactory.createPullConsumer(consumerConfig);
         messagePullConsumer.subscribe(topic, streamIdSet);
         String jobId = getRuntimeContext().getJobId().toString();
-        String realSessionKey = sessionKey + jobId + restoredCheckpointId;
+        int attempt = getRuntimeContext().getAttemptNumber();
+        String realSessionKey = sessionKey + "_" + jobId + "_" + 
restoredCheckpointId + "_" + attempt;
+        LOG.info("try to subscribe topic {} with sessionKey: {}, 
currentOffsets: {}.",
+                topic, realSessionKey, currentOffsets);
         messagePullConsumer.completeSubscribe(realSessionKey, numTasks, true, 
currentOffsets);
-
+        LOG.info("Subscribe topic {} success, sessionKey: {}, currentOffsets: 
{}.",
+                topic, realSessionKey, currentOffsets);
         running = true;
     }
 

Reply via email to