This is an automated email from the ASF dual-hosted git repository.
dockerzhang pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/inlong.git
The following commit(s) were added to refs/heads/master by this push:
new f1679e341e [INLONG-10522][SDK] SortSDK support assign subscription
(#10523)
f1679e341e is described below
commit f1679e341e0830578f06d95a981b201e644753ce
Author: vernedeng <[email protected]>
AuthorDate: Thu Jun 27 16:00:54 2024 +0800
[INLONG-10522][SDK] SortSDK support assign subscription (#10523)
---
.../org/apache/inlong/sdk/sort/api/SortClientConfig.java | 16 ++++++++++++++++
.../sdk/sort/fetcher/kafka/KafkaMultiTopicsFetcher.java | 2 +-
.../sdk/sort/fetcher/kafka/KafkaSingleTopicFetcher.java | 2 +-
.../sort/fetcher/pulsar/PulsarMultiTopicsFetcher.java | 2 +-
.../sort/fetcher/pulsar/PulsarSingleTopicFetcher.java | 2 +-
.../sdk/sort/fetcher/tube/TubeSingleTopicFetcher.java | 2 +-
.../sort/impl/pulsar/InLongPulsarFetcherImplTest.java | 1 +
7 files changed, 22 insertions(+), 5 deletions(-)
diff --git
a/inlong-sdk/sort-sdk/src/main/java/org/apache/inlong/sdk/sort/api/SortClientConfig.java
b/inlong-sdk/sort-sdk/src/main/java/org/apache/inlong/sdk/sort/api/SortClientConfig.java
index b9776265b4..a354b1aced 100644
---
a/inlong-sdk/sort-sdk/src/main/java/org/apache/inlong/sdk/sort/api/SortClientConfig.java
+++
b/inlong-sdk/sort-sdk/src/main/java/org/apache/inlong/sdk/sort/api/SortClientConfig.java
@@ -35,6 +35,7 @@ public class SortClientConfig implements Serializable {
private static final long serialVersionUID = -7531960714809683830L;
private final String sortTaskId;
+ private final String subscription;
private final String sortClusterName;
private InLongTopicChangeListener assignmentsListener;
private ReadCallback callback;
@@ -83,11 +84,22 @@ public class SortClientConfig implements Serializable {
InLongTopicChangeListener assignmentsListener,
ConsumeStrategy consumeStrategy,
String localIp) {
+ this(sortTaskId, sortClusterName, assignmentsListener,
consumeStrategy, localIp, sortTaskId);
+ }
+
+ public SortClientConfig(
+ String sortTaskId,
+ String sortClusterName,
+ InLongTopicChangeListener assignmentsListener,
+ ConsumeStrategy consumeStrategy,
+ String localIp,
+ String subscription) {
this.sortTaskId = sortTaskId;
this.sortClusterName = sortClusterName;
this.assignmentsListener = assignmentsListener;
this.consumeStrategy = consumeStrategy;
this.localIp = localIp;
+ this.subscription = subscription;
}
public boolean isStopConsume() {
@@ -102,6 +114,10 @@ public class SortClientConfig implements Serializable {
return sortTaskId;
}
+ public String getSubscription() {
+ return subscription;
+ }
+
public String getSortClusterName() {
return sortClusterName;
}
diff --git
a/inlong-sdk/sort-sdk/src/main/java/org/apache/inlong/sdk/sort/fetcher/kafka/KafkaMultiTopicsFetcher.java
b/inlong-sdk/sort-sdk/src/main/java/org/apache/inlong/sdk/sort/fetcher/kafka/KafkaMultiTopicsFetcher.java
index b5cb6b121d..a24c270389 100644
---
a/inlong-sdk/sort-sdk/src/main/java/org/apache/inlong/sdk/sort/fetcher/kafka/KafkaMultiTopicsFetcher.java
+++
b/inlong-sdk/sort-sdk/src/main/java/org/apache/inlong/sdk/sort/fetcher/kafka/KafkaMultiTopicsFetcher.java
@@ -106,7 +106,7 @@ public class KafkaMultiTopicsFetcher extends
MultiTopicsFetcher {
private KafkaConsumer<byte[], byte[]> createKafkaConsumer() {
Properties properties = new Properties();
properties.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG,
bootstrapServers);
- properties.put(ConsumerConfig.GROUP_ID_CONFIG,
context.getConfig().getSortTaskId());
+ properties.put(ConsumerConfig.GROUP_ID_CONFIG,
context.getConfig().getSubscription());
properties.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG,
ByteArrayDeserializer.class.getName());
properties.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG,
diff --git
a/inlong-sdk/sort-sdk/src/main/java/org/apache/inlong/sdk/sort/fetcher/kafka/KafkaSingleTopicFetcher.java
b/inlong-sdk/sort-sdk/src/main/java/org/apache/inlong/sdk/sort/fetcher/kafka/KafkaSingleTopicFetcher.java
index e88d0a9300..1c081a64ef 100644
---
a/inlong-sdk/sort-sdk/src/main/java/org/apache/inlong/sdk/sort/fetcher/kafka/KafkaSingleTopicFetcher.java
+++
b/inlong-sdk/sort-sdk/src/main/java/org/apache/inlong/sdk/sort/fetcher/kafka/KafkaSingleTopicFetcher.java
@@ -160,7 +160,7 @@ public class KafkaSingleTopicFetcher extends
SingleTopicFetcher {
private void createKafkaConsumer(String bootstrapServers) {
Properties properties = new Properties();
properties.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG,
bootstrapServers);
- properties.put(ConsumerConfig.GROUP_ID_CONFIG,
context.getConfig().getSortTaskId());
+ properties.put(ConsumerConfig.GROUP_ID_CONFIG,
context.getConfig().getSubscription());
properties.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG,
ByteArrayDeserializer.class.getName());
properties.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG,
diff --git
a/inlong-sdk/sort-sdk/src/main/java/org/apache/inlong/sdk/sort/fetcher/pulsar/PulsarMultiTopicsFetcher.java
b/inlong-sdk/sort-sdk/src/main/java/org/apache/inlong/sdk/sort/fetcher/pulsar/PulsarMultiTopicsFetcher.java
index bbe53d44c0..a346591e18 100644
---
a/inlong-sdk/sort-sdk/src/main/java/org/apache/inlong/sdk/sort/fetcher/pulsar/PulsarMultiTopicsFetcher.java
+++
b/inlong-sdk/sort-sdk/src/main/java/org/apache/inlong/sdk/sort/fetcher/pulsar/PulsarMultiTopicsFetcher.java
@@ -165,7 +165,7 @@ public class PulsarMultiTopicsFetcher extends
MultiTopicsFetcher {
.collect(Collectors.toList());
Consumer<byte[]> consumer = pulsarClient.newConsumer(Schema.BYTES)
.topics(topicNames)
- .subscriptionName(context.getConfig().getSortTaskId())
+ .subscriptionName(context.getConfig().getSubscription())
.subscriptionType(SubscriptionType.Shared)
.startMessageIdInclusive()
.subscriptionInitialPosition(position)
diff --git
a/inlong-sdk/sort-sdk/src/main/java/org/apache/inlong/sdk/sort/fetcher/pulsar/PulsarSingleTopicFetcher.java
b/inlong-sdk/sort-sdk/src/main/java/org/apache/inlong/sdk/sort/fetcher/pulsar/PulsarSingleTopicFetcher.java
index 90d981c912..91a4dcf5f1 100644
---
a/inlong-sdk/sort-sdk/src/main/java/org/apache/inlong/sdk/sort/fetcher/pulsar/PulsarSingleTopicFetcher.java
+++
b/inlong-sdk/sort-sdk/src/main/java/org/apache/inlong/sdk/sort/fetcher/pulsar/PulsarSingleTopicFetcher.java
@@ -151,7 +151,7 @@ public class PulsarSingleTopicFetcher extends
SingleTopicFetcher {
consumer = client.newConsumer(Schema.BYTES)
.topic(topic.getTopic())
- .subscriptionName(context.getConfig().getSortTaskId())
+ .subscriptionName(context.getConfig().getSubscription())
.subscriptionType(SubscriptionType.Shared)
.startMessageIdInclusive()
.subscriptionInitialPosition(position)
diff --git
a/inlong-sdk/sort-sdk/src/main/java/org/apache/inlong/sdk/sort/fetcher/tube/TubeSingleTopicFetcher.java
b/inlong-sdk/sort-sdk/src/main/java/org/apache/inlong/sdk/sort/fetcher/tube/TubeSingleTopicFetcher.java
index d5792d849b..ae30d1c3d2 100644
---
a/inlong-sdk/sort-sdk/src/main/java/org/apache/inlong/sdk/sort/fetcher/tube/TubeSingleTopicFetcher.java
+++
b/inlong-sdk/sort-sdk/src/main/java/org/apache/inlong/sdk/sort/fetcher/tube/TubeSingleTopicFetcher.java
@@ -71,7 +71,7 @@ public class TubeSingleTopicFetcher extends
SingleTopicFetcher {
TubeClientConfig tubeClientConfig =
tubeConsumerCreator.getTubeClientConfig();
try {
ConsumerConfig consumerConfig = new
ConsumerConfig(tubeClientConfig.getMasterInfo(),
- context.getConfig().getSortTaskId());
+ context.getConfig().getSubscription());
messageConsumer =
tubeConsumerCreator.getMessageSessionFactory().createPullConsumer(consumerConfig);
if (messageConsumer != null) {
diff --git
a/inlong-sdk/sort-sdk/src/test/java/org/apache/inlong/sdk/sort/impl/pulsar/InLongPulsarFetcherImplTest.java
b/inlong-sdk/sort-sdk/src/test/java/org/apache/inlong/sdk/sort/impl/pulsar/InLongPulsarFetcherImplTest.java
index 81d7a188d7..b2d212fdcd 100644
---
a/inlong-sdk/sort-sdk/src/test/java/org/apache/inlong/sdk/sort/impl/pulsar/InLongPulsarFetcherImplTest.java
+++
b/inlong-sdk/sort-sdk/src/test/java/org/apache/inlong/sdk/sort/impl/pulsar/InLongPulsarFetcherImplTest.java
@@ -81,6 +81,7 @@ public class InLongPulsarFetcherImplTest {
when(clientContext.getConfig()).thenReturn(sortClientConfig);
when(sortClientConfig.getSortTaskId()).thenReturn("sortTaskId");
+ when(sortClientConfig.getSubscription()).thenReturn("sortTaskId");
}