This is an automated email from the ASF dual-hosted git repository.
wakefu pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/inlong.git
The following commit(s) were added to refs/heads/master by this push:
new 6f02e93069 [INLONG-11794][Manager] Pulsar source supports setting scan.startup.mode to null (#11795)
6f02e93069 is described below
commit 6f02e93069b396a2669e8386b501f81d07acdd7f
Author: fuweng11 <[email protected]>
AuthorDate: Wed Mar 5 09:54:45 2025 +0800
[INLONG-11794][Manager] Pulsar source supports setting scan.startup.mode to null (#11795)
---
.../manager/pojo/sort/node/provider/PulsarProvider.java | 6 ++++--
.../sort/protocol/node/extract/PulsarExtractNode.java | 13 ++++++++-----
2 files changed, 12 insertions(+), 7 deletions(-)
diff --git a/inlong-manager/manager-pojo/src/main/java/org/apache/inlong/manager/pojo/sort/node/provider/PulsarProvider.java b/inlong-manager/manager-pojo/src/main/java/org/apache/inlong/manager/pojo/sort/node/provider/PulsarProvider.java
index 50254ddadc..b8a1e8b678 100644
--- a/inlong-manager/manager-pojo/src/main/java/org/apache/inlong/manager/pojo/sort/node/provider/PulsarProvider.java
+++ b/inlong-manager/manager-pojo/src/main/java/org/apache/inlong/manager/pojo/sort/node/provider/PulsarProvider.java
@@ -66,7 +66,9 @@ public class PulsarProvider implements ExtractNodeProvider {
pulsarSource.getDataEscapeChar(),
pulsarSource.getIgnoreParseError());
- PulsarScanStartupMode startupMode = PulsarScanStartupMode.forName(pulsarSource.getScanStartupMode());
+ String startupMode =
+ StringUtils.isNotBlank(pulsarSource.getScanStartupMode()) ? PulsarScanStartupMode.forName(
+ pulsarSource.getScanStartupMode()).getValue() : null;
final String primaryKey = pulsarSource.getPrimaryKey();
final String serviceUrl = pulsarSource.getServiceUrl();
final String adminUrl = pulsarSource.getAdminUrl();
@@ -83,7 +85,7 @@ public class PulsarProvider implements ExtractNodeProvider {
adminUrl,
serviceUrl,
format,
- startupMode.getValue(),
+ startupMode,
primaryKey,
pulsarSource.getSubscription(),
scanStartupSubStartOffset,
diff --git a/inlong-sort/sort-common/src/main/java/org/apache/inlong/sort/protocol/node/extract/PulsarExtractNode.java b/inlong-sort/sort-common/src/main/java/org/apache/inlong/sort/protocol/node/extract/PulsarExtractNode.java
index 1887aca145..8ba110d66d 100644
--- a/inlong-sort/sort-common/src/main/java/org/apache/inlong/sort/protocol/node/extract/PulsarExtractNode.java
+++ b/inlong-sort/sort-common/src/main/java/org/apache/inlong/sort/protocol/node/extract/PulsarExtractNode.java
@@ -108,7 +108,7 @@ public class PulsarExtractNode extends ExtractNode implements InlongMetric, Meta
@JsonProperty("adminUrl") String adminUrl,
@Nonnull @JsonProperty("serviceUrl") String serviceUrl,
@Nonnull @JsonProperty("format") Format format,
- @Nonnull @JsonProperty("scanStartupMode") String scanStartupMode,
+ @JsonProperty("scanStartupMode") String scanStartupMode,
@JsonProperty("primaryKey") String primaryKey,
@JsonProperty("scanStartupSubName") String scanStartupSubName,
@JsonProperty("scanStartupSubStartOffset") String scanStartupSubStartOffset,
@@ -119,8 +119,7 @@ public class PulsarExtractNode extends ExtractNode implements InlongMetric, Meta
this.topic = Preconditions.checkNotNull(topic, "pulsar topic is null.");
this.serviceUrl = Preconditions.checkNotNull(serviceUrl, "pulsar serviceUrl is null.");
this.format = Preconditions.checkNotNull(format, "pulsar format is null.");
- this.scanStartupMode = Preconditions.checkNotNull(scanStartupMode,
- "pulsar scanStartupMode is null.");
+ this.scanStartupMode = scanStartupMode;
this.adminUrl = adminUrl;
this.primaryKey = primaryKey;
this.scanStartupSubName = scanStartupSubName;
@@ -150,10 +149,14 @@ public class PulsarExtractNode extends ExtractNode implements InlongMetric, Meta
}
options.put("service-url", serviceUrl);
options.put("topic", topic);
- options.put("scan.startup.mode", scanStartupMode);
+ if (StringUtils.isNotBlank(scanStartupMode)) {
+ options.put("scan.startup.mode", scanStartupMode);
+ }
if (StringUtils.isNotBlank(scanStartupSubName)) {
options.put("scan.startup.sub-name", scanStartupSubName);
- options.put("scan.startup.sub-start-offset", scanStartupSubStartOffset);
+ if (StringUtils.isNotBlank(scanStartupSubStartOffset)) {
+ options.put("scan.startup.sub-start-offset", scanStartupSubStartOffset);
+ }
}
if (StringUtils.isNotBlank(clientAuthPluginClassName)