This is an automated email from the ASF dual-hosted git repository.
vernedeng pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/inlong.git
The following commit(s) were added to refs/heads/master by this push:
new d77048497a [INLONG-10213][SDK] SortSDK support unified sort
configuration (#10219)
d77048497a is described below
commit d77048497a66a07bba7c7dd7d2b56450973bd841
Author: vernedeng <[email protected]>
AuthorDate: Wed May 15 16:40:17 2024 +0800
[INLONG-10213][SDK] SortSDK support unified sort configuration (#10219)
* [INLONG-10213][SDK] SortSDK support unified sort configuration
---
.../common/pojo/sort/dataflow/DataFlowConfig.java | 1 +
.../common/pojo/sort/mq/PulsarClusterConfig.java | 3 +
.../listener/queue/ClusterConfigListener.java | 1 +
.../resource/sort/DefaultSortConfigOperator.java | 1 +
.../config/holder/v2/SortConfigHolder.java | 37 ++++++++-
.../loader/SortConfigQueryConsumeConfig.java | 90 ++++++++++++++++++++++
6 files changed, 130 insertions(+), 3 deletions(-)
diff --git
a/inlong-common/src/main/java/org/apache/inlong/common/pojo/sort/dataflow/DataFlowConfig.java
b/inlong-common/src/main/java/org/apache/inlong/common/pojo/sort/dataflow/DataFlowConfig.java
index a7bb1c36a3..a089bdc3f5 100644
---
a/inlong-common/src/main/java/org/apache/inlong/common/pojo/sort/dataflow/DataFlowConfig.java
+++
b/inlong-common/src/main/java/org/apache/inlong/common/pojo/sort/dataflow/DataFlowConfig.java
@@ -35,6 +35,7 @@ public class DataFlowConfig implements Serializable {
private String dataflowId;
private Integer version;
+ private String auditTag;
private String inlongGroupId;
private String inlongStreamId;
private SourceConfig sourceConfig;
diff --git
a/inlong-common/src/main/java/org/apache/inlong/common/pojo/sort/mq/PulsarClusterConfig.java
b/inlong-common/src/main/java/org/apache/inlong/common/pojo/sort/mq/PulsarClusterConfig.java
index 1bb503b7a4..071c93f9c1 100644
---
a/inlong-common/src/main/java/org/apache/inlong/common/pojo/sort/mq/PulsarClusterConfig.java
+++
b/inlong-common/src/main/java/org/apache/inlong/common/pojo/sort/mq/PulsarClusterConfig.java
@@ -30,4 +30,7 @@ public class PulsarClusterConfig extends MqClusterConfig {
@JsonInclude(JsonInclude.Include.NON_NULL)
private String serviceUrl;
+
+ @JsonInclude(JsonInclude.Include.NON_NULL)
+ private String token;
}
diff --git
a/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/listener/queue/ClusterConfigListener.java
b/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/listener/queue/ClusterConfigListener.java
index 3a09c4ecea..8c0aa037ae 100644
---
a/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/listener/queue/ClusterConfigListener.java
+++
b/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/listener/queue/ClusterConfigListener.java
@@ -84,6 +84,7 @@ public class ClusterConfigListener implements
ClusterOperateListener {
CommonBeanUtils.copyProperties(pulsarCluster,
PulsarClusterConfig::new);
pulsarClusterConfig.setVersion(String.valueOf(pulsarCluster.getVersion()));
pulsarClusterConfig.setClusterName(pulsarCluster.getName());
+ pulsarClusterConfig.setServiceUrl(pulsarCluster.getUrl());
list.add(pulsarClusterConfig);
}
clusterConfigEntity.setConfigParams(objectMapper.writeValueAsString(list));
diff --git
a/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/resource/sort/DefaultSortConfigOperator.java
b/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/resource/sort/DefaultSortConfigOperator.java
index 5c51c1abcf..670201c357 100644
---
a/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/resource/sort/DefaultSortConfigOperator.java
+++
b/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/resource/sort/DefaultSortConfigOperator.java
@@ -160,6 +160,7 @@ public class DefaultSortConfigOperator implements
SortConfigOperator {
return DataFlowConfig.builder()
.dataflowId(String.valueOf(sink.getId()))
.sourceConfig(getSourceConfig(groupInfo, streamInfo, sink))
+ .auditTag(String.valueOf(sink.getId()))
.sinkConfig(getSinkConfig(sink))
.inlongGroupId(groupInfo.getInlongGroupId())
.inlongStreamId(streamInfo.getInlongStreamId())
diff --git
a/inlong-sort-standalone/sort-standalone-common/src/main/java/org/apache/inlong/sort/standalone/config/holder/v2/SortConfigHolder.java
b/inlong-sort-standalone/sort-standalone-common/src/main/java/org/apache/inlong/sort/standalone/config/holder/v2/SortConfigHolder.java
index ef9106cc97..eb2bf9bf92 100644
---
a/inlong-sort-standalone/sort-standalone-common/src/main/java/org/apache/inlong/sort/standalone/config/holder/v2/SortConfigHolder.java
+++
b/inlong-sort-standalone/sort-standalone-common/src/main/java/org/apache/inlong/sort/standalone/config/holder/v2/SortConfigHolder.java
@@ -17,22 +17,29 @@
package org.apache.inlong.sort.standalone.config.holder.v2;
+import org.apache.inlong.common.pojo.sort.SortClusterConfig;
import org.apache.inlong.common.pojo.sort.SortConfig;
import org.apache.inlong.common.pojo.sort.SortTaskConfig;
+import org.apache.inlong.common.pojo.sort.dataflow.DataFlowConfig;
import org.apache.inlong.sort.standalone.config.holder.CommonPropertiesHolder;
import org.apache.inlong.sort.standalone.config.holder.SortClusterConfigType;
import
org.apache.inlong.sort.standalone.config.loader.v2.ClassResourceSortClusterConfigLoader;
import
org.apache.inlong.sort.standalone.config.loader.v2.ManagerSortClusterConfigLoader;
import org.apache.inlong.sort.standalone.config.loader.v2.SortConfigLoader;
+import org.apache.inlong.sort.standalone.config.pojo.InlongId;
import lombok.extern.slf4j.Slf4j;
import org.apache.commons.lang3.ClassUtils;
import org.apache.commons.lang3.StringUtils;
import org.apache.flume.Context;
+import java.util.Collection;
import java.util.Date;
+import java.util.HashMap;
+import java.util.Map;
import java.util.Timer;
import java.util.TimerTask;
+import java.util.stream.Collectors;
import static
org.apache.inlong.sort.standalone.utils.Constants.RELOAD_INTERVAL;
@@ -45,9 +52,10 @@ public class SortConfigHolder {
private Timer reloadTimer;
private SortConfigLoader loader;
private SortConfig config;
+ private Map<String, Map<String, String>> auditTagMap;
private SortConfigHolder() {
-
+ this.auditTagMap = new HashMap<>();
}
private static SortConfigHolder get() {
@@ -110,9 +118,21 @@ public class SortConfigHolder {
+ /**
+  * Loads the latest sort config through the loader and, when one is returned, replaces the
+  * held config together with the audit tag map derived from it. Any failure is logged and
+  * the previously held config and audit tag map stay in place.
+  */
private void reload() {
try {
SortConfig newConfig = this.loader.load();
- if (newConfig != null) {
- this.config = newConfig;
+ if (newConfig == null) {
+ return;
}
+
+ // Build <SortTaskName, <InlongId, AuditTag>> with plain puts instead of Collectors.toMap:
+ // toMap throws IllegalStateException on a duplicated key and NullPointerException on a
+ // null value (e.g. a data flow without audit tag), which would abort the whole reload.
+ Map<String, Map<String, String>> newAuditTagMap = new HashMap<>();
+ for (SortTaskConfig task : newConfig.getTasks()) {
+     Map<String, String> tagsByUid = new HashMap<>();
+     for (SortClusterConfig cluster : task.getClusters()) {
+         for (DataFlowConfig flow : cluster.getDataFlowConfigs()) {
+             // a later data flow of the same group/stream overrides an earlier one
+             tagsByUid.put(
+                     InlongId.generateUid(flow.getInlongGroupId(), flow.getInlongStreamId()),
+                     flow.getAuditTag());
+         }
+     }
+     newAuditTagMap.put(task.getSortTaskName(), tagsByUid);
+ }
+
+ // Publish both only after the audit tag map was derived successfully, so that the held
+ // config and the audit tags always describe the same configuration. The assignment of
+ // this.config must not get lost with the early return above.
+ this.config = newConfig;
+ this.auditTagMap = newAuditTagMap;
} catch (Throwable e) {
log.error("failed to reload sort config", e);
}
@@ -133,4 +153,15 @@ public class SortConfigHolder {
}
return null;
}
+
+ /**
+  * Looks up the audit tag held for an inlong stream of the given sort task.
+  *
+  * @param sortTaskName name of the sort task whose audit tags are searched
+  * @param inlongGroupId inlong group id used to generate the uid key
+  * @param inlongStreamId inlong stream id used to generate the uid key
+  * @return the audit tag stored under the generated uid, or {@code null} when the task or
+  *         the uid has no entry
+  */
+ public static String getAuditTag(
+         String sortTaskName,
+         String inlongGroupId,
+         String inlongStreamId) {
+     Map<String, String> tagsByUid = get().auditTagMap.get(sortTaskName);
+     // the uid is only generated when the task is known
+     return tagsByUid == null
+             ? null
+             : tagsByUid.get(InlongId.generateUid(inlongGroupId, inlongStreamId));
+ }
}
diff --git
a/inlong-sort-standalone/sort-standalone-common/src/main/java/org/apache/inlong/sort/standalone/config/loader/SortConfigQueryConsumeConfig.java
b/inlong-sort-standalone/sort-standalone-common/src/main/java/org/apache/inlong/sort/standalone/config/loader/SortConfigQueryConsumeConfig.java
new file mode 100644
index 0000000000..3f893041a6
--- /dev/null
+++
b/inlong-sort-standalone/sort-standalone-common/src/main/java/org/apache/inlong/sort/standalone/config/loader/SortConfigQueryConsumeConfig.java
@@ -0,0 +1,90 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.inlong.sort.standalone.config.loader;
+
+import org.apache.inlong.common.pojo.sort.SortClusterConfig;
+import org.apache.inlong.common.pojo.sort.SortTaskConfig;
+import org.apache.inlong.common.pojo.sort.dataflow.DataFlowConfig;
+import org.apache.inlong.common.pojo.sort.mq.MqClusterConfig;
+import org.apache.inlong.common.pojo.sort.mq.PulsarClusterConfig;
+import org.apache.inlong.sdk.sort.api.ClientContext;
+import org.apache.inlong.sdk.sort.api.InlongTopicTypeEnum;
+import org.apache.inlong.sdk.sort.api.QueryConsumeConfig;
+import org.apache.inlong.sdk.sort.entity.CacheZoneCluster;
+import org.apache.inlong.sdk.sort.entity.ConsumeConfig;
+import org.apache.inlong.sdk.sort.entity.InLongTopic;
+import org.apache.inlong.sort.standalone.config.holder.v2.SortConfigHolder;
+
+import lombok.extern.slf4j.Slf4j;
+
+import java.util.ArrayList;
+import java.util.Collection;
+import java.util.HashMap;
+import java.util.List;
+import java.util.stream.Collectors;
+
+/**
+ * {@link QueryConsumeConfig} implementation that builds the consume configuration of a sort
+ * task from the task configuration returned by {@link SortConfigHolder#getTaskConfig}.
+ * Every topic is currently reported as a Pulsar topic.
+ */
+@Slf4j
+public class SortConfigQueryConsumeConfig implements QueryConsumeConfig {
+
+    private List<InLongTopic> subscribedTopic = new ArrayList<>();
+
+    /**
+     * Currently a no-op.
+     */
+    public void reload() {
+
+    }
+
+    /**
+     * Collects the topics of all clusters configured for the given sort task.
+     *
+     * @param sortTaskId id of the sort task to query
+     * @return the consume config holding the topics of every cluster of the task, or
+     *         {@code null} when no task configuration is available for the id
+     */
+    @Override
+    public ConsumeConfig queryCurrentConsumeConfig(String sortTaskId) {
+        SortTaskConfig taskConfig = SortConfigHolder.getTaskConfig(sortTaskId);
+        if (taskConfig == null) {
+            // Without this guard an unknown task id surfaced as a bare NullPointerException.
+            // NOTE(review): null is used as the "no configuration available" signal - confirm
+            // the SDK caller treats a null ConsumeConfig as "keep current subscription".
+            log.warn("no sort task config found, sortTaskId={}", sortTaskId);
+            return null;
+        }
+
+        List<InLongTopic> topics = taskConfig.getClusters()
+                .stream()
+                .map(this::parseTopics)
+                .flatMap(Collection::stream)
+                .collect(Collectors.toList());
+
+        return new ConsumeConfig(topics);
+    }
+
+    /**
+     * Creates one topic per combination of MQ cluster and data flow of the given sort cluster.
+     *
+     * @param clusterConfig sort cluster configuration to read MQ clusters and data flows from
+     * @return the topics of the cluster; empty when it has no MQ cluster or no data flow
+     */
+    public List<InLongTopic> parseTopics(SortClusterConfig clusterConfig) {
+        List<InLongTopic> topics = new ArrayList<>();
+        List<MqClusterConfig> mqClusterConfigs = clusterConfig.getMqClusterConfigs();
+        List<DataFlowConfig> dataFlowConfigs = clusterConfig.getDataFlowConfigs();
+        for (MqClusterConfig mq : mqClusterConfigs) {
+            for (DataFlowConfig flow : dataFlowConfigs) {
+                InLongTopic topic = new InLongTopic();
+                topic.setInLongCluster(this.parseCacheZone(mq));
+                topic.setTopic(flow.getSourceConfig().getTopic());
+                // only supports pulsar now
+                topic.setTopicType(InlongTopicTypeEnum.PULSAR.getName());
+                // never hand a null property map to the topic
+                topic.setProperties(flow.getProperties() != null ? flow.getProperties() : new HashMap<>());
+                topics.add(topic);
+            }
+        }
+        return topics;
+    }
+
+    /**
+     * Converts an MQ cluster configuration into a cache zone cluster built from its cluster
+     * name, service url and token.
+     *
+     * @param mqClusterConfig MQ cluster configuration; it is cast to {@link PulsarClusterConfig},
+     *        so any other type fails with a {@link ClassCastException}
+     * @return the cache zone cluster describing the Pulsar cluster
+     */
+    public CacheZoneCluster parseCacheZone(MqClusterConfig mqClusterConfig) {
+        PulsarClusterConfig pulsarClusterConfig = (PulsarClusterConfig) mqClusterConfig;
+        return new CacheZoneCluster(pulsarClusterConfig.getClusterName(),
+                pulsarClusterConfig.getServiceUrl(), pulsarClusterConfig.getToken());
+    }
+
+    /**
+     * Currently a no-op; the client context is not used.
+     */
+    @Override
+    public void configure(ClientContext context) {
+
+    }
+}