This is an automated email from the ASF dual-hosted git repository.

zirui pushed a commit to branch branch-1.7
in repository https://gitbox.apache.org/repos/asf/inlong.git


The following commit(s) were added to refs/heads/branch-1.7 by this push:
     new 396398770 [INLONG-8029][Sort] Use fixed subscription name when start a 
pulsar reader (#8033)
396398770 is described below

commit 396398770cbbdfea441cc4b87f083601237e729c
Author: Schnapps <[email protected]>
AuthorDate: Tue May 16 15:36:13 2023 +0800

    [INLONG-8029][Sort] Use fixed subscription name when start a pulsar reader 
(#8033)
---
 CHANGES.md                                                  |  2 +-
 .../inlong/sort/pulsar/internal/FlinkPulsarSource.java      |  7 +++++--
 .../sort/pulsar/internal/FlinkPulsarSourceWithoutAdmin.java |  1 +
 .../apache/inlong/sort/pulsar/internal/PulsarFetcher.java   | 10 ++++++++--
 .../apache/inlong/sort/pulsar/internal/ReaderThread.java    | 13 +++++++++----
 5 files changed, 24 insertions(+), 9 deletions(-)

diff --git a/CHANGES.md b/CHANGES.md
index 101f7f406..496c5c568 100644
--- a/CHANGES.md
+++ b/CHANGES.md
@@ -39,7 +39,6 @@
 | [INLONG-7769](https://github.com/apache/inlong/issues/7769) | 
[Bug][DataProxy] NPE when request Inlong Manager failed                         
                         |
 | [INLONG-7512](https://github.com/apache/inlong/issues/7512) | 
[Improve][DataProxy] Update the metrics log level to avoid the log file 
increasing quickly               |
 | [INLONG-7766](https://github.com/apache/inlong/issues/7766) | 
[Bug][DataProxySDK] Adjusted frame length exceeds occurred when reporting data 
through the HTTP protocol |
-| [INLONG-7194](https://github.com/apache/inlong/issues/7194) | 
[Improve][DataProxy] Migrate index log statistics for the new mq layer          
                         |
 
 ### TubeMQ
 |                            ISSUE                            | Summary        
                                                      |
@@ -155,6 +154,7 @@
 | [INLONG-7970](https://github.com/apache/inlong/issues/7970) | [Bug][Sort] 
java.lang.ClassCastException: java.lang.String cannot be cast to 
org.apache.flink.table.data.StringData                                |
 | [INLONG-7747](https://github.com/apache/inlong/issues/7747) | 
[Umbrella][Sort] Improve memory stability of data ingesting into iceberg        
                                                                   |
 | [INLONG-6545](https://github.com/apache/inlong/issues/6545) | 
[Improve][Sort] Accurately parse the schema type and completely match the 
missing precision information                                            |
+| [INLONG-8029](https://github.com/apache/inlong/issues/8029) | 
[Improve][Sort] Use fixed subscription name when start a pulsar reader          
                                                                   |
 
 ### Audit
 |                            ISSUE                            | Summary        
                                                                           |
diff --git 
a/inlong-sort/sort-connectors/pulsar/src/main/java/org/apache/inlong/sort/pulsar/internal/FlinkPulsarSource.java
 
b/inlong-sort/sort-connectors/pulsar/src/main/java/org/apache/inlong/sort/pulsar/internal/FlinkPulsarSource.java
index 9225abd43..12f5c17b5 100644
--- 
a/inlong-sort/sort-connectors/pulsar/src/main/java/org/apache/inlong/sort/pulsar/internal/FlinkPulsarSource.java
+++ 
b/inlong-sort/sort-connectors/pulsar/src/main/java/org/apache/inlong/sort/pulsar/internal/FlinkPulsarSource.java
@@ -620,7 +620,8 @@ public class FlinkPulsarSource<T>
                 getRuntimeContext().getUserCodeClassLoader(),
                 streamingRuntime,
                 useMetrics,
-                excludeStartMessageIds);
+                excludeStartMessageIds,
+                getSubscriptionName());
 
         if (!running) {
             return;
@@ -642,7 +643,8 @@ public class FlinkPulsarSource<T>
             ClassLoader userCodeClassLoader,
             StreamingRuntimeContext streamingRuntime,
             boolean useMetrics,
-            Set<TopicRange> excludeStartMessageIds) throws Exception {
+            Set<TopicRange> excludeStartMessageIds,
+            String subscriptionName) throws Exception {
 
         // readerConf.putIfAbsent(PulsarOptions.SUBSCRIPTION_ROLE_OPTION_KEY, 
getSubscriptionName());
 
@@ -657,6 +659,7 @@ public class FlinkPulsarSource<T>
                 streamingRuntime,
                 clientConfigurationData,
                 readerConf,
+                subscriptionName,
                 pollTimeoutMs,
                 commitMaxRetries,
                 deserializer,
diff --git 
a/inlong-sort/sort-connectors/pulsar/src/main/java/org/apache/inlong/sort/pulsar/internal/FlinkPulsarSourceWithoutAdmin.java
 
b/inlong-sort/sort-connectors/pulsar/src/main/java/org/apache/inlong/sort/pulsar/internal/FlinkPulsarSourceWithoutAdmin.java
index 62d1c7f10..df713b7c6 100644
--- 
a/inlong-sort/sort-connectors/pulsar/src/main/java/org/apache/inlong/sort/pulsar/internal/FlinkPulsarSourceWithoutAdmin.java
+++ 
b/inlong-sort/sort-connectors/pulsar/src/main/java/org/apache/inlong/sort/pulsar/internal/FlinkPulsarSourceWithoutAdmin.java
@@ -608,6 +608,7 @@ public class FlinkPulsarSourceWithoutAdmin<T>
                 streamingRuntime,
                 clientConfigurationData,
                 readerConf,
+                getSubscriptionName(),
                 pollTimeoutMs,
                 commitMaxRetries,
                 deserializer,
diff --git 
a/inlong-sort/sort-connectors/pulsar/src/main/java/org/apache/inlong/sort/pulsar/internal/PulsarFetcher.java
 
b/inlong-sort/sort-connectors/pulsar/src/main/java/org/apache/inlong/sort/pulsar/internal/PulsarFetcher.java
index e5f7632e6..492fc3bf6 100644
--- 
a/inlong-sort/sort-connectors/pulsar/src/main/java/org/apache/inlong/sort/pulsar/internal/PulsarFetcher.java
+++ 
b/inlong-sort/sort-connectors/pulsar/src/main/java/org/apache/inlong/sort/pulsar/internal/PulsarFetcher.java
@@ -117,6 +117,8 @@ public class PulsarFetcher<T> {
 
     protected final Map<String, Object> readerConf;
 
+    protected final String subscriptionName;
+
     protected final PulsarDeserializationSchema<T> deserializer;
 
     protected final int pollTimeoutMs;
@@ -181,7 +183,8 @@ public class PulsarFetcher<T> {
             PulsarDeserializationSchema<T> deserializer,
             PulsarMetadataReader metadataReader,
             MetricGroup consumerMetricGroup,
-            boolean useMetrics) throws Exception {
+            boolean useMetrics,
+            String subscriptionName) throws Exception {
         this(
                 sourceContext,
                 seedTopicsWithInitialOffsets,
@@ -193,6 +196,7 @@ public class PulsarFetcher<T> {
                 runtimeContext,
                 clientConf,
                 readerConf,
+                subscriptionName,
                 pollTimeoutMs,
                 3, // commit retries before fail
                 deserializer,
@@ -212,6 +216,7 @@ public class PulsarFetcher<T> {
             StreamingRuntimeContext runtimeContext,
             ClientConfigurationData clientConf,
             Map<String, Object> readerConf,
+            String subscriptionName,
             int pollTimeoutMs,
             int commitMaxRetries,
             PulsarDeserializationSchema<T> deserializer,
@@ -221,6 +226,7 @@ public class PulsarFetcher<T> {
 
         this.sourceContext = sourceContext;
         this.watermarkOutput = new 
SourceContextWatermarkOutputAdapter<>(sourceContext);
+        this.subscriptionName = subscriptionName;
         this.watermarkOutputMultiplexer = new 
WatermarkOutputMultiplexer(watermarkOutput);
         this.useMetrics = useMetrics;
         this.consumerMetricGroup = checkNotNull(consumerMetricGroup);
@@ -525,7 +531,7 @@ public class PulsarFetcher<T> {
                 exceptionProxy,
                 failOnDataLoss,
                 useEarliestWhenDataLoss,
-                excludeStartMessageIds.contains(state.getTopicRange()));
+                excludeStartMessageIds.contains(state.getTopicRange()), 
subscriptionName);
     }
 
     /**
diff --git 
a/inlong-sort/sort-connectors/pulsar/src/main/java/org/apache/inlong/sort/pulsar/internal/ReaderThread.java
 
b/inlong-sort/sort-connectors/pulsar/src/main/java/org/apache/inlong/sort/pulsar/internal/ReaderThread.java
index 295f4df10..ac4d0714f 100644
--- 
a/inlong-sort/sort-connectors/pulsar/src/main/java/org/apache/inlong/sort/pulsar/internal/ReaderThread.java
+++ 
b/inlong-sort/sort-connectors/pulsar/src/main/java/org/apache/inlong/sort/pulsar/internal/ReaderThread.java
@@ -63,6 +63,7 @@ public class ReaderThread<T> extends Thread {
     private boolean failOnDataLoss = true;
     private boolean useEarliestWhenDataLoss = false;
     private final PulsarCollector pulsarCollector;
+    private final String subscriptionName;
 
     protected volatile boolean running = true;
     protected volatile boolean closed = false;
@@ -78,7 +79,8 @@ public class ReaderThread<T> extends Thread {
             Map<String, Object> readerConf,
             PulsarDeserializationSchema<T> deserializer,
             int pollTimeoutMs,
-            ExceptionProxy exceptionProxy) {
+            ExceptionProxy exceptionProxy,
+            String subscriptionName) {
         this.owner = owner;
         this.state = state;
         this.clientConf = clientConf;
@@ -86,9 +88,9 @@ public class ReaderThread<T> extends Thread {
         this.deserializer = deserializer;
         this.pollTimeoutMs = pollTimeoutMs;
         this.exceptionProxy = exceptionProxy;
-
         this.topicRange = state.getTopicRange();
         this.startMessageId = state.getOffset();
+        this.subscriptionName = subscriptionName;
         this.pulsarCollector = new PulsarCollector();
     }
 
@@ -102,8 +104,10 @@ public class ReaderThread<T> extends Thread {
             ExceptionProxy exceptionProxy,
             boolean failOnDataLoss,
             boolean useEarliestWhenDataLoss,
-            boolean excludeMessageId) {
-        this(owner, state, clientConf, readerConf, deserializer, 
pollTimeoutMs, exceptionProxy);
+            boolean excludeMessageId,
+            String subscriptionName) {
+        this(owner, state, clientConf, readerConf, deserializer, 
pollTimeoutMs, exceptionProxy,
+                subscriptionName);
         this.failOnDataLoss = failOnDataLoss;
         this.useEarliestWhenDataLoss = useEarliestWhenDataLoss;
         this.excludeMessageId = excludeMessageId;
@@ -141,6 +145,7 @@ public class ReaderThread<T> extends Thread {
                 .newReader(deserializer.getSchema())
                 .topic(topicRange.getTopic())
                 .startMessageId(startMessageId)
+                .subscriptionName(subscriptionName)
                 .loadConf(readerConf);
         log.info("Create a reader at topic {} starting from message {} 
(inclusive) : config = {}",
                 topicRange, startMessageId, readerConf);

Reply via email to