This is an automated email from the ASF dual-hosted git repository.
zirui pushed a commit to branch branch-1.7
in repository https://gitbox.apache.org/repos/asf/inlong.git
The following commit(s) were added to refs/heads/branch-1.7 by this push:
new 396398770 [INLONG-8029][Sort] Use fixed subscription name when start a
pulsar reader (#8033)
396398770 is described below
commit 396398770cbbdfea441cc4b87f083601237e729c
Author: Schnapps <[email protected]>
AuthorDate: Tue May 16 15:36:13 2023 +0800
[INLONG-8029][Sort] Use fixed subscription name when start a pulsar reader
(#8033)
---
CHANGES.md | 2 +-
.../inlong/sort/pulsar/internal/FlinkPulsarSource.java | 7 +++++--
.../sort/pulsar/internal/FlinkPulsarSourceWithoutAdmin.java | 1 +
.../apache/inlong/sort/pulsar/internal/PulsarFetcher.java | 10 ++++++++--
.../apache/inlong/sort/pulsar/internal/ReaderThread.java | 13 +++++++++----
5 files changed, 24 insertions(+), 9 deletions(-)
diff --git a/CHANGES.md b/CHANGES.md
index 101f7f406..496c5c568 100644
--- a/CHANGES.md
+++ b/CHANGES.md
@@ -39,7 +39,6 @@
| [INLONG-7769](https://github.com/apache/inlong/issues/7769) |
[Bug][DataProxy] NPE when request Inlong Manager failed
|
| [INLONG-7512](https://github.com/apache/inlong/issues/7512) |
[Improve][DataProxy] Update the metrics log level to avoid the log file
increasing quickly |
| [INLONG-7766](https://github.com/apache/inlong/issues/7766) |
[Bug][DataProxySDK] Adjusted frame length exceeds occurred when reporting data
through the HTTP protocol |
-| [INLONG-7194](https://github.com/apache/inlong/issues/7194) |
[Improve][DataProxy] Migrate index log statistics for the new mq layer
|
### TubeMQ
| ISSUE | Summary
|
@@ -155,6 +154,7 @@
| [INLONG-7970](https://github.com/apache/inlong/issues/7970) | [Bug][Sort]
java.lang.ClassCastException: java.lang.String cannot be cast to
org.apache.flink.table.data.StringData |
| [INLONG-7747](https://github.com/apache/inlong/issues/7747) |
[Umbrella][Sort] Improve memory stability of data ingesting into iceberg
|
| [INLONG-6545](https://github.com/apache/inlong/issues/6545) |
[Improve][Sort] Accurately parse the schema type and completely match the
missing precision information |
+| [INLONG-8029](https://github.com/apache/inlong/issues/8029) |
[Improve][Sort] Use fixed subscription name when start a pulsar reader
|
### Audit
| ISSUE | Summary
|
diff --git
a/inlong-sort/sort-connectors/pulsar/src/main/java/org/apache/inlong/sort/pulsar/internal/FlinkPulsarSource.java
b/inlong-sort/sort-connectors/pulsar/src/main/java/org/apache/inlong/sort/pulsar/internal/FlinkPulsarSource.java
index 9225abd43..12f5c17b5 100644
---
a/inlong-sort/sort-connectors/pulsar/src/main/java/org/apache/inlong/sort/pulsar/internal/FlinkPulsarSource.java
+++
b/inlong-sort/sort-connectors/pulsar/src/main/java/org/apache/inlong/sort/pulsar/internal/FlinkPulsarSource.java
@@ -620,7 +620,8 @@ public class FlinkPulsarSource<T>
getRuntimeContext().getUserCodeClassLoader(),
streamingRuntime,
useMetrics,
- excludeStartMessageIds);
+ excludeStartMessageIds,
+ getSubscriptionName());
if (!running) {
return;
@@ -642,7 +643,8 @@ public class FlinkPulsarSource<T>
ClassLoader userCodeClassLoader,
StreamingRuntimeContext streamingRuntime,
boolean useMetrics,
- Set<TopicRange> excludeStartMessageIds) throws Exception {
+ Set<TopicRange> excludeStartMessageIds,
+ String subscriptionName) throws Exception {
// readerConf.putIfAbsent(PulsarOptions.SUBSCRIPTION_ROLE_OPTION_KEY,
getSubscriptionName());
@@ -657,6 +659,7 @@ public class FlinkPulsarSource<T>
streamingRuntime,
clientConfigurationData,
readerConf,
+ subscriptionName,
pollTimeoutMs,
commitMaxRetries,
deserializer,
diff --git
a/inlong-sort/sort-connectors/pulsar/src/main/java/org/apache/inlong/sort/pulsar/internal/FlinkPulsarSourceWithoutAdmin.java
b/inlong-sort/sort-connectors/pulsar/src/main/java/org/apache/inlong/sort/pulsar/internal/FlinkPulsarSourceWithoutAdmin.java
index 62d1c7f10..df713b7c6 100644
---
a/inlong-sort/sort-connectors/pulsar/src/main/java/org/apache/inlong/sort/pulsar/internal/FlinkPulsarSourceWithoutAdmin.java
+++
b/inlong-sort/sort-connectors/pulsar/src/main/java/org/apache/inlong/sort/pulsar/internal/FlinkPulsarSourceWithoutAdmin.java
@@ -608,6 +608,7 @@ public class FlinkPulsarSourceWithoutAdmin<T>
streamingRuntime,
clientConfigurationData,
readerConf,
+ getSubscriptionName(),
pollTimeoutMs,
commitMaxRetries,
deserializer,
diff --git
a/inlong-sort/sort-connectors/pulsar/src/main/java/org/apache/inlong/sort/pulsar/internal/PulsarFetcher.java
b/inlong-sort/sort-connectors/pulsar/src/main/java/org/apache/inlong/sort/pulsar/internal/PulsarFetcher.java
index e5f7632e6..492fc3bf6 100644
---
a/inlong-sort/sort-connectors/pulsar/src/main/java/org/apache/inlong/sort/pulsar/internal/PulsarFetcher.java
+++
b/inlong-sort/sort-connectors/pulsar/src/main/java/org/apache/inlong/sort/pulsar/internal/PulsarFetcher.java
@@ -117,6 +117,8 @@ public class PulsarFetcher<T> {
protected final Map<String, Object> readerConf;
+ protected final String subscriptionName;
+
protected final PulsarDeserializationSchema<T> deserializer;
protected final int pollTimeoutMs;
@@ -181,7 +183,8 @@ public class PulsarFetcher<T> {
PulsarDeserializationSchema<T> deserializer,
PulsarMetadataReader metadataReader,
MetricGroup consumerMetricGroup,
- boolean useMetrics) throws Exception {
+ boolean useMetrics,
+ String subscriptionName) throws Exception {
this(
sourceContext,
seedTopicsWithInitialOffsets,
@@ -193,6 +196,7 @@ public class PulsarFetcher<T> {
runtimeContext,
clientConf,
readerConf,
+ subscriptionName,
pollTimeoutMs,
3, // commit retries before fail
deserializer,
@@ -212,6 +216,7 @@ public class PulsarFetcher<T> {
StreamingRuntimeContext runtimeContext,
ClientConfigurationData clientConf,
Map<String, Object> readerConf,
+ String subscriptionName,
int pollTimeoutMs,
int commitMaxRetries,
PulsarDeserializationSchema<T> deserializer,
@@ -221,6 +226,7 @@ public class PulsarFetcher<T> {
this.sourceContext = sourceContext;
this.watermarkOutput = new
SourceContextWatermarkOutputAdapter<>(sourceContext);
+ this.subscriptionName = subscriptionName;
this.watermarkOutputMultiplexer = new
WatermarkOutputMultiplexer(watermarkOutput);
this.useMetrics = useMetrics;
this.consumerMetricGroup = checkNotNull(consumerMetricGroup);
@@ -525,7 +531,7 @@ public class PulsarFetcher<T> {
exceptionProxy,
failOnDataLoss,
useEarliestWhenDataLoss,
- excludeStartMessageIds.contains(state.getTopicRange()));
+ excludeStartMessageIds.contains(state.getTopicRange()),
subscriptionName);
}
/**
diff --git
a/inlong-sort/sort-connectors/pulsar/src/main/java/org/apache/inlong/sort/pulsar/internal/ReaderThread.java
b/inlong-sort/sort-connectors/pulsar/src/main/java/org/apache/inlong/sort/pulsar/internal/ReaderThread.java
index 295f4df10..ac4d0714f 100644
---
a/inlong-sort/sort-connectors/pulsar/src/main/java/org/apache/inlong/sort/pulsar/internal/ReaderThread.java
+++
b/inlong-sort/sort-connectors/pulsar/src/main/java/org/apache/inlong/sort/pulsar/internal/ReaderThread.java
@@ -63,6 +63,7 @@ public class ReaderThread<T> extends Thread {
private boolean failOnDataLoss = true;
private boolean useEarliestWhenDataLoss = false;
private final PulsarCollector pulsarCollector;
+ private final String subscriptionName;
protected volatile boolean running = true;
protected volatile boolean closed = false;
@@ -78,7 +79,8 @@ public class ReaderThread<T> extends Thread {
Map<String, Object> readerConf,
PulsarDeserializationSchema<T> deserializer,
int pollTimeoutMs,
- ExceptionProxy exceptionProxy) {
+ ExceptionProxy exceptionProxy,
+ String subscriptionName) {
this.owner = owner;
this.state = state;
this.clientConf = clientConf;
@@ -86,9 +88,9 @@ public class ReaderThread<T> extends Thread {
this.deserializer = deserializer;
this.pollTimeoutMs = pollTimeoutMs;
this.exceptionProxy = exceptionProxy;
-
this.topicRange = state.getTopicRange();
this.startMessageId = state.getOffset();
+ this.subscriptionName = subscriptionName;
this.pulsarCollector = new PulsarCollector();
}
@@ -102,8 +104,10 @@ public class ReaderThread<T> extends Thread {
ExceptionProxy exceptionProxy,
boolean failOnDataLoss,
boolean useEarliestWhenDataLoss,
- boolean excludeMessageId) {
- this(owner, state, clientConf, readerConf, deserializer,
pollTimeoutMs, exceptionProxy);
+ boolean excludeMessageId,
+ String subscriptionName) {
+ this(owner, state, clientConf, readerConf, deserializer,
pollTimeoutMs, exceptionProxy,
+ subscriptionName);
this.failOnDataLoss = failOnDataLoss;
this.useEarliestWhenDataLoss = useEarliestWhenDataLoss;
this.excludeMessageId = excludeMessageId;
@@ -141,6 +145,7 @@ public class ReaderThread<T> extends Thread {
.newReader(deserializer.getSchema())
.topic(topicRange.getTopic())
.startMessageId(startMessageId)
+ .subscriptionName(subscriptionName)
.loadConf(readerConf);
log.info("Create a reader at topic {} starting from message {}
(inclusive) : config = {}",
topicRange, startMessageId, readerConf);