This is an automated email from the ASF dual-hosted git repository.

aloyszhang pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/inlong.git


The following commit(s) were added to refs/heads/master by this push:
     new bcace693b1 [INLONG-11537][Sort] Optimize the session key generation of 
TubeMQ Source (#11538)
bcace693b1 is described below

commit bcace693b1e53aac32b4f5d69539fa064905069e
Author: vernedeng <[email protected]>
AuthorDate: Tue Nov 26 10:21:54 2024 +0800

    [INLONG-11537][Sort] Optimize the session key generation of TubeMQ Source 
(#11538)
---
 .../java/org/apache/inlong/sort/tubemq/FlinkTubeMQConsumer.java    | 7 ++++++-
 1 file changed, 6 insertions(+), 1 deletion(-)

diff --git 
a/inlong-sort/sort-flink/sort-flink-v1.13/sort-connectors/tubemq/src/main/java/org/apache/inlong/sort/tubemq/FlinkTubeMQConsumer.java
 
b/inlong-sort/sort-flink/sort-flink-v1.13/sort-connectors/tubemq/src/main/java/org/apache/inlong/sort/tubemq/FlinkTubeMQConsumer.java
index abd69f8ecb..4cbd285f17 100644
--- 
a/inlong-sort/sort-flink/sort-flink-v1.13/sort-connectors/tubemq/src/main/java/org/apache/inlong/sort/tubemq/FlinkTubeMQConsumer.java
+++ 
b/inlong-sort/sort-flink/sort-flink-v1.13/sort-connectors/tubemq/src/main/java/org/apache/inlong/sort/tubemq/FlinkTubeMQConsumer.java
@@ -69,6 +69,8 @@ public class FlinkTubeMQConsumer<T> extends 
RichParallelSourceFunction<T>
     private static final Logger LOG = 
LoggerFactory.getLogger(FlinkTubeMQConsumer.class);
     private static final String TUBE_OFFSET_STATE = "tube-offset-state";
 
+    private static final String UNDERSCORE = "_";
+
     /**
      * The address of TubeMQ master, format eg: 127.0.0.1:8715,127.0.0.2:8715.
      */
@@ -221,7 +223,10 @@ public class FlinkTubeMQConsumer<T> extends 
RichParallelSourceFunction<T>
         messagePullConsumer = 
messageSessionFactory.createPullConsumer(consumerConfig);
         messagePullConsumer.subscribe(topic, streamIdSet);
         String jobId = getRuntimeContext().getJobId().toString();
-        messagePullConsumer.completeSubscribe(sessionKey.concat(jobId), 
numTasks, true, currentOffsets);
+        String attemptNumber = 
String.valueOf(getRuntimeContext().getAttemptNumber());
+        String startSessionKey = 
sessionKey.concat(UNDERSCORE).concat(jobId).concat(UNDERSCORE).concat(attemptNumber);
+        LOG.info("start to init tube mq consumer, session key={}", 
startSessionKey);
+        messagePullConsumer.completeSubscribe(startSessionKey, numTasks, true, 
currentOffsets);
 
         running = true;
     }

Reply via email to