This is an automated email from the ASF dual-hosted git repository.

dockerzhang pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/inlong.git


The following commit(s) were added to refs/heads/master by this push:
     new f1679e341e [INLONG-10522][SDK] SortSDK support assgin subscription 
(#10523)
f1679e341e is described below

commit f1679e341e0830578f06d95a981b201e644753ce
Author: vernedeng <[email protected]>
AuthorDate: Thu Jun 27 16:00:54 2024 +0800

    [INLONG-10522][SDK] SortSDK support assgin subscription (#10523)
---
 .../org/apache/inlong/sdk/sort/api/SortClientConfig.java | 16 ++++++++++++++++
 .../sdk/sort/fetcher/kafka/KafkaMultiTopicsFetcher.java  |  2 +-
 .../sdk/sort/fetcher/kafka/KafkaSingleTopicFetcher.java  |  2 +-
 .../sort/fetcher/pulsar/PulsarMultiTopicsFetcher.java    |  2 +-
 .../sort/fetcher/pulsar/PulsarSingleTopicFetcher.java    |  2 +-
 .../sdk/sort/fetcher/tube/TubeSingleTopicFetcher.java    |  2 +-
 .../sort/impl/pulsar/InLongPulsarFetcherImplTest.java    |  1 +
 7 files changed, 22 insertions(+), 5 deletions(-)

diff --git 
a/inlong-sdk/sort-sdk/src/main/java/org/apache/inlong/sdk/sort/api/SortClientConfig.java
 
b/inlong-sdk/sort-sdk/src/main/java/org/apache/inlong/sdk/sort/api/SortClientConfig.java
index b9776265b4..a354b1aced 100644
--- 
a/inlong-sdk/sort-sdk/src/main/java/org/apache/inlong/sdk/sort/api/SortClientConfig.java
+++ 
b/inlong-sdk/sort-sdk/src/main/java/org/apache/inlong/sdk/sort/api/SortClientConfig.java
@@ -35,6 +35,7 @@ public class SortClientConfig implements Serializable {
     private static final long serialVersionUID = -7531960714809683830L;
 
     private final String sortTaskId;
+    private final String subscription;
     private final String sortClusterName;
     private InLongTopicChangeListener assignmentsListener;
     private ReadCallback callback;
@@ -83,11 +84,22 @@ public class SortClientConfig implements Serializable {
             InLongTopicChangeListener assignmentsListener,
             ConsumeStrategy consumeStrategy,
             String localIp) {
+        this(sortTaskId, sortClusterName, assignmentsListener, 
consumeStrategy, localIp, sortTaskId);
+    }
+
+    public SortClientConfig(
+            String sortTaskId,
+            String sortClusterName,
+            InLongTopicChangeListener assignmentsListener,
+            ConsumeStrategy consumeStrategy,
+            String localIp,
+            String subscription) {
         this.sortTaskId = sortTaskId;
         this.sortClusterName = sortClusterName;
         this.assignmentsListener = assignmentsListener;
         this.consumeStrategy = consumeStrategy;
         this.localIp = localIp;
+        this.subscription = subscription;
     }
 
     public boolean isStopConsume() {
@@ -102,6 +114,10 @@ public class SortClientConfig implements Serializable {
         return sortTaskId;
     }
 
+    public String getSubscription() {
+        return subscription;
+    }
+
     public String getSortClusterName() {
         return sortClusterName;
     }
diff --git 
a/inlong-sdk/sort-sdk/src/main/java/org/apache/inlong/sdk/sort/fetcher/kafka/KafkaMultiTopicsFetcher.java
 
b/inlong-sdk/sort-sdk/src/main/java/org/apache/inlong/sdk/sort/fetcher/kafka/KafkaMultiTopicsFetcher.java
index b5cb6b121d..a24c270389 100644
--- 
a/inlong-sdk/sort-sdk/src/main/java/org/apache/inlong/sdk/sort/fetcher/kafka/KafkaMultiTopicsFetcher.java
+++ 
b/inlong-sdk/sort-sdk/src/main/java/org/apache/inlong/sdk/sort/fetcher/kafka/KafkaMultiTopicsFetcher.java
@@ -106,7 +106,7 @@ public class KafkaMultiTopicsFetcher extends 
MultiTopicsFetcher {
     private KafkaConsumer<byte[], byte[]> createKafkaConsumer() {
         Properties properties = new Properties();
         properties.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, 
bootstrapServers);
-        properties.put(ConsumerConfig.GROUP_ID_CONFIG, 
context.getConfig().getSortTaskId());
+        properties.put(ConsumerConfig.GROUP_ID_CONFIG, 
context.getConfig().getSubscription());
         properties.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG,
                 ByteArrayDeserializer.class.getName());
         properties.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG,
diff --git 
a/inlong-sdk/sort-sdk/src/main/java/org/apache/inlong/sdk/sort/fetcher/kafka/KafkaSingleTopicFetcher.java
 
b/inlong-sdk/sort-sdk/src/main/java/org/apache/inlong/sdk/sort/fetcher/kafka/KafkaSingleTopicFetcher.java
index e88d0a9300..1c081a64ef 100644
--- 
a/inlong-sdk/sort-sdk/src/main/java/org/apache/inlong/sdk/sort/fetcher/kafka/KafkaSingleTopicFetcher.java
+++ 
b/inlong-sdk/sort-sdk/src/main/java/org/apache/inlong/sdk/sort/fetcher/kafka/KafkaSingleTopicFetcher.java
@@ -160,7 +160,7 @@ public class KafkaSingleTopicFetcher extends 
SingleTopicFetcher {
     private void createKafkaConsumer(String bootstrapServers) {
         Properties properties = new Properties();
         properties.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, 
bootstrapServers);
-        properties.put(ConsumerConfig.GROUP_ID_CONFIG, 
context.getConfig().getSortTaskId());
+        properties.put(ConsumerConfig.GROUP_ID_CONFIG, 
context.getConfig().getSubscription());
         properties.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG,
                 ByteArrayDeserializer.class.getName());
         properties.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG,
diff --git 
a/inlong-sdk/sort-sdk/src/main/java/org/apache/inlong/sdk/sort/fetcher/pulsar/PulsarMultiTopicsFetcher.java
 
b/inlong-sdk/sort-sdk/src/main/java/org/apache/inlong/sdk/sort/fetcher/pulsar/PulsarMultiTopicsFetcher.java
index bbe53d44c0..a346591e18 100644
--- 
a/inlong-sdk/sort-sdk/src/main/java/org/apache/inlong/sdk/sort/fetcher/pulsar/PulsarMultiTopicsFetcher.java
+++ 
b/inlong-sdk/sort-sdk/src/main/java/org/apache/inlong/sdk/sort/fetcher/pulsar/PulsarMultiTopicsFetcher.java
@@ -165,7 +165,7 @@ public class PulsarMultiTopicsFetcher extends 
MultiTopicsFetcher {
                     .collect(Collectors.toList());
             Consumer<byte[]> consumer = pulsarClient.newConsumer(Schema.BYTES)
                     .topics(topicNames)
-                    .subscriptionName(context.getConfig().getSortTaskId())
+                    .subscriptionName(context.getConfig().getSubscription())
                     .subscriptionType(SubscriptionType.Shared)
                     .startMessageIdInclusive()
                     .subscriptionInitialPosition(position)
diff --git 
a/inlong-sdk/sort-sdk/src/main/java/org/apache/inlong/sdk/sort/fetcher/pulsar/PulsarSingleTopicFetcher.java
 
b/inlong-sdk/sort-sdk/src/main/java/org/apache/inlong/sdk/sort/fetcher/pulsar/PulsarSingleTopicFetcher.java
index 90d981c912..91a4dcf5f1 100644
--- 
a/inlong-sdk/sort-sdk/src/main/java/org/apache/inlong/sdk/sort/fetcher/pulsar/PulsarSingleTopicFetcher.java
+++ 
b/inlong-sdk/sort-sdk/src/main/java/org/apache/inlong/sdk/sort/fetcher/pulsar/PulsarSingleTopicFetcher.java
@@ -151,7 +151,7 @@ public class PulsarSingleTopicFetcher extends 
SingleTopicFetcher {
 
             consumer = client.newConsumer(Schema.BYTES)
                     .topic(topic.getTopic())
-                    .subscriptionName(context.getConfig().getSortTaskId())
+                    .subscriptionName(context.getConfig().getSubscription())
                     .subscriptionType(SubscriptionType.Shared)
                     .startMessageIdInclusive()
                     .subscriptionInitialPosition(position)
diff --git 
a/inlong-sdk/sort-sdk/src/main/java/org/apache/inlong/sdk/sort/fetcher/tube/TubeSingleTopicFetcher.java
 
b/inlong-sdk/sort-sdk/src/main/java/org/apache/inlong/sdk/sort/fetcher/tube/TubeSingleTopicFetcher.java
index d5792d849b..ae30d1c3d2 100644
--- 
a/inlong-sdk/sort-sdk/src/main/java/org/apache/inlong/sdk/sort/fetcher/tube/TubeSingleTopicFetcher.java
+++ 
b/inlong-sdk/sort-sdk/src/main/java/org/apache/inlong/sdk/sort/fetcher/tube/TubeSingleTopicFetcher.java
@@ -71,7 +71,7 @@ public class TubeSingleTopicFetcher extends 
SingleTopicFetcher {
         TubeClientConfig tubeClientConfig = 
tubeConsumerCreator.getTubeClientConfig();
         try {
             ConsumerConfig consumerConfig = new 
ConsumerConfig(tubeClientConfig.getMasterInfo(),
-                    context.getConfig().getSortTaskId());
+                    context.getConfig().getSubscription());
 
             messageConsumer = 
tubeConsumerCreator.getMessageSessionFactory().createPullConsumer(consumerConfig);
             if (messageConsumer != null) {
diff --git 
a/inlong-sdk/sort-sdk/src/test/java/org/apache/inlong/sdk/sort/impl/pulsar/InLongPulsarFetcherImplTest.java
 
b/inlong-sdk/sort-sdk/src/test/java/org/apache/inlong/sdk/sort/impl/pulsar/InLongPulsarFetcherImplTest.java
index 81d7a188d7..b2d212fdcd 100644
--- 
a/inlong-sdk/sort-sdk/src/test/java/org/apache/inlong/sdk/sort/impl/pulsar/InLongPulsarFetcherImplTest.java
+++ 
b/inlong-sdk/sort-sdk/src/test/java/org/apache/inlong/sdk/sort/impl/pulsar/InLongPulsarFetcherImplTest.java
@@ -81,6 +81,7 @@ public class InLongPulsarFetcherImplTest {
 
         when(clientContext.getConfig()).thenReturn(sortClientConfig);
         when(sortClientConfig.getSortTaskId()).thenReturn("sortTaskId");
+        when(sortClientConfig.getSubscription()).thenReturn("sortTaskId");
 
     }
 

Reply via email to