This is an automated email from the ASF dual-hosted git repository.
aloyszhang pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/inlong.git
The following commit(s) were added to refs/heads/master by this push:
new bcace693b1 [INLONG-11537][Sort] Optimize the session key generation of
TubeMQ Source (#11538)
bcace693b1 is described below
commit bcace693b1e53aac32b4f5d69539fa064905069e
Author: vernedeng <[email protected]>
AuthorDate: Tue Nov 26 10:21:54 2024 +0800
[INLONG-11537][Sort] Optimize the session key generation of TubeMQ Source
(#11538)
---
.../java/org/apache/inlong/sort/tubemq/FlinkTubeMQConsumer.java | 7 ++++++-
1 file changed, 6 insertions(+), 1 deletion(-)
diff --git a/inlong-sort/sort-flink/sort-flink-v1.13/sort-connectors/tubemq/src/main/java/org/apache/inlong/sort/tubemq/FlinkTubeMQConsumer.java b/inlong-sort/sort-flink/sort-flink-v1.13/sort-connectors/tubemq/src/main/java/org/apache/inlong/sort/tubemq/FlinkTubeMQConsumer.java
index abd69f8ecb..4cbd285f17 100644
--- a/inlong-sort/sort-flink/sort-flink-v1.13/sort-connectors/tubemq/src/main/java/org/apache/inlong/sort/tubemq/FlinkTubeMQConsumer.java
+++ b/inlong-sort/sort-flink/sort-flink-v1.13/sort-connectors/tubemq/src/main/java/org/apache/inlong/sort/tubemq/FlinkTubeMQConsumer.java
@@ -69,6 +69,8 @@ public class FlinkTubeMQConsumer<T> extends RichParallelSourceFunction<T>
private static final Logger LOG =
LoggerFactory.getLogger(FlinkTubeMQConsumer.class);
private static final String TUBE_OFFSET_STATE = "tube-offset-state";
+ private static final String UNDERSCORE = "_";
+
/**
* The address of TubeMQ master, format eg: 127.0.0.1:8715,127.0.0.2:8715.
*/
@@ -221,7 +223,10 @@ public class FlinkTubeMQConsumer<T> extends RichParallelSourceFunction<T>
messagePullConsumer =
messageSessionFactory.createPullConsumer(consumerConfig);
messagePullConsumer.subscribe(topic, streamIdSet);
String jobId = getRuntimeContext().getJobId().toString();
- messagePullConsumer.completeSubscribe(sessionKey.concat(jobId),
numTasks, true, currentOffsets);
+ String attemptNumber =
String.valueOf(getRuntimeContext().getAttemptNumber());
+ String startSessionKey =
sessionKey.concat(UNDERSCORE).concat(jobId).concat(UNDERSCORE).concat(attemptNumber);
+ LOG.info("start to init tube mq consumer, session key={}",
startSessionKey);
+ messagePullConsumer.completeSubscribe(startSessionKey, numTasks, true,
currentOffsets);
running = true;
}