This is an automated email from the ASF dual-hosted git repository.
vernedeng pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/inlong.git
The following commit(s) were added to refs/heads/master by this push:
new 8eec67bc72 [INLONG-10529][Sort] PulsarSink support switch metadata
acquire mode (#10554)
8eec67bc72 is described below
commit 8eec67bc7218bd1a5303de9c6eee967363a4927d
Author: vernedeng <[email protected]>
AuthorDate: Tue Jul 2 19:02:09 2024 +0800
[INLONG-10529][Sort] PulsarSink support switch metadata acquire mode
(#10554)
---
.../sink/pulsar/PulsarFederationSinkContext.java | 54 ++++++++++++++++------
.../standalone/sink/pulsar/PulsarIdConfig.java | 13 ++++++
2 files changed, 53 insertions(+), 14 deletions(-)
diff --git
a/inlong-sort-standalone/sort-standalone-source/src/main/java/org/apache/inlong/sort/standalone/sink/pulsar/PulsarFederationSinkContext.java
b/inlong-sort-standalone/sort-standalone-source/src/main/java/org/apache/inlong/sort/standalone/sink/pulsar/PulsarFederationSinkContext.java
index 770f028da0..a31d9f90ba 100644
---
a/inlong-sort-standalone/sort-standalone-source/src/main/java/org/apache/inlong/sort/standalone/sink/pulsar/PulsarFederationSinkContext.java
+++
b/inlong-sort-standalone/sort-standalone-source/src/main/java/org/apache/inlong/sort/standalone/sink/pulsar/PulsarFederationSinkContext.java
@@ -20,8 +20,10 @@ package org.apache.inlong.sort.standalone.sink.pulsar;
import org.apache.inlong.common.pojo.sort.ClusterTagConfig;
import org.apache.inlong.common.pojo.sort.TaskConfig;
import org.apache.inlong.common.pojo.sort.node.PulsarNodeConfig;
+import org.apache.inlong.common.pojo.sortstandalone.SortTaskConfig;
import org.apache.inlong.sort.standalone.channel.ProfileEvent;
import org.apache.inlong.sort.standalone.config.holder.CommonPropertiesHolder;
+import org.apache.inlong.sort.standalone.config.holder.SortClusterConfigHolder;
import org.apache.inlong.sort.standalone.config.holder.v2.SortConfigHolder;
import org.apache.inlong.sort.standalone.config.pojo.InlongId;
import org.apache.inlong.sort.standalone.metrics.SortMetricItem;
@@ -36,6 +38,7 @@ import org.slf4j.Logger;
import java.util.Collection;
import java.util.HashMap;
+import java.util.List;
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
import java.util.stream.Collectors;
@@ -53,36 +56,59 @@ public class PulsarFederationSinkContext extends
SinkContext {
public void reload() {
try {
- TaskConfig newSortTaskConfig =
SortConfigHolder.getTaskConfig(taskName);
- if (newSortTaskConfig == null) {
+ TaskConfig newTaskConfig =
SortConfigHolder.getTaskConfig(taskName);
+ SortTaskConfig newSortTaskConfig =
SortClusterConfigHolder.getTaskConfig(taskName);
+ if (newTaskConfig == null && newSortTaskConfig == null) {
LOG.error("newSortTaskConfig is null.");
return;
}
- if (this.taskConfig != null &&
this.taskConfig.equals(newSortTaskConfig)) {
+ if ((this.taskConfig != null &&
this.taskConfig.equals(newTaskConfig))
+ && (this.sortTaskConfig != null &&
this.sortTaskConfig.equals(newSortTaskConfig))) {
LOG.info("Same sortTaskConfig, do nothing.");
return;
}
- this.taskConfig = newSortTaskConfig;
- PulsarNodeConfig requestNodeConfig = (PulsarNodeConfig)
newSortTaskConfig.getNodeConfig();
+ PulsarNodeConfig requestNodeConfig = (PulsarNodeConfig)
newTaskConfig.getNodeConfig();
if (pulsarNodeConfig == null || requestNodeConfig.getVersion() >
pulsarNodeConfig.getVersion()) {
this.pulsarNodeConfig = requestNodeConfig;
}
+ this.taskConfig = newTaskConfig;
+ this.sortTaskConfig = newSortTaskConfig;
- this.idConfigMap = this.taskConfig.getClusterTagConfigs()
- .stream()
- .map(ClusterTagConfig::getDataFlowConfigs)
- .flatMap(Collection::stream)
- .map(PulsarIdConfig::create)
- .collect(Collectors.toMap(
- config ->
InlongId.generateUid(config.getInlongGroupId(), config.getInlongStreamId()),
- v -> v,
- (v1, v2) -> v1));
+ Map<String, PulsarIdConfig> fromTaskConfig =
fromTaskConfig(taskConfig);
+ Map<String, PulsarIdConfig> fromSortTaskConfig =
fromSortTaskConfig(sortTaskConfig);
+ idConfigMap = unifiedConfiguration ? fromTaskConfig :
fromSortTaskConfig;
} catch (Throwable e) {
LOG.error(e.getMessage(), e);
}
}
+ public Map<String, PulsarIdConfig> fromTaskConfig(TaskConfig taskConfig) {
+ return taskConfig.getClusterTagConfigs()
+ .stream()
+ .map(ClusterTagConfig::getDataFlowConfigs)
+ .flatMap(Collection::stream)
+ .map(PulsarIdConfig::create)
+ .collect(Collectors.toMap(
+ config ->
InlongId.generateUid(config.getInlongGroupId(), config.getInlongStreamId()),
+ v -> v,
+ (v1, v2) -> v1));
+ }
+
+ public Map<String, PulsarIdConfig> fromSortTaskConfig(SortTaskConfig
sortTaskConfig) {
+ Map<String, PulsarIdConfig> newIdConfigMap = new ConcurrentHashMap<>();
+ List<Map<String, String>> idList = sortTaskConfig.getIdParams();
+ for (Map<String, String> idParam : idList) {
+ try {
+ PulsarIdConfig idConfig = new PulsarIdConfig(idParam);
+ newIdConfigMap.put(idConfig.getUid(), idConfig);
+ } catch (Exception e) {
+ LOG.error("fail to parse pulsar id config", e);
+ }
+ }
+ return newIdConfigMap;
+ }
+
public String getTopic(String uid) {
PulsarIdConfig idConfig = this.idConfigMap.get(uid);
if (idConfig == null) {
diff --git
a/inlong-sort-standalone/sort-standalone-source/src/main/java/org/apache/inlong/sort/standalone/sink/pulsar/PulsarIdConfig.java
b/inlong-sort-standalone/sort-standalone-source/src/main/java/org/apache/inlong/sort/standalone/sink/pulsar/PulsarIdConfig.java
index cb6c651982..69b042506d 100644
---
a/inlong-sort-standalone/sort-standalone-source/src/main/java/org/apache/inlong/sort/standalone/sink/pulsar/PulsarIdConfig.java
+++
b/inlong-sort-standalone/sort-standalone-source/src/main/java/org/apache/inlong/sort/standalone/sink/pulsar/PulsarIdConfig.java
@@ -21,12 +21,15 @@ import org.apache.inlong.common.enums.DataTypeEnum;
import org.apache.inlong.common.pojo.sort.dataflow.DataFlowConfig;
import org.apache.inlong.common.pojo.sort.dataflow.sink.PulsarSinkConfig;
import org.apache.inlong.sort.standalone.config.pojo.InlongId;
+import org.apache.inlong.sort.standalone.utils.Constants;
import lombok.AllArgsConstructor;
import lombok.Builder;
import lombok.Data;
import lombok.NoArgsConstructor;
+import java.util.Map;
+
@Data
@Builder
@NoArgsConstructor
@@ -46,6 +49,16 @@ public class PulsarIdConfig {
private String topic;
private DataTypeEnum dataType = DataTypeEnum.TEXT;
+ public PulsarIdConfig(Map<String, String> idParam) {
+ this.inlongGroupId = idParam.get(Constants.INLONG_GROUP_ID);
+ this.inlongStreamId = idParam.getOrDefault(Constants.INLONG_STREAM_ID,
DEFAULT_INLONG_STREAM);
+ this.uid = InlongId.generateUid(inlongGroupId, inlongStreamId);
+ this.separator = idParam.getOrDefault(PulsarIdConfig.KEY_SEPARATOR,
PulsarIdConfig.DEFAULT_SEPARATOR);
+ this.topic = idParam.getOrDefault(Constants.TOPIC, uid);
+ this.dataType = DataTypeEnum
+ .convert(idParam.getOrDefault(PulsarIdConfig.KEY_DATA_TYPE,
DataTypeEnum.TEXT.getType()));
+ }
+
public static PulsarIdConfig create(DataFlowConfig dataFlowConfig) {
PulsarSinkConfig sinkConfig = (PulsarSinkConfig)
dataFlowConfig.getSinkConfig();