This is an automated email from the ASF dual-hosted git repository.
vernedeng pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/inlong.git
The following commit(s) were added to refs/heads/master by this push:
new 8724e9e198 [INLONG-10526][Sort] ClsSink support switch metadata acquire mode (#10539)
8724e9e198 is described below
commit 8724e9e198ae15373027b2b658ec751ad78d5a1d
Author: vernedeng <[email protected]>
AuthorDate: Mon Jul 1 17:18:55 2024 +0800
[INLONG-10526][Sort] ClsSink support switch metadata acquire mode (#10539)
* [INLONG-10526][Sort] ClsSink support switch metadata acquire mode
---------
Co-authored-by: vernedeng <[email protected]>
---
...ortClusterConfig.java => ClusterTagConfig.java} | 56 +++----
.../apache/inlong/common/pojo/sort/SortConfig.java | 14 +-
.../sort/{SortTaskConfig.java => TaskConfig.java} | 51 +++----
.../manager/service/core/impl/SortServiceImpl.java | 16 +-
.../config/holder/CommonPropertiesHolder.java | 1 +
.../config/holder/ManagerUrlHandler.java | 4 +
.../config/holder/v2/SortConfigHolder.java | 31 ++--
.../v2/SortConfigType.java} | 25 +---
.../loader/CommonPropertiesManagerUrlLoader.java | 18 +++
.../standalone/config/loader/ManagerUrlLoader.java | 2 +
.../loader/SortConfigQueryConsumeConfig.java | 10 +-
...der.java => ClassResourceSortConfigLoader.java} | 10 +-
...figLoader.java => ManagerSortConfigLoader.java} | 4 +-
.../standalone/utils/v2/FlumeConfigGenerator.java | 4 +-
.../inlong/sort/standalone/sink/SinkContext.java | 117 +++------------
.../sort/standalone/sink/cls/ClsIdConfig.java | 2 +
.../sort/standalone/sink/cls/ClsSinkContext.java | 61 ++++++--
.../sort/standalone/sink/elasticsearch/EsSink.java | 2 +-
.../sink/elasticsearch/EsSinkContext.java | 20 +--
.../sink/kafka/KafkaFederationSinkContext.java | 16 +-
.../sink/pulsar/PulsarFederationSinkContext.java | 16 +-
.../sort/standalone/sink/v2/SinkContext.java | 165 ---------------------
.../inlong/sort/standalone/v2/SortCluster.java | 6 +-
.../apache/inlong/sort/standalone/v2/SortTask.java | 4 +-
.../{SortClusterConfig.conf => SortConfig.conf} | 2 +-
.../src/test/java/common.properties | 1 +
.../{SortClusterConfig.conf => SortConfig.conf} | 2 +-
.../src/test/resources/common.properties | 1 +
28 files changed, 233 insertions(+), 428 deletions(-)
diff --git
a/inlong-common/src/main/java/org/apache/inlong/common/pojo/sort/SortClusterConfig.java
b/inlong-common/src/main/java/org/apache/inlong/common/pojo/sort/ClusterTagConfig.java
similarity index 73%
rename from
inlong-common/src/main/java/org/apache/inlong/common/pojo/sort/SortClusterConfig.java
rename to
inlong-common/src/main/java/org/apache/inlong/common/pojo/sort/ClusterTagConfig.java
index 6124ffb834..6ce473a340 100644
---
a/inlong-common/src/main/java/org/apache/inlong/common/pojo/sort/SortClusterConfig.java
+++
b/inlong-common/src/main/java/org/apache/inlong/common/pojo/sort/ClusterTagConfig.java
@@ -35,13 +35,13 @@ import java.util.function.BiFunction;
@Builder
@AllArgsConstructor
@NoArgsConstructor
-public class SortClusterConfig implements Serializable {
+public class ClusterTagConfig implements Serializable {
private String clusterTag;
private List<MqClusterConfig> mqClusterConfigs;
private List<DataFlowConfig> dataFlowConfigs;
- public static SortClusterConfig checkDelete(SortClusterConfig last,
SortClusterConfig current) {
+ public static ClusterTagConfig checkDelete(ClusterTagConfig last,
ClusterTagConfig current) {
if (CollectionUtils.isEmpty(current.getMqClusterConfigs())) {
return last;
}
@@ -49,11 +49,11 @@ public class SortClusterConfig implements Serializable {
return check(last, current, MqClusterConfig::batchCheckLast,
DataFlowConfig::batchCheckDelete);
}
- public static SortClusterConfig checkNew(SortClusterConfig last,
SortClusterConfig current) {
+ public static ClusterTagConfig checkNew(ClusterTagConfig last,
ClusterTagConfig current) {
return check(last, current, MqClusterConfig::batchCheckLatest,
DataFlowConfig::batchCheckNew);
}
- public static SortClusterConfig checkUpdate(SortClusterConfig last,
SortClusterConfig current) {
+ public static ClusterTagConfig checkUpdate(ClusterTagConfig last,
ClusterTagConfig current) {
List<MqClusterConfig> updateCluster =
MqClusterConfig.batchCheckUpdate(last.getMqClusterConfigs(),
current.getMqClusterConfigs());
@@ -77,7 +77,7 @@ public class SortClusterConfig implements Serializable {
DataFlowConfig.batchCheckNoUpdate(last.getDataFlowConfigs(),
current.getDataFlowConfigs());
noUpdateDataflows.addAll(updateDataflows);
- return SortClusterConfig.builder()
+ return ClusterTagConfig.builder()
.clusterTag(last.getClusterTag())
.mqClusterConfigs(latestCluster)
.dataFlowConfigs(noUpdateDataflows)
@@ -90,14 +90,14 @@ public class SortClusterConfig implements Serializable {
}
// if only dataflow update, use latest mq and update dataflow
- return SortClusterConfig.builder()
+ return ClusterTagConfig.builder()
.clusterTag(last.getClusterTag())
.mqClusterConfigs(latestCluster)
.dataFlowConfigs(updateDataflows)
.build();
}
- public static SortClusterConfig checkLatest(SortClusterConfig last,
SortClusterConfig current) {
+ public static ClusterTagConfig checkLatest(ClusterTagConfig last,
ClusterTagConfig current) {
if (CollectionUtils.isEmpty(current.getMqClusterConfigs())) {
return null;
}
@@ -105,40 +105,40 @@ public class SortClusterConfig implements Serializable {
return check(last, current, MqClusterConfig::batchCheckLatest,
DataFlowConfig::batchCheckLatest);
}
- public static List<SortClusterConfig> batchCheckDelete(
- List<SortClusterConfig> last,
- List<SortClusterConfig> current) {
+ public static List<ClusterTagConfig> batchCheckDelete(
+ List<ClusterTagConfig> last,
+ List<ClusterTagConfig> current) {
return SortConfigUtil.batchCheckDeleteRecursive(last, current,
- SortClusterConfig::getClusterTag,
SortClusterConfig::checkDelete);
+ ClusterTagConfig::getClusterTag,
ClusterTagConfig::checkDelete);
}
- public static List<SortClusterConfig> batchCheckNew(
- List<SortClusterConfig> last,
- List<SortClusterConfig> current) {
+ public static List<ClusterTagConfig> batchCheckNew(
+ List<ClusterTagConfig> last,
+ List<ClusterTagConfig> current) {
return SortConfigUtil.batchCheckNewRecursive(last, current,
- SortClusterConfig::getClusterTag, SortClusterConfig::checkNew);
+ ClusterTagConfig::getClusterTag, ClusterTagConfig::checkNew);
}
- public static List<SortClusterConfig> batchCheckUpdate(
- List<SortClusterConfig> last,
- List<SortClusterConfig> current) {
+ public static List<ClusterTagConfig> batchCheckUpdate(
+ List<ClusterTagConfig> last,
+ List<ClusterTagConfig> current) {
return SortConfigUtil.batchCheckUpdateRecursive(last, current,
- SortClusterConfig::getClusterTag,
SortClusterConfig::checkUpdate);
+ ClusterTagConfig::getClusterTag,
ClusterTagConfig::checkUpdate);
}
- public static List<SortClusterConfig> batchCheckLatest(
- List<SortClusterConfig> last,
- List<SortClusterConfig> current) {
+ public static List<ClusterTagConfig> batchCheckLatest(
+ List<ClusterTagConfig> last,
+ List<ClusterTagConfig> current) {
return SortConfigUtil.batchCheckLatestRecursive(last, current,
- SortClusterConfig::getClusterTag,
SortClusterConfig::checkLatest);
+ ClusterTagConfig::getClusterTag,
ClusterTagConfig::checkLatest);
}
- public static SortClusterConfig check(
- SortClusterConfig last, SortClusterConfig current,
+ public static ClusterTagConfig check(
+ ClusterTagConfig last, ClusterTagConfig current,
BiFunction<List<MqClusterConfig>, List<MqClusterConfig>,
List<MqClusterConfig>> mqCheckFunction,
BiFunction<List<DataFlowConfig>, List<DataFlowConfig>,
List<DataFlowConfig>> flowCheckFunction) {
- List<MqClusterConfig> checkCluster = mqCheckFunction
+ List<MqClusterConfig> checkMqCluster = mqCheckFunction
.apply(last.getMqClusterConfigs(),
current.getMqClusterConfigs());
List<DataFlowConfig> checkDataflows = flowCheckFunction
.apply(last.getDataFlowConfigs(),
current.getDataFlowConfigs());
@@ -147,9 +147,9 @@ public class SortClusterConfig implements Serializable {
return null;
}
- return SortClusterConfig.builder()
+ return ClusterTagConfig.builder()
.clusterTag(last.getClusterTag())
- .mqClusterConfigs(checkCluster)
+ .mqClusterConfigs(checkMqCluster)
.dataFlowConfigs(checkDataflows)
.build();
}
diff --git
a/inlong-common/src/main/java/org/apache/inlong/common/pojo/sort/SortConfig.java
b/inlong-common/src/main/java/org/apache/inlong/common/pojo/sort/SortConfig.java
index 5f0b6c0b6d..39b7572f88 100644
---
a/inlong-common/src/main/java/org/apache/inlong/common/pojo/sort/SortConfig.java
+++
b/inlong-common/src/main/java/org/apache/inlong/common/pojo/sort/SortConfig.java
@@ -34,7 +34,7 @@ import java.util.function.BiFunction;
public class SortConfig implements Serializable {
private String sortClusterName;
- private List<SortTaskConfig> tasks;
+ private List<TaskConfig> tasks;
public static SortConfig checkLatest(SortConfig last, SortConfig current) {
if (last == null) {
@@ -45,7 +45,7 @@ public class SortConfig implements Serializable {
}
return SortConfig.builder()
.sortClusterName(current.getSortClusterName())
- .tasks(SortTaskConfig.batchCheckLatest(last.getTasks(),
current.getTasks()))
+ .tasks(TaskConfig.batchCheckLatest(last.getTasks(),
current.getTasks()))
.build();
}
@@ -56,14 +56,14 @@ public class SortConfig implements Serializable {
if (current == null) {
return last;
}
- return check(last, current, SortTaskConfig::batchCheckDelete);
+ return check(last, current, TaskConfig::batchCheckDelete);
}
public static SortConfig checkUpdate(SortConfig last, SortConfig current) {
if (last == null || current == null) {
return null;
}
- return check(last, current, SortTaskConfig::batchCheckUpdate);
+ return check(last, current, TaskConfig::batchCheckUpdate);
}
public static SortConfig checkNew(SortConfig last, SortConfig current) {
@@ -73,17 +73,17 @@ public class SortConfig implements Serializable {
if (current == null) {
return null;
}
- return check(last, current, SortTaskConfig::batchCheckNew);
+ return check(last, current, TaskConfig::batchCheckNew);
}
public static SortConfig check(
SortConfig last, SortConfig current,
- BiFunction<List<SortTaskConfig>, List<SortTaskConfig>,
List<SortTaskConfig>> taskCheckFunction) {
+ BiFunction<List<TaskConfig>, List<TaskConfig>, List<TaskConfig>>
taskCheckFunction) {
if (!last.getSortClusterName().equals(current.getSortClusterName())) {
return null;
}
- List<SortTaskConfig> checkTasks =
taskCheckFunction.apply(last.getTasks(), current.getTasks());
+ List<TaskConfig> checkTasks = taskCheckFunction.apply(last.getTasks(),
current.getTasks());
if (CollectionUtils.isEmpty(checkTasks)) {
return null;
}
diff --git
a/inlong-common/src/main/java/org/apache/inlong/common/pojo/sort/SortTaskConfig.java
b/inlong-common/src/main/java/org/apache/inlong/common/pojo/sort/TaskConfig.java
similarity index 59%
rename from
inlong-common/src/main/java/org/apache/inlong/common/pojo/sort/SortTaskConfig.java
rename to
inlong-common/src/main/java/org/apache/inlong/common/pojo/sort/TaskConfig.java
index 107efcec80..69dd63cd53 100644
---
a/inlong-common/src/main/java/org/apache/inlong/common/pojo/sort/SortTaskConfig.java
+++
b/inlong-common/src/main/java/org/apache/inlong/common/pojo/sort/TaskConfig.java
@@ -34,39 +34,39 @@ import java.util.function.BiFunction;
@Builder
@AllArgsConstructor
@NoArgsConstructor
-public class SortTaskConfig implements Serializable {
+public class TaskConfig implements Serializable {
private String sortTaskName;
- private List<SortClusterConfig> clusters;
+ private List<ClusterTagConfig> clusterTagConfigs;
private NodeConfig nodeConfig;
- public static List<SortTaskConfig> batchCheckDelete(List<SortTaskConfig>
last, List<SortTaskConfig> current) {
+ public static List<TaskConfig> batchCheckDelete(List<TaskConfig> last,
List<TaskConfig> current) {
return SortConfigUtil.batchCheckDeleteRecursive(last, current,
- SortTaskConfig::getSortTaskName, SortTaskConfig::checkDelete);
+ TaskConfig::getSortTaskName, TaskConfig::checkDelete);
}
- public static List<SortTaskConfig> batchCheckUpdate(List<SortTaskConfig>
last, List<SortTaskConfig> current) {
+ public static List<TaskConfig> batchCheckUpdate(List<TaskConfig> last,
List<TaskConfig> current) {
return SortConfigUtil.batchCheckUpdateRecursive(last, current,
- SortTaskConfig::getSortTaskName, SortTaskConfig::checkUpdate);
+ TaskConfig::getSortTaskName, TaskConfig::checkUpdate);
}
- public static List<SortTaskConfig> batchCheckNew(List<SortTaskConfig>
last, List<SortTaskConfig> current) {
+ public static List<TaskConfig> batchCheckNew(List<TaskConfig> last,
List<TaskConfig> current) {
return SortConfigUtil.batchCheckNewRecursive(last, current,
- SortTaskConfig::getSortTaskName, SortTaskConfig::checkNew);
+ TaskConfig::getSortTaskName, TaskConfig::checkNew);
}
- public static List<SortTaskConfig> batchCheckLatest(List<SortTaskConfig>
latest, List<SortTaskConfig> current) {
+ public static List<TaskConfig> batchCheckLatest(List<TaskConfig> latest,
List<TaskConfig> current) {
return SortConfigUtil.batchCheckLatestRecursive(latest, current,
- SortTaskConfig::getSortTaskName, SortTaskConfig::checkLatest);
+ TaskConfig::getSortTaskName, TaskConfig::checkLatest);
}
- public static SortTaskConfig checkDelete(SortTaskConfig last,
SortTaskConfig current) {
- return check(last, current, SortClusterConfig::batchCheckDelete,
+ public static TaskConfig checkDelete(TaskConfig last, TaskConfig current) {
+ return check(last, current, ClusterTagConfig::batchCheckDelete,
(lastNode, currentNode) -> lastNode);
}
- public static SortTaskConfig checkUpdate(SortTaskConfig last,
SortTaskConfig current) {
- return check(last, current, SortClusterConfig::batchCheckUpdate,
+ public static TaskConfig checkUpdate(TaskConfig last, TaskConfig current) {
+ return check(last, current, ClusterTagConfig::batchCheckUpdate,
(lastNode, currentNode) -> {
if (lastNode == null || currentNode == null) {
return null;
@@ -75,8 +75,8 @@ public class SortTaskConfig implements Serializable {
});
}
- public static SortTaskConfig checkNew(SortTaskConfig last, SortTaskConfig
current) {
- return check(last, current, SortClusterConfig::batchCheckNew,
+ public static TaskConfig checkNew(TaskConfig last, TaskConfig current) {
+ return check(last, current, ClusterTagConfig::batchCheckNew,
(lastNode, currentNode) -> {
if (lastNode == null || currentNode == null) {
return null;
@@ -85,8 +85,8 @@ public class SortTaskConfig implements Serializable {
});
}
- public static SortTaskConfig checkLatest(SortTaskConfig last,
SortTaskConfig current) {
- return check(last, current, SortClusterConfig::batchCheckLatest,
+ public static TaskConfig checkLatest(TaskConfig last, TaskConfig current) {
+ return check(last, current, ClusterTagConfig::batchCheckLatest,
(lastNode, currentNode) -> {
if (lastNode == null || currentNode == null) {
return null;
@@ -95,23 +95,24 @@ public class SortTaskConfig implements Serializable {
});
}
- public static SortTaskConfig check(
- SortTaskConfig last, SortTaskConfig current,
- BiFunction<List<SortClusterConfig>, List<SortClusterConfig>,
List<SortClusterConfig>> clusterCheckFunction,
+ public static TaskConfig check(
+ TaskConfig last, TaskConfig current,
+ BiFunction<List<ClusterTagConfig>, List<ClusterTagConfig>,
List<ClusterTagConfig>> clusterCheckFunction,
BiFunction<NodeConfig, NodeConfig, NodeConfig> nodeCheckFunction) {
- List<SortClusterConfig> checkCluster =
clusterCheckFunction.apply(last.getClusters(), current.getClusters());
+ List<ClusterTagConfig> checkClusterTags =
+ clusterCheckFunction.apply(last.getClusterTagConfigs(),
current.getClusterTagConfigs());
NodeConfig checkNode = nodeCheckFunction.apply(last.getNodeConfig(),
current.getNodeConfig());
- if (CollectionUtils.isEmpty(checkCluster) && checkNode == null) {
+ if (CollectionUtils.isEmpty(checkClusterTags) && checkNode == null) {
return null;
}
- return SortTaskConfig
+ return TaskConfig
.builder()
.sortTaskName(last.getSortTaskName())
- .clusters(checkCluster)
+ .clusterTagConfigs(checkClusterTags)
.nodeConfig(checkNode)
.build();
}
diff --git
a/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/core/impl/SortServiceImpl.java
b/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/core/impl/SortServiceImpl.java
index 507cc1fc1a..2f87d3b92e 100644
---
a/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/core/impl/SortServiceImpl.java
+++
b/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/core/impl/SortServiceImpl.java
@@ -18,10 +18,10 @@
package org.apache.inlong.manager.service.core.impl;
import org.apache.inlong.common.pojo.sdk.SortSourceConfigResponse;
-import org.apache.inlong.common.pojo.sort.SortClusterConfig;
+import org.apache.inlong.common.pojo.sort.ClusterTagConfig;
import org.apache.inlong.common.pojo.sort.SortConfig;
import org.apache.inlong.common.pojo.sort.SortConfigResponse;
-import org.apache.inlong.common.pojo.sort.SortTaskConfig;
+import org.apache.inlong.common.pojo.sort.TaskConfig;
import org.apache.inlong.common.pojo.sort.dataflow.DataFlowConfig;
import org.apache.inlong.common.pojo.sort.mq.MqClusterConfig;
import org.apache.inlong.common.pojo.sort.mq.PulsarClusterConfig;
@@ -272,7 +272,7 @@ public class SortServiceImpl implements SortService,
PluginBinder {
ObjectMapper objectMapper = new ObjectMapper();
Map<String, byte[]> sortConfigs = new HashMap<>();
Map<String, String> sortConfigMd5s = new HashMap<>();
- Map<String, List<SortTaskConfig>> temp = new HashMap<>();
+ Map<String, List<TaskConfig>> temp = new HashMap<>();
List<SortConfigEntity> sinkConfigEntityList =
configLoader.loadAllSortConfigEntity();
for (SortConfigEntity sortConfigEntity : sinkConfigEntityList) {
if (StringUtils.isBlank(sortConfigEntity.getSortTaskName())) {
@@ -284,16 +284,16 @@ public class SortServiceImpl implements SortService,
PluginBinder {
Collectors.groupingBy(SortConfigEntity::getSortTaskName,
Collectors.groupingBy(SortConfigEntity::getInlongClusterTag))));
for (String sortClusterName : cluster2SinkMap.keySet()) {
- List<SortTaskConfig> map = temp.computeIfAbsent(sortClusterName,
+ List<TaskConfig> map = temp.computeIfAbsent(sortClusterName,
v -> new ArrayList<>());
SortConfig sortConfig = new SortConfig();
sortConfig.setSortClusterName(sortClusterName);
Map<String, Map<String, List<SortConfigEntity>>> sortTaskNameMap =
cluster2SinkMap.get(sortClusterName);
for (String sortTaskName : sortTaskNameMap.keySet()) {
Map<String, List<SortConfigEntity>> clusterTagMap =
sortTaskNameMap.get(sortTaskName);
- SortTaskConfig sortTaskConfig = SortTaskConfig.builder()
+ TaskConfig sortTaskConfig = TaskConfig.builder()
.sortTaskName(sortTaskName)
- .clusters(new ArrayList<>())
+ .clusterTagConfigs(new ArrayList<>())
.nodeConfig(nodeInfoMap.get(sortTaskName))
.build();
for (String clusterTag : clusterTagMap.keySet()) {
@@ -308,12 +308,12 @@ public class SortServiceImpl implements SortService,
PluginBinder {
}).filter(Objects::nonNull)
.sorted(Comparator.comparingInt(x ->
Integer.parseInt(x.getDataflowId())))
.collect(Collectors.toList());
- SortClusterConfig sortClusterConfig =
SortClusterConfig.builder()
+ ClusterTagConfig sortClusterConfig =
ClusterTagConfig.builder()
.mqClusterConfigs(mqClusterConfigMap.getOrDefault(clusterTag, new
ArrayList<>()))
.clusterTag(clusterTag)
.dataFlowConfigs(dataFlowConfigs)
.build();
- sortTaskConfig.getClusters().add(sortClusterConfig);
+
sortTaskConfig.getClusterTagConfigs().add(sortClusterConfig);
}
map.add(sortTaskConfig);
}
diff --git
a/inlong-sort-standalone/sort-standalone-common/src/main/java/org/apache/inlong/sort/standalone/config/holder/CommonPropertiesHolder.java
b/inlong-sort-standalone/sort-standalone-common/src/main/java/org/apache/inlong/sort/standalone/config/holder/CommonPropertiesHolder.java
index b91e5801c8..1d8447121c 100644
---
a/inlong-sort-standalone/sort-standalone-common/src/main/java/org/apache/inlong/sort/standalone/config/holder/CommonPropertiesHolder.java
+++
b/inlong-sort-standalone/sort-standalone-common/src/main/java/org/apache/inlong/sort/standalone/config/holder/CommonPropertiesHolder.java
@@ -40,6 +40,7 @@ public class CommonPropertiesHolder {
public static final String KEY_COMMON_PROPERTIES =
"common_properties_loader";
public static final String KEY_CLUSTER_ID = "clusterId";
public static final String KEY_SORT_SOURCE_ACKPOLICY =
"sortSource.ackPolicy";
+ public static final String KEY_USE_UNIFIED_CONFIGURATION =
"useUnifiedConfiguration";
private static Map<String, String> props;
private static Context context;
diff --git
a/inlong-sort-standalone/sort-standalone-common/src/main/java/org/apache/inlong/sort/standalone/config/holder/ManagerUrlHandler.java
b/inlong-sort-standalone/sort-standalone-common/src/main/java/org/apache/inlong/sort/standalone/config/holder/ManagerUrlHandler.java
index 25deaf87a5..df357743cb 100644
---
a/inlong-sort-standalone/sort-standalone-common/src/main/java/org/apache/inlong/sort/standalone/config/holder/ManagerUrlHandler.java
+++
b/inlong-sort-standalone/sort-standalone-common/src/main/java/org/apache/inlong/sort/standalone/config/holder/ManagerUrlHandler.java
@@ -66,6 +66,10 @@ public class ManagerUrlHandler {
return get().acquireSortClusterConfigUrl();
}
+ public static String getSortConfigUrl() {
+ return get().acquireSortConfigUrl();
+ }
+
private static ManagerUrlLoader get() {
if (instance != null) {
return instance;
diff --git
a/inlong-sort-standalone/sort-standalone-common/src/main/java/org/apache/inlong/sort/standalone/config/holder/v2/SortConfigHolder.java
b/inlong-sort-standalone/sort-standalone-common/src/main/java/org/apache/inlong/sort/standalone/config/holder/v2/SortConfigHolder.java
index f1c52d2fc4..7a3ea1ac0f 100644
---
a/inlong-sort-standalone/sort-standalone-common/src/main/java/org/apache/inlong/sort/standalone/config/holder/v2/SortConfigHolder.java
+++
b/inlong-sort-standalone/sort-standalone-common/src/main/java/org/apache/inlong/sort/standalone/config/holder/v2/SortConfigHolder.java
@@ -17,14 +17,13 @@
package org.apache.inlong.sort.standalone.config.holder.v2;
-import org.apache.inlong.common.pojo.sort.SortClusterConfig;
+import org.apache.inlong.common.pojo.sort.ClusterTagConfig;
import org.apache.inlong.common.pojo.sort.SortConfig;
-import org.apache.inlong.common.pojo.sort.SortTaskConfig;
+import org.apache.inlong.common.pojo.sort.TaskConfig;
import org.apache.inlong.common.pojo.sort.dataflow.DataFlowConfig;
import org.apache.inlong.sort.standalone.config.holder.CommonPropertiesHolder;
-import org.apache.inlong.sort.standalone.config.holder.SortClusterConfigType;
-import
org.apache.inlong.sort.standalone.config.loader.v2.ClassResourceSortClusterConfigLoader;
-import
org.apache.inlong.sort.standalone.config.loader.v2.ManagerSortClusterConfigLoader;
+import
org.apache.inlong.sort.standalone.config.loader.v2.ClassResourceSortConfigLoader;
+import
org.apache.inlong.sort.standalone.config.loader.v2.ManagerSortConfigLoader;
import org.apache.inlong.sort.standalone.config.loader.v2.SortConfigLoader;
import org.apache.inlong.sort.standalone.config.pojo.InlongId;
@@ -71,12 +70,12 @@ public class SortConfigHolder {
instance = new SortConfigHolder();
instance.reloadInterval =
CommonPropertiesHolder.getLong(RELOAD_INTERVAL, 60000L);
String loaderType = CommonPropertiesHolder
- .getString(SortClusterConfigType.KEY_TYPE,
SortClusterConfigType.MANAGER.name());
+ .getString(SortConfigType.KEY_TYPE,
SortConfigType.MANAGER.name());
- if
(SortClusterConfigType.FILE.name().equalsIgnoreCase(loaderType)) {
- instance.loader = new ClassResourceSortClusterConfigLoader();
- } else if
(SortClusterConfigType.MANAGER.name().equalsIgnoreCase(loaderType)) {
- instance.loader = new ManagerSortClusterConfigLoader();
+ if (SortConfigType.FILE.name().equalsIgnoreCase(loaderType)) {
+ instance.loader = new ClassResourceSortConfigLoader();
+ } else if
(SortConfigType.MANAGER.name().equalsIgnoreCase(loaderType)) {
+ instance.loader = new ManagerSortConfigLoader();
} else {
// user-defined
try {
@@ -90,7 +89,7 @@ public class SortConfigHolder {
}
}
if (instance.loader == null) {
- instance.loader = new ClassResourceSortClusterConfigLoader();
+ instance.loader = new ClassResourceSortConfigLoader();
}
try {
instance.loader.configure(new
Context(CommonPropertiesHolder.get()));
@@ -125,10 +124,10 @@ public class SortConfigHolder {
// <SortTaskName, <InlongId, AuditTag>>
this.auditTagMap = newConfig.getTasks()
.stream()
- .collect(Collectors.toMap(SortTaskConfig::getSortTaskName,
- v -> v.getClusters()
+ .collect(Collectors.toMap(TaskConfig::getSortTaskName,
+ v -> v.getClusterTagConfigs()
.stream()
- .map(SortClusterConfig::getDataFlowConfigs)
+ .map(ClusterTagConfig::getDataFlowConfigs)
.flatMap(Collection::stream)
.filter(flow ->
StringUtils.isNotEmpty(flow.getAuditTag()))
.collect(Collectors.toMap(flow ->
InlongId.generateUid(flow.getInlongGroupId(),
@@ -145,10 +144,10 @@ public class SortConfigHolder {
return get().config;
}
- public static SortTaskConfig getTaskConfig(String sortTaskName) {
+ public static TaskConfig getTaskConfig(String sortTaskName) {
SortConfig config = get().config;
if (config != null && config.getTasks() != null) {
- for (SortTaskConfig task : config.getTasks()) {
+ for (TaskConfig task : config.getTasks()) {
if (StringUtils.equals(sortTaskName, task.getSortTaskName())) {
return task;
}
diff --git
a/inlong-sort-standalone/sort-standalone-common/src/main/java/org/apache/inlong/sort/standalone/config/loader/ManagerUrlLoader.java
b/inlong-sort-standalone/sort-standalone-common/src/main/java/org/apache/inlong/sort/standalone/config/holder/v2/SortConfigType.java
similarity index 63%
copy from
inlong-sort-standalone/sort-standalone-common/src/main/java/org/apache/inlong/sort/standalone/config/loader/ManagerUrlLoader.java
copy to
inlong-sort-standalone/sort-standalone-common/src/main/java/org/apache/inlong/sort/standalone/config/holder/v2/SortConfigType.java
index 4b6f4f3d00..d5196625b6 100644
---
a/inlong-sort-standalone/sort-standalone-common/src/main/java/org/apache/inlong/sort/standalone/config/loader/ManagerUrlLoader.java
+++
b/inlong-sort-standalone/sort-standalone-common/src/main/java/org/apache/inlong/sort/standalone/config/holder/v2/SortConfigType.java
@@ -15,26 +15,13 @@
* limitations under the License.
*/
-package org.apache.inlong.sort.standalone.config.loader;
+package org.apache.inlong.sort.standalone.config.holder.v2;
-import org.apache.flume.conf.Configurable;
+public enum SortConfigType {
-/**
- * Interface of ManagerUrlLoader.
- */
-public interface ManagerUrlLoader extends Configurable {
-
- /**
- * Acquire SortSourceConfigUrl
- *
- * @return SortSourceConfigUrl
- */
- String acquireSortSourceConfigUrl();
+ FILE, MANAGER, USER_DEFINED;
- /**
- * Acquire SortClusterConfigUrl
- *
- * @return SortClusterConfigUrl
- */
- String acquireSortClusterConfigUrl();
+ public static final String KEY_TYPE = "sortConfig.type";
+ public static final String KEY_FILE = "sortConfig.file";
+ public static final String DEFAULT_FILE = "SortConfig.conf";
}
diff --git
a/inlong-sort-standalone/sort-standalone-common/src/main/java/org/apache/inlong/sort/standalone/config/loader/CommonPropertiesManagerUrlLoader.java
b/inlong-sort-standalone/sort-standalone-common/src/main/java/org/apache/inlong/sort/standalone/config/loader/CommonPropertiesManagerUrlLoader.java
index a02538ca77..089f8fbe60 100644
---
a/inlong-sort-standalone/sort-standalone-common/src/main/java/org/apache/inlong/sort/standalone/config/loader/CommonPropertiesManagerUrlLoader.java
+++
b/inlong-sort-standalone/sort-standalone-common/src/main/java/org/apache/inlong/sort/standalone/config/loader/CommonPropertiesManagerUrlLoader.java
@@ -33,9 +33,11 @@ public class CommonPropertiesManagerUrlLoader implements
ManagerUrlLoader {
private static final Logger LOG =
InlongLoggerFactory.getLogger(CommonPropertiesManagerUrlLoader.class);
private static final String KEY_SORT_CLUSTER_CONFIG_MANAGER_URL =
"sortClusterConfig.managerUrl";
private static final String KEY_SORT_SOURCE_CONFIG_MANAGER_URL =
"sortSourceConfig.managerUrl";
+ private static final String KEY_SORT_CONFIG_MANAGER_URL =
"sortConfig.managerUrl";
private String sortSourceConfigUrl;
private String sortClusterConfigUrl;
+ private String sortConfigUrl;
public Context context;
@Override
@@ -70,6 +72,22 @@ public class CommonPropertiesManagerUrlLoader implements
ManagerUrlLoader {
return sortClusterConfigUrl;
}
+ @Override
+ public String acquireSortConfigUrl() {
+ if (sortConfigUrl != null) {
+ return sortConfigUrl;
+ }
+ sortConfigUrl = context.getString(KEY_SORT_CONFIG_MANAGER_URL);
+ if (StringUtils.isBlank(sortConfigUrl)) {
+ String warnMsg = "Get key" + KEY_SORT_CONFIG_MANAGER_URL
+ + " from CommonPropertiesHolder failed, it's a optional
property use to specify "
+ + "the url where Sort-Standalone request
SortSourceConfig.";
+ LOG.warn(warnMsg);
+ sortConfigUrl = warnMsg;
+ }
+ return sortConfigUrl;
+ }
+
@Override
public void configure(Context context) {
Optional.ofNullable(context).ifPresent(c -> this.context = c);
diff --git
a/inlong-sort-standalone/sort-standalone-common/src/main/java/org/apache/inlong/sort/standalone/config/loader/ManagerUrlLoader.java
b/inlong-sort-standalone/sort-standalone-common/src/main/java/org/apache/inlong/sort/standalone/config/loader/ManagerUrlLoader.java
index 4b6f4f3d00..4e133369b6 100644
---
a/inlong-sort-standalone/sort-standalone-common/src/main/java/org/apache/inlong/sort/standalone/config/loader/ManagerUrlLoader.java
+++
b/inlong-sort-standalone/sort-standalone-common/src/main/java/org/apache/inlong/sort/standalone/config/loader/ManagerUrlLoader.java
@@ -37,4 +37,6 @@ public interface ManagerUrlLoader extends Configurable {
* @return SortClusterConfigUrl
*/
String acquireSortClusterConfigUrl();
+
+ String acquireSortConfigUrl();
}
diff --git
a/inlong-sort-standalone/sort-standalone-common/src/main/java/org/apache/inlong/sort/standalone/config/loader/SortConfigQueryConsumeConfig.java
b/inlong-sort-standalone/sort-standalone-common/src/main/java/org/apache/inlong/sort/standalone/config/loader/SortConfigQueryConsumeConfig.java
index 3f893041a6..079044f020 100644
---
a/inlong-sort-standalone/sort-standalone-common/src/main/java/org/apache/inlong/sort/standalone/config/loader/SortConfigQueryConsumeConfig.java
+++
b/inlong-sort-standalone/sort-standalone-common/src/main/java/org/apache/inlong/sort/standalone/config/loader/SortConfigQueryConsumeConfig.java
@@ -17,8 +17,8 @@
package org.apache.inlong.sort.standalone.config.loader;
-import org.apache.inlong.common.pojo.sort.SortClusterConfig;
-import org.apache.inlong.common.pojo.sort.SortTaskConfig;
+import org.apache.inlong.common.pojo.sort.ClusterTagConfig;
+import org.apache.inlong.common.pojo.sort.TaskConfig;
import org.apache.inlong.common.pojo.sort.dataflow.DataFlowConfig;
import org.apache.inlong.common.pojo.sort.mq.MqClusterConfig;
import org.apache.inlong.common.pojo.sort.mq.PulsarClusterConfig;
@@ -49,8 +49,8 @@ public class SortConfigQueryConsumeConfig implements
QueryConsumeConfig {
@Override
public ConsumeConfig queryCurrentConsumeConfig(String sortTaskId) {
- SortTaskConfig taskConfig = SortConfigHolder.getTaskConfig(sortTaskId);
- List<InLongTopic> topics = taskConfig.getClusters()
+ TaskConfig taskConfig = SortConfigHolder.getTaskConfig(sortTaskId);
+ List<InLongTopic> topics = taskConfig.getClusterTagConfigs()
.stream()
.map(this::parseTopics)
.flatMap(Collection::stream)
@@ -59,7 +59,7 @@ public class SortConfigQueryConsumeConfig implements
QueryConsumeConfig {
return new ConsumeConfig(topics);
}
- public List<InLongTopic> parseTopics(SortClusterConfig clusterConfig) {
+ public List<InLongTopic> parseTopics(ClusterTagConfig clusterConfig) {
List<InLongTopic> topics = new ArrayList<>();
List<MqClusterConfig> mqClusterConfigs =
clusterConfig.getMqClusterConfigs();
List<DataFlowConfig> dataFlowConfigs =
clusterConfig.getDataFlowConfigs();
diff --git
a/inlong-sort-standalone/sort-standalone-common/src/main/java/org/apache/inlong/sort/standalone/config/loader/v2/ClassResourceSortClusterConfigLoader.java
b/inlong-sort-standalone/sort-standalone-common/src/main/java/org/apache/inlong/sort/standalone/config/loader/v2/ClassResourceSortConfigLoader.java
similarity index 84%
rename from
inlong-sort-standalone/sort-standalone-common/src/main/java/org/apache/inlong/sort/standalone/config/loader/v2/ClassResourceSortClusterConfigLoader.java
rename to
inlong-sort-standalone/sort-standalone-common/src/main/java/org/apache/inlong/sort/standalone/config/loader/v2/ClassResourceSortConfigLoader.java
index 0d3cd08089..77f8580f00 100644
---
a/inlong-sort-standalone/sort-standalone-common/src/main/java/org/apache/inlong/sort/standalone/config/loader/v2/ClassResourceSortClusterConfigLoader.java
+++
b/inlong-sort-standalone/sort-standalone-common/src/main/java/org/apache/inlong/sort/standalone/config/loader/v2/ClassResourceSortConfigLoader.java
@@ -18,7 +18,7 @@
package org.apache.inlong.sort.standalone.config.loader.v2;
import org.apache.inlong.common.pojo.sort.SortConfig;
-import org.apache.inlong.sort.standalone.config.holder.SortClusterConfigType;
+import org.apache.inlong.sort.standalone.config.holder.v2.SortConfigType;
import org.apache.inlong.sort.standalone.utils.InlongLoggerFactory;
import org.apache.commons.io.IOUtils;
@@ -28,18 +28,18 @@ import org.slf4j.Logger;
import java.nio.charset.Charset;
-public class ClassResourceSortClusterConfigLoader implements SortConfigLoader {
+public class ClassResourceSortConfigLoader implements SortConfigLoader {
- public static final Logger LOG =
InlongLoggerFactory.getLogger(ClassResourceSortClusterConfigLoader.class);
+ public static final Logger LOG =
InlongLoggerFactory.getLogger(ClassResourceSortConfigLoader.class);
private Context context;
private ObjectMapper objectMapper = new ObjectMapper();
@Override
public SortConfig load() {
- String fileName = SortClusterConfigType.DEFAULT_FILE;
+ String fileName = SortConfigType.DEFAULT_FILE;
try {
if (context != null) {
- fileName = context.getString(SortClusterConfigType.KEY_FILE,
SortClusterConfigType.DEFAULT_FILE);
+ fileName = context.getString(SortConfigType.KEY_FILE,
SortConfigType.DEFAULT_FILE);
}
String confString =
IOUtils.toString(getClass().getClassLoader().getResource(fileName),
Charset.defaultCharset());
diff --git
a/inlong-sort-standalone/sort-standalone-common/src/main/java/org/apache/inlong/sort/standalone/config/loader/v2/ManagerSortClusterConfigLoader.java
b/inlong-sort-standalone/sort-standalone-common/src/main/java/org/apache/inlong/sort/standalone/config/loader/v2/ManagerSortConfigLoader.java
similarity index 96%
rename from
inlong-sort-standalone/sort-standalone-common/src/main/java/org/apache/inlong/sort/standalone/config/loader/v2/ManagerSortClusterConfigLoader.java
rename to
inlong-sort-standalone/sort-standalone-common/src/main/java/org/apache/inlong/sort/standalone/config/loader/v2/ManagerSortConfigLoader.java
index 44b55ee4c8..229b53c591 100644
---
a/inlong-sort-standalone/sort-standalone-common/src/main/java/org/apache/inlong/sort/standalone/config/loader/v2/ManagerSortClusterConfigLoader.java
+++
b/inlong-sort-standalone/sort-standalone-common/src/main/java/org/apache/inlong/sort/standalone/config/loader/v2/ManagerSortConfigLoader.java
@@ -38,7 +38,7 @@ import org.apache.http.util.EntityUtils;
import java.util.concurrent.TimeUnit;
@Slf4j
-public class ManagerSortClusterConfigLoader implements SortConfigLoader {
+public class ManagerSortConfigLoader implements SortConfigLoader {
private Context context;
private CloseableHttpClient httpClient;
@@ -66,7 +66,7 @@ public class ManagerSortClusterConfigLoader implements
SortConfigLoader {
HttpGet httpGet = null;
try {
String clusterName =
this.context.getString(CommonPropertiesHolder.KEY_CLUSTER_ID);
- String url = ManagerUrlHandler.getSortClusterConfigUrl() +
"?clusterName="
+ String url = ManagerUrlHandler.getSortConfigUrl() + "?clusterName="
+ clusterName + "&md5=";
if (StringUtils.isNotBlank(this.md5)) {
url += this.md5;
diff --git
a/inlong-sort-standalone/sort-standalone-common/src/main/java/org/apache/inlong/sort/standalone/utils/v2/FlumeConfigGenerator.java
b/inlong-sort-standalone/sort-standalone-common/src/main/java/org/apache/inlong/sort/standalone/utils/v2/FlumeConfigGenerator.java
index 3edf16af72..ded015d43b 100644
---
a/inlong-sort-standalone/sort-standalone-common/src/main/java/org/apache/inlong/sort/standalone/utils/v2/FlumeConfigGenerator.java
+++
b/inlong-sort-standalone/sort-standalone-common/src/main/java/org/apache/inlong/sort/standalone/utils/v2/FlumeConfigGenerator.java
@@ -17,7 +17,7 @@
package org.apache.inlong.sort.standalone.utils.v2;
-import org.apache.inlong.common.pojo.sort.SortTaskConfig;
+import org.apache.inlong.common.pojo.sort.TaskConfig;
import org.apache.inlong.sort.standalone.config.holder.CommonPropertiesHolder;
import java.util.HashMap;
@@ -36,7 +36,7 @@ public class FlumeConfigGenerator {
public static final String KEY_ROLLBACK_START_TIME = "rollback.startTime";
public static final String KEY_ROLLBACK_STOP_TIME = "rollback.stopTime";
- public static Map<String, String>
generateFlumeConfiguration(SortTaskConfig taskConfig) {
+ public static Map<String, String> generateFlumeConfiguration(TaskConfig
taskConfig) {
Map<String, String> flumeConf = new HashMap<>();
String sortTaskName = taskConfig.getSortTaskName();
appendChannels(flumeConf, sortTaskName);
diff --git
a/inlong-sort-standalone/sort-standalone-source/src/main/java/org/apache/inlong/sort/standalone/sink/SinkContext.java
b/inlong-sort-standalone/sort-standalone-source/src/main/java/org/apache/inlong/sort/standalone/sink/SinkContext.java
index 16b15ab8b9..36e3d11254 100644
---
a/inlong-sort-standalone/sort-standalone-source/src/main/java/org/apache/inlong/sort/standalone/sink/SinkContext.java
+++
b/inlong-sort-standalone/sort-standalone-source/src/main/java/org/apache/inlong/sort/standalone/sink/SinkContext.java
@@ -18,10 +18,12 @@
package org.apache.inlong.sort.standalone.sink;
import org.apache.inlong.common.metric.MetricRegister;
+import org.apache.inlong.common.pojo.sort.TaskConfig;
import org.apache.inlong.common.pojo.sortstandalone.SortTaskConfig;
import org.apache.inlong.sort.standalone.channel.ProfileEvent;
import org.apache.inlong.sort.standalone.config.holder.CommonPropertiesHolder;
import org.apache.inlong.sort.standalone.config.holder.SortClusterConfigHolder;
+import org.apache.inlong.sort.standalone.config.holder.v2.SortConfigHolder;
import org.apache.inlong.sort.standalone.metrics.SortMetricItem;
import org.apache.inlong.sort.standalone.metrics.SortMetricItemSet;
import org.apache.inlong.sort.standalone.utils.BufferQueue;
@@ -37,85 +39,62 @@ import java.util.Map;
import java.util.Timer;
import java.util.TimerTask;
-/**
- *
- * SinkContext
- */
-@Deprecated
public class SinkContext {
public static final Logger LOG =
InlongLoggerFactory.getLogger(SinkContext.class);
-
public static final String KEY_MAX_THREADS = "maxThreads";
public static final String KEY_PROCESSINTERVAL = "processInterval";
public static final String KEY_RELOADINTERVAL = "reloadInterval";
public static final String KEY_TASK_NAME = "taskName";
public static final String KEY_MAX_BUFFERQUEUE_SIZE_KB =
"maxBufferQueueSizeKb";
public static final int DEFAULT_MAX_BUFFERQUEUE_SIZE_KB = 128 * 1024;
-
protected final String clusterId;
protected final String taskName;
protected final String sinkName;
protected final Context sinkContext;
-
+ protected TaskConfig taskConfig;
+ @Deprecated
protected SortTaskConfig sortTaskConfig;
-
protected final Channel channel;
- //
protected final int maxThreads;
protected final long processInterval;
protected final long reloadInterval;
- //
+ protected final boolean unifiedConfiguration;
protected final SortMetricItemSet metricItemSet;
protected Timer reloadTimer;
- /**
- * Constructor
- *
- * @param sinkName
- * @param context
- * @param channel
- */
public SinkContext(String sinkName, Context context, Channel channel) {
this.sinkName = sinkName;
this.sinkContext = context;
this.channel = channel;
- this.clusterId =
context.getString(CommonPropertiesHolder.KEY_CLUSTER_ID);
- this.taskName = context.getString(KEY_TASK_NAME);
+ this.clusterId =
sinkContext.getString(CommonPropertiesHolder.KEY_CLUSTER_ID);
+ this.taskName = sinkContext.getString(KEY_TASK_NAME);
this.maxThreads = sinkContext.getInteger(KEY_MAX_THREADS, 10);
this.processInterval = sinkContext.getInteger(KEY_PROCESSINTERVAL,
100);
this.reloadInterval = sinkContext.getLong(KEY_RELOADINTERVAL, 60000L);
- //
this.metricItemSet = new SortMetricItemSet(sinkName);
+ this.unifiedConfiguration =
sinkContext.getBoolean(CommonPropertiesHolder.KEY_USE_UNIFIED_CONFIGURATION,
+ false);
MetricRegister.register(this.metricItemSet);
}
- /**
- * start
- */
public void start() {
try {
this.reload();
this.setReloadTimer();
} catch (Exception e) {
- LOG.error(e.getMessage(), e);
+ LOG.error("failed to start sink context", e);
}
}
- /**
- * close
- */
public void close() {
try {
this.reloadTimer.cancel();
} catch (Exception e) {
- LOG.error(e.getMessage(), e);
+ LOG.error("failed to close sink context", e);
}
}
- /**
- * setReloadTimer
- */
protected void setReloadTimer() {
reloadTimer = new Timer(true);
TimerTask task = new TimerTask() {
@@ -127,113 +106,63 @@ public class SinkContext {
reloadTimer.schedule(task, new Date(System.currentTimeMillis() +
reloadInterval), reloadInterval);
}
- /**
- * reload
- */
public void reload() {
try {
this.sortTaskConfig =
SortClusterConfigHolder.getTaskConfig(taskName);
+ this.taskConfig = SortConfigHolder.getTaskConfig(taskName);
} catch (Throwable e) {
- LOG.error(e.getMessage(), e);
+ LOG.error("failed to reload sink context", e);
}
}
- /**
- * get clusterId
- *
- * @return the clusterId
- */
public String getClusterId() {
return clusterId;
}
- /**
- * get taskName
- *
- * @return the taskName
- */
public String getTaskName() {
return taskName;
}
- /**
- * get sinkName
- *
- * @return the sinkName
- */
public String getSinkName() {
return sinkName;
}
- /**
- * get sinkContext
- *
- * @return the sinkContext
- */
public Context getSinkContext() {
return sinkContext;
}
- /**
- * get sortTaskConfig
- *
- * @return the sortTaskConfig
- */
+ public TaskConfig getTaskConfig() {
+ return taskConfig;
+ }
+
public SortTaskConfig getSortTaskConfig() {
return sortTaskConfig;
}
- /**
- * get channel
- *
- * @return the channel
- */
+ public boolean isUnifiedConfiguration() {
+ return unifiedConfiguration;
+ }
+
public Channel getChannel() {
return channel;
}
- /**
- * get maxThreads
- *
- * @return the maxThreads
- */
public int getMaxThreads() {
return maxThreads;
}
- /**
- * get processInterval
- *
- * @return the processInterval
- */
public long getProcessInterval() {
return processInterval;
}
- /**
- * get reloadInterval
- *
- * @return the reloadInterval
- */
public long getReloadInterval() {
return reloadInterval;
}
- /**
- * get metricItemSet
- *
- * @return the metricItemSet
- */
public SortMetricItemSet getMetricItemSet() {
return metricItemSet;
}
- /**
- * fillInlongId
- *
- * @param currentRecord
- * @param dimensions
- */
public static void fillInlongId(ProfileEvent currentRecord, Map<String,
String> dimensions) {
String inlongGroupId = currentRecord.getInlongGroupId();
inlongGroupId = (StringUtils.isBlank(inlongGroupId)) ? "-" :
inlongGroupId;
@@ -243,10 +172,6 @@ public class SinkContext {
dimensions.put(SortMetricItem.KEY_INLONG_STREAM_ID, inlongStreamId);
}
- /**
- * createBufferQueue
- * @return
- */
public static <U> BufferQueue<U> createBufferQueue() {
int maxBufferQueueSizeKb =
CommonPropertiesHolder.getInteger(KEY_MAX_BUFFERQUEUE_SIZE_KB,
DEFAULT_MAX_BUFFERQUEUE_SIZE_KB);
diff --git
a/inlong-sort-standalone/sort-standalone-source/src/main/java/org/apache/inlong/sort/standalone/sink/cls/ClsIdConfig.java
b/inlong-sort-standalone/sort-standalone-source/src/main/java/org/apache/inlong/sort/standalone/sink/cls/ClsIdConfig.java
index f0fd784acb..3c167be300 100644
---
a/inlong-sort-standalone/sort-standalone-source/src/main/java/org/apache/inlong/sort/standalone/sink/cls/ClsIdConfig.java
+++
b/inlong-sort-standalone/sort-standalone-source/src/main/java/org/apache/inlong/sort/standalone/sink/cls/ClsIdConfig.java
@@ -25,6 +25,7 @@ import org.apache.inlong.common.pojo.sort.node.ClsNodeConfig;
import lombok.AllArgsConstructor;
import lombok.Builder;
import lombok.Data;
+import lombok.EqualsAndHashCode;
import lombok.NoArgsConstructor;
import java.util.List;
@@ -37,6 +38,7 @@ import java.util.stream.Collectors;
@Builder
@NoArgsConstructor
@AllArgsConstructor
+@EqualsAndHashCode
public class ClsIdConfig {
private String inlongGroupId;
diff --git
a/inlong-sort-standalone/sort-standalone-source/src/main/java/org/apache/inlong/sort/standalone/sink/cls/ClsSinkContext.java
b/inlong-sort-standalone/sort-standalone-source/src/main/java/org/apache/inlong/sort/standalone/sink/cls/ClsSinkContext.java
index 34bddaed32..09c8742c04 100644
---
a/inlong-sort-standalone/sort-standalone-source/src/main/java/org/apache/inlong/sort/standalone/sink/cls/ClsSinkContext.java
+++
b/inlong-sort-standalone/sort-standalone-source/src/main/java/org/apache/inlong/sort/standalone/sink/cls/ClsSinkContext.java
@@ -17,18 +17,22 @@
package org.apache.inlong.sort.standalone.sink.cls;
-import org.apache.inlong.common.pojo.sort.SortClusterConfig;
-import org.apache.inlong.common.pojo.sort.SortTaskConfig;
+import org.apache.inlong.common.pojo.sort.ClusterTagConfig;
+import org.apache.inlong.common.pojo.sort.TaskConfig;
import org.apache.inlong.common.pojo.sort.node.ClsNodeConfig;
+import org.apache.inlong.common.pojo.sortstandalone.SortTaskConfig;
import org.apache.inlong.sort.standalone.channel.ProfileEvent;
import org.apache.inlong.sort.standalone.config.holder.CommonPropertiesHolder;
+import org.apache.inlong.sort.standalone.config.holder.SortClusterConfigHolder;
import org.apache.inlong.sort.standalone.config.holder.v2.SortConfigHolder;
import org.apache.inlong.sort.standalone.config.pojo.InlongId;
import org.apache.inlong.sort.standalone.metrics.SortMetricItem;
import org.apache.inlong.sort.standalone.metrics.audit.AuditUtils;
-import org.apache.inlong.sort.standalone.sink.v2.SinkContext;
+import org.apache.inlong.sort.standalone.sink.SinkContext;
+import org.apache.inlong.sort.standalone.utils.Constants;
import org.apache.inlong.sort.standalone.utils.InlongLoggerFactory;
+import com.fasterxml.jackson.core.JsonProcessingException;
import com.fasterxml.jackson.databind.DeserializationFeature;
import com.fasterxml.jackson.databind.ObjectMapper;
import com.tencentcloudapi.cls.producer.AsyncProducerClient;
@@ -97,20 +101,24 @@ public class ClsSinkContext extends SinkContext {
}
});
- SortTaskConfig newSortTaskConfig =
SortConfigHolder.getTaskConfig(taskName);
- if (newSortTaskConfig == null ||
newSortTaskConfig.equals(sortTaskConfig)) {
+ TaskConfig newTaskConfig =
SortConfigHolder.getTaskConfig(taskName);
+ SortTaskConfig newSortTaskConfig =
SortClusterConfigHolder.getTaskConfig(taskName);
+ if ((newTaskConfig == null || newTaskConfig.equals(taskConfig))
+ && (newSortTaskConfig == null ||
newSortTaskConfig.equals(sortTaskConfig))) {
return;
}
- LOG.info("get new SortTaskConfig:taskName:{}:config:{}", taskName,
- objectMapper.writeValueAsString(newSortTaskConfig));
- this.sortTaskConfig = newSortTaskConfig;
- ClsNodeConfig requestNodeConfig = (ClsNodeConfig)
sortTaskConfig.getNodeConfig();
+ LOG.info("get new SortTaskConfig:taskName:{}", taskName);
+ ClsNodeConfig requestNodeConfig = newTaskConfig == null ? null
: (ClsNodeConfig) newTaskConfig.getNodeConfig();
+ if (requestNodeConfig != null && (clsNodeConfig == null
|| requestNodeConfig.getVersion() > clsNodeConfig.getVersion())) {
this.clsNodeConfig = requestNodeConfig;
}
- this.keywordMaxLength = DEFAULT_KEYWORD_MAX_LENGTH;
- this.reloadIdParams();
- this.reloadClients();
+ this.taskConfig = newTaskConfig;
+ this.sortTaskConfig = newSortTaskConfig;
+
+ Map<String, ClsIdConfig> fromTaskConfig =
reloadIdParamsFromTaskConfig(taskConfig, clsNodeConfig);
+ Map<String, ClsIdConfig> fromSortTaskConfig =
reloadIdParamsFromSortTaskConfig(sortTaskConfig);
+ idConfigMap = unifiedConfiguration ? fromTaskConfig :
fromSortTaskConfig;
+ this.reloadClients(idConfigMap);
this.reloadHandler();
} catch (Exception e) {
LOG.error(e.getMessage(), e);
@@ -134,10 +142,13 @@ public class ClsSinkContext extends SinkContext {
}
}
- private void reloadIdParams() {
- this.idConfigMap = this.sortTaskConfig.getClusters()
+ private Map<String, ClsIdConfig> reloadIdParamsFromTaskConfig(TaskConfig
taskConfig, ClsNodeConfig clsNodeConfig) {
+ if (taskConfig == null) {
+ return new HashMap<>();
+ }
+ return taskConfig.getClusterTagConfigs()
.stream()
- .map(SortClusterConfig::getDataFlowConfigs)
+ .map(ClusterTagConfig::getDataFlowConfigs)
.flatMap(Collection::stream)
.map(dataFlowConfig -> ClsIdConfig.create(dataFlowConfig,
clsNodeConfig))
.collect(Collectors.toMap(
@@ -145,7 +156,25 @@ public class ClsSinkContext extends SinkContext {
v -> v));
}
- private void reloadClients() {
+ private Map<String, ClsIdConfig>
reloadIdParamsFromSortTaskConfig(SortTaskConfig sortTaskConfig)
+ throws JsonProcessingException {
+ if (sortTaskConfig == null) {
+ return new HashMap<>();
+ }
+ List<Map<String, String>> idList = sortTaskConfig.getIdParams();
+ Map<String, ClsIdConfig> newIdConfigMap = new ConcurrentHashMap<>();
+ for (Map<String, String> idParam : idList) {
+ String inlongGroupId = idParam.get(Constants.INLONG_GROUP_ID);
+ String inlongStreamId = idParam.get(Constants.INLONG_STREAM_ID);
+ String uid = InlongId.generateUid(inlongGroupId, inlongStreamId);
+ String jsonIdConfig = objectMapper.writeValueAsString(idParam);
+ ClsIdConfig idConfig = objectMapper.readValue(jsonIdConfig,
ClsIdConfig.class);
+ newIdConfigMap.put(uid, idConfig);
+ }
+ return newIdConfigMap;
+ }
+
+ private void reloadClients(Map<String, ClsIdConfig> idConfigMap) {
// get update secretIds
Map<String, ClsIdConfig> updateConfigMap = idConfigMap.values()
.stream()
diff --git
a/inlong-sort-standalone/sort-standalone-source/src/main/java/org/apache/inlong/sort/standalone/sink/elasticsearch/EsSink.java
b/inlong-sort-standalone/sort-standalone-source/src/main/java/org/apache/inlong/sort/standalone/sink/elasticsearch/EsSink.java
index a6d3dba5b3..f283e5c855 100644
---
a/inlong-sort-standalone/sort-standalone-source/src/main/java/org/apache/inlong/sort/standalone/sink/elasticsearch/EsSink.java
+++
b/inlong-sort-standalone/sort-standalone-source/src/main/java/org/apache/inlong/sort/standalone/sink/elasticsearch/EsSink.java
@@ -17,7 +17,7 @@
package org.apache.inlong.sort.standalone.sink.elasticsearch;
-import org.apache.inlong.sort.standalone.sink.v2.SinkContext;
+import org.apache.inlong.sort.standalone.sink.SinkContext;
import org.apache.inlong.sort.standalone.utils.BufferQueue;
import org.apache.flume.Context;
diff --git
a/inlong-sort-standalone/sort-standalone-source/src/main/java/org/apache/inlong/sort/standalone/sink/elasticsearch/EsSinkContext.java
b/inlong-sort-standalone/sort-standalone-source/src/main/java/org/apache/inlong/sort/standalone/sink/elasticsearch/EsSinkContext.java
index e9dcb26a7f..6357dc8330 100644
---
a/inlong-sort-standalone/sort-standalone-source/src/main/java/org/apache/inlong/sort/standalone/sink/elasticsearch/EsSinkContext.java
+++
b/inlong-sort-standalone/sort-standalone-source/src/main/java/org/apache/inlong/sort/standalone/sink/elasticsearch/EsSinkContext.java
@@ -17,8 +17,8 @@
package org.apache.inlong.sort.standalone.sink.elasticsearch;
-import org.apache.inlong.common.pojo.sort.SortClusterConfig;
-import org.apache.inlong.common.pojo.sort.SortTaskConfig;
+import org.apache.inlong.common.pojo.sort.ClusterTagConfig;
+import org.apache.inlong.common.pojo.sort.TaskConfig;
import org.apache.inlong.common.pojo.sort.node.EsNodeConfig;
import org.apache.inlong.sort.standalone.channel.ProfileEvent;
import org.apache.inlong.sort.standalone.config.holder.CommonPropertiesHolder;
@@ -26,7 +26,7 @@ import
org.apache.inlong.sort.standalone.config.holder.v2.SortConfigHolder;
import org.apache.inlong.sort.standalone.config.pojo.InlongId;
import org.apache.inlong.sort.standalone.metrics.SortMetricItem;
import org.apache.inlong.sort.standalone.metrics.audit.AuditUtils;
-import org.apache.inlong.sort.standalone.sink.v2.SinkContext;
+import org.apache.inlong.sort.standalone.sink.SinkContext;
import org.apache.inlong.sort.standalone.utils.BufferQueue;
import org.apache.inlong.sort.standalone.utils.InlongLoggerFactory;
@@ -138,23 +138,23 @@ public class EsSinkContext extends SinkContext {
LOG.info("SortTask:{},dispatchQueue:{},offer:{},take:{},back:{}",
taskName, dispatchQueue.size(), offerCounter.getAndSet(0),
takeCounter.getAndSet(0), backCounter.getAndSet(0));
- SortTaskConfig newSortTaskConfig =
SortConfigHolder.getTaskConfig(taskName);
- if (this.sortTaskConfig != null &&
this.sortTaskConfig.equals(newSortTaskConfig)) {
+ TaskConfig newSortTaskConfig =
SortConfigHolder.getTaskConfig(taskName);
+ if (this.taskConfig != null &&
this.taskConfig.equals(newSortTaskConfig)) {
return;
}
LOG.info("get new SortTaskConfig:taskName:{}:config:{}", taskName,
objectMapper.writeValueAsString(newSortTaskConfig));
- this.sortTaskConfig = newSortTaskConfig;
- EsNodeConfig requestNodeConfig = (EsNodeConfig)
sortTaskConfig.getNodeConfig();
+ this.taskConfig = newSortTaskConfig;
+ EsNodeConfig requestNodeConfig = (EsNodeConfig)
taskConfig.getNodeConfig();
if (esNodeConfig == null || requestNodeConfig.getVersion() >
esNodeConfig.getVersion()) {
this.esNodeConfig = requestNodeConfig;
}
- Map<String, String> properties =
this.sortTaskConfig.getNodeConfig().getProperties();
+ Map<String, String> properties =
this.taskConfig.getNodeConfig().getProperties();
this.sinkContext = new Context(properties != null ? properties :
new HashMap<>());
// change current config
- this.idConfigMap = this.sortTaskConfig.getClusters()
+ this.idConfigMap = this.taskConfig.getClusterTagConfigs()
.stream()
- .map(SortClusterConfig::getDataFlowConfigs)
+ .map(ClusterTagConfig::getDataFlowConfigs)
.flatMap(Collection::stream)
.map(EsIdConfig::create)
.collect(Collectors.toMap(
diff --git
a/inlong-sort-standalone/sort-standalone-source/src/main/java/org/apache/inlong/sort/standalone/sink/kafka/KafkaFederationSinkContext.java
b/inlong-sort-standalone/sort-standalone-source/src/main/java/org/apache/inlong/sort/standalone/sink/kafka/KafkaFederationSinkContext.java
index 4e94c67248..ad19c73547 100644
---
a/inlong-sort-standalone/sort-standalone-source/src/main/java/org/apache/inlong/sort/standalone/sink/kafka/KafkaFederationSinkContext.java
+++
b/inlong-sort-standalone/sort-standalone-source/src/main/java/org/apache/inlong/sort/standalone/sink/kafka/KafkaFederationSinkContext.java
@@ -17,8 +17,8 @@
package org.apache.inlong.sort.standalone.sink.kafka;
-import org.apache.inlong.common.pojo.sort.SortClusterConfig;
-import org.apache.inlong.common.pojo.sort.SortTaskConfig;
+import org.apache.inlong.common.pojo.sort.ClusterTagConfig;
+import org.apache.inlong.common.pojo.sort.TaskConfig;
import org.apache.inlong.common.pojo.sort.node.KafkaNodeConfig;
import org.apache.inlong.sort.standalone.channel.ProfileEvent;
import org.apache.inlong.sort.standalone.config.holder.CommonPropertiesHolder;
@@ -26,7 +26,7 @@ import
org.apache.inlong.sort.standalone.config.holder.v2.SortConfigHolder;
import org.apache.inlong.sort.standalone.config.pojo.InlongId;
import org.apache.inlong.sort.standalone.metrics.SortMetricItem;
import org.apache.inlong.sort.standalone.metrics.audit.AuditUtils;
-import org.apache.inlong.sort.standalone.sink.v2.SinkContext;
+import org.apache.inlong.sort.standalone.sink.SinkContext;
import org.apache.inlong.sort.standalone.utils.InlongLoggerFactory;
import org.apache.commons.lang3.ClassUtils;
@@ -59,24 +59,24 @@ public class KafkaFederationSinkContext extends SinkContext
{
public void reload() {
LOG.info("reload KafkaFederationSinkContext.");
try {
- SortTaskConfig newSortTaskConfig =
SortConfigHolder.getTaskConfig(taskName);
+ TaskConfig newSortTaskConfig =
SortConfigHolder.getTaskConfig(taskName);
if (newSortTaskConfig == null) {
LOG.error("newSortTaskConfig is null.");
return;
}
- if (this.sortTaskConfig != null &&
this.sortTaskConfig.equals(newSortTaskConfig)) {
+ if (this.taskConfig != null &&
this.taskConfig.equals(newSortTaskConfig)) {
LOG.info("Same sortTaskConfig, do nothing.");
return;
}
- this.sortTaskConfig = newSortTaskConfig;
+ this.taskConfig = newSortTaskConfig;
KafkaNodeConfig requestNodeConfig = (KafkaNodeConfig)
newSortTaskConfig.getNodeConfig();
if (kafkaNodeConfig == null || requestNodeConfig.getVersion() >
kafkaNodeConfig.getVersion()) {
this.kafkaNodeConfig = requestNodeConfig;
}
- this.idConfigMap = this.sortTaskConfig.getClusters()
+ this.idConfigMap = this.taskConfig.getClusterTagConfigs()
.stream()
- .map(SortClusterConfig::getDataFlowConfigs)
+ .map(ClusterTagConfig::getDataFlowConfigs)
.flatMap(Collection::stream)
.map(KafkaIdConfig::create)
.collect(Collectors.toMap(
diff --git
a/inlong-sort-standalone/sort-standalone-source/src/main/java/org/apache/inlong/sort/standalone/sink/pulsar/PulsarFederationSinkContext.java
b/inlong-sort-standalone/sort-standalone-source/src/main/java/org/apache/inlong/sort/standalone/sink/pulsar/PulsarFederationSinkContext.java
index e5ed8985b2..770f028da0 100644
---
a/inlong-sort-standalone/sort-standalone-source/src/main/java/org/apache/inlong/sort/standalone/sink/pulsar/PulsarFederationSinkContext.java
+++
b/inlong-sort-standalone/sort-standalone-source/src/main/java/org/apache/inlong/sort/standalone/sink/pulsar/PulsarFederationSinkContext.java
@@ -17,8 +17,8 @@
package org.apache.inlong.sort.standalone.sink.pulsar;
-import org.apache.inlong.common.pojo.sort.SortClusterConfig;
-import org.apache.inlong.common.pojo.sort.SortTaskConfig;
+import org.apache.inlong.common.pojo.sort.ClusterTagConfig;
+import org.apache.inlong.common.pojo.sort.TaskConfig;
import org.apache.inlong.common.pojo.sort.node.PulsarNodeConfig;
import org.apache.inlong.sort.standalone.channel.ProfileEvent;
import org.apache.inlong.sort.standalone.config.holder.CommonPropertiesHolder;
@@ -26,7 +26,7 @@ import
org.apache.inlong.sort.standalone.config.holder.v2.SortConfigHolder;
import org.apache.inlong.sort.standalone.config.pojo.InlongId;
import org.apache.inlong.sort.standalone.metrics.SortMetricItem;
import org.apache.inlong.sort.standalone.metrics.audit.AuditUtils;
-import org.apache.inlong.sort.standalone.sink.v2.SinkContext;
+import org.apache.inlong.sort.standalone.sink.SinkContext;
import org.apache.inlong.sort.standalone.utils.InlongLoggerFactory;
import org.apache.commons.lang3.ClassUtils;
@@ -53,25 +53,25 @@ public class PulsarFederationSinkContext extends
SinkContext {
public void reload() {
try {
- SortTaskConfig newSortTaskConfig =
SortConfigHolder.getTaskConfig(taskName);
+ TaskConfig newSortTaskConfig =
SortConfigHolder.getTaskConfig(taskName);
if (newSortTaskConfig == null) {
LOG.error("newSortTaskConfig is null.");
return;
}
- if (this.sortTaskConfig != null &&
this.sortTaskConfig.equals(newSortTaskConfig)) {
+ if (this.taskConfig != null &&
this.taskConfig.equals(newSortTaskConfig)) {
LOG.info("Same sortTaskConfig, do nothing.");
return;
}
- this.sortTaskConfig = newSortTaskConfig;
+ this.taskConfig = newSortTaskConfig;
PulsarNodeConfig requestNodeConfig = (PulsarNodeConfig)
newSortTaskConfig.getNodeConfig();
if (pulsarNodeConfig == null || requestNodeConfig.getVersion() >
pulsarNodeConfig.getVersion()) {
this.pulsarNodeConfig = requestNodeConfig;
}
- this.idConfigMap = this.sortTaskConfig.getClusters()
+ this.idConfigMap = this.taskConfig.getClusterTagConfigs()
.stream()
- .map(SortClusterConfig::getDataFlowConfigs)
+ .map(ClusterTagConfig::getDataFlowConfigs)
.flatMap(Collection::stream)
.map(PulsarIdConfig::create)
.collect(Collectors.toMap(
diff --git
a/inlong-sort-standalone/sort-standalone-source/src/main/java/org/apache/inlong/sort/standalone/sink/v2/SinkContext.java
b/inlong-sort-standalone/sort-standalone-source/src/main/java/org/apache/inlong/sort/standalone/sink/v2/SinkContext.java
deleted file mode 100644
index 251a6d56af..0000000000
---
a/inlong-sort-standalone/sort-standalone-source/src/main/java/org/apache/inlong/sort/standalone/sink/v2/SinkContext.java
+++ /dev/null
@@ -1,165 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.inlong.sort.standalone.sink.v2;
-
-import org.apache.inlong.common.metric.MetricRegister;
-import org.apache.inlong.common.pojo.sort.SortTaskConfig;
-import org.apache.inlong.sort.standalone.channel.ProfileEvent;
-import org.apache.inlong.sort.standalone.config.holder.CommonPropertiesHolder;
-import org.apache.inlong.sort.standalone.config.holder.v2.SortConfigHolder;
-import org.apache.inlong.sort.standalone.metrics.SortMetricItem;
-import org.apache.inlong.sort.standalone.metrics.SortMetricItemSet;
-import org.apache.inlong.sort.standalone.utils.BufferQueue;
-import org.apache.inlong.sort.standalone.utils.InlongLoggerFactory;
-
-import org.apache.commons.lang3.StringUtils;
-import org.apache.flume.Channel;
-import org.apache.flume.Context;
-import org.slf4j.Logger;
-
-import java.util.Date;
-import java.util.Map;
-import java.util.Timer;
-import java.util.TimerTask;
-
-public class SinkContext {
-
- public static final Logger LOG =
InlongLoggerFactory.getLogger(SinkContext.class);
- public static final String KEY_MAX_THREADS = "maxThreads";
- public static final String KEY_PROCESSINTERVAL = "processInterval";
- public static final String KEY_RELOADINTERVAL = "reloadInterval";
- public static final String KEY_TASK_NAME = "taskName";
- public static final String KEY_MAX_BUFFERQUEUE_SIZE_KB =
"maxBufferQueueSizeKb";
- public static final int DEFAULT_MAX_BUFFERQUEUE_SIZE_KB = 128 * 1024;
- protected final String clusterId;
- protected final String taskName;
- protected final String sinkName;
- protected final Context sinkContext;
- protected SortTaskConfig sortTaskConfig;
- protected final Channel channel;
- protected final int maxThreads;
- protected final long processInterval;
- protected final long reloadInterval;
- protected final SortMetricItemSet metricItemSet;
- protected Timer reloadTimer;
-
- public SinkContext(String sinkName, Context context, Channel channel) {
- this.sinkName = sinkName;
- this.sinkContext = context;
- this.channel = channel;
- this.clusterId =
sinkContext.getString(CommonPropertiesHolder.KEY_CLUSTER_ID);
- this.taskName = sinkContext.getString(KEY_TASK_NAME);
- this.maxThreads = sinkContext.getInteger(KEY_MAX_THREADS, 10);
- this.processInterval = sinkContext.getInteger(KEY_PROCESSINTERVAL,
100);
- this.reloadInterval = sinkContext.getLong(KEY_RELOADINTERVAL, 60000L);
- this.metricItemSet = new SortMetricItemSet(sinkName);
- MetricRegister.register(this.metricItemSet);
- }
-
- public void start() {
- try {
- this.reload();
- this.setReloadTimer();
- } catch (Exception e) {
- LOG.error("failed to start sink context", e);
- }
- }
-
- public void close() {
- try {
- this.reloadTimer.cancel();
- } catch (Exception e) {
- LOG.error("failed to close sink context", e);
- }
- }
-
- protected void setReloadTimer() {
- reloadTimer = new Timer(true);
- TimerTask task = new TimerTask() {
-
- public void run() {
- reload();
- }
- };
- reloadTimer.schedule(task, new Date(System.currentTimeMillis() +
reloadInterval), reloadInterval);
- }
-
- public void reload() {
- try {
- this.sortTaskConfig = SortConfigHolder.getTaskConfig(taskName);
- } catch (Throwable e) {
- LOG.error("failed to stop sink context", e);
- }
- }
-
- public String getClusterId() {
- return clusterId;
- }
-
- public String getTaskName() {
- return taskName;
- }
-
- public String getSinkName() {
- return sinkName;
- }
-
- public Context getSinkContext() {
- return sinkContext;
- }
-
- public SortTaskConfig getSortTaskConfig() {
- return sortTaskConfig;
- }
-
- public Channel getChannel() {
- return channel;
- }
-
- public int getMaxThreads() {
- return maxThreads;
- }
-
- public long getProcessInterval() {
- return processInterval;
- }
-
- public long getReloadInterval() {
- return reloadInterval;
- }
-
- public SortMetricItemSet getMetricItemSet() {
- return metricItemSet;
- }
-
- public static void fillInlongId(ProfileEvent currentRecord, Map<String,
String> dimensions) {
- String inlongGroupId = currentRecord.getInlongGroupId();
- inlongGroupId = (StringUtils.isBlank(inlongGroupId)) ? "-" :
inlongGroupId;
- String inlongStreamId = currentRecord.getInlongStreamId();
- inlongStreamId = (StringUtils.isBlank(inlongStreamId)) ? "-" :
inlongStreamId;
- dimensions.put(SortMetricItem.KEY_INLONG_GROUP_ID, inlongGroupId);
- dimensions.put(SortMetricItem.KEY_INLONG_STREAM_ID, inlongStreamId);
- }
-
- public static <U> BufferQueue<U> createBufferQueue() {
- int maxBufferQueueSizeKb =
CommonPropertiesHolder.getInteger(KEY_MAX_BUFFERQUEUE_SIZE_KB,
- DEFAULT_MAX_BUFFERQUEUE_SIZE_KB);
- BufferQueue<U> dispatchQueue = new BufferQueue<>(maxBufferQueueSizeKb);
- return dispatchQueue;
- }
-}
diff --git
a/inlong-sort-standalone/sort-standalone-source/src/main/java/org/apache/inlong/sort/standalone/v2/SortCluster.java
b/inlong-sort-standalone/sort-standalone-source/src/main/java/org/apache/inlong/sort/standalone/v2/SortCluster.java
index f7e2e58cc6..406c0ed0de 100644
---
a/inlong-sort-standalone/sort-standalone-source/src/main/java/org/apache/inlong/sort/standalone/v2/SortCluster.java
+++
b/inlong-sort-standalone/sort-standalone-source/src/main/java/org/apache/inlong/sort/standalone/v2/SortCluster.java
@@ -18,7 +18,7 @@
package org.apache.inlong.sort.standalone.v2;
import org.apache.inlong.common.pojo.sort.SortConfig;
-import org.apache.inlong.common.pojo.sort.SortTaskConfig;
+import org.apache.inlong.common.pojo.sort.TaskConfig;
import org.apache.inlong.sdk.commons.admin.AdminTask;
import org.apache.inlong.sort.standalone.config.holder.CommonPropertiesHolder;
import org.apache.inlong.sort.standalone.config.holder.v2.SortConfigHolder;
@@ -92,7 +92,7 @@ public class SortCluster {
return;
}
// add new task
- for (SortTaskConfig taskConfig : newConfig.getTasks()) {
+ for (TaskConfig taskConfig : newConfig.getTasks()) {
String newTaskName = taskConfig.getSortTaskName();
if (taskMap.containsKey(newTaskName)) {
continue;
@@ -106,7 +106,7 @@ public class SortCluster {
for (Map.Entry<String, SortTask> entry : taskMap.entrySet()) {
String taskName = entry.getKey();
boolean isFound = false;
- for (SortTaskConfig taskConfig : newConfig.getTasks()) {
+ for (TaskConfig taskConfig : newConfig.getTasks()) {
if (taskName.equals(taskConfig.getSortTaskName())) {
isFound = true;
break;
diff --git
a/inlong-sort-standalone/sort-standalone-source/src/main/java/org/apache/inlong/sort/standalone/v2/SortTask.java
b/inlong-sort-standalone/sort-standalone-source/src/main/java/org/apache/inlong/sort/standalone/v2/SortTask.java
index d94ba6ffdf..231f7a41bd 100644
---
a/inlong-sort-standalone/sort-standalone-source/src/main/java/org/apache/inlong/sort/standalone/v2/SortTask.java
+++
b/inlong-sort-standalone/sort-standalone-source/src/main/java/org/apache/inlong/sort/standalone/v2/SortTask.java
@@ -17,7 +17,7 @@
package org.apache.inlong.sort.standalone.v2;
-import org.apache.inlong.common.pojo.sort.SortTaskConfig;
+import org.apache.inlong.common.pojo.sort.TaskConfig;
import org.apache.inlong.sort.standalone.PropertiesConfigurationProvider;
import org.apache.inlong.sort.standalone.config.holder.v2.SortConfigHolder;
import org.apache.inlong.sort.standalone.utils.v2.FlumeConfigGenerator;
@@ -48,7 +48,7 @@ public class SortTask {
}
public void start() {
- SortTaskConfig config = SortConfigHolder.getTaskConfig(taskName);
+ TaskConfig config = SortConfigHolder.getTaskConfig(taskName);
if (config == null) {
return;
}
diff --git
a/inlong-sort-standalone/sort-standalone-source/src/test/java/SortClusterConfig.conf
b/inlong-sort-standalone/sort-standalone-source/src/test/java/SortConfig.conf
similarity index 99%
rename from
inlong-sort-standalone/sort-standalone-source/src/test/java/SortClusterConfig.conf
rename to
inlong-sort-standalone/sort-standalone-source/src/test/java/SortConfig.conf
index 637698dfa7..82b9cfe05a 100644
---
a/inlong-sort-standalone/sort-standalone-source/src/test/java/SortClusterConfig.conf
+++
b/inlong-sort-standalone/sort-standalone-source/src/test/java/SortConfig.conf
@@ -20,7 +20,7 @@
"tasks": [
{
"sortTaskName": "sid_es_es-rmrv7g7a_v3",
- "clusters": [
+ "clusterTagConfigs": [
{
"clusterTag": "default_cluster",
"mqClusterConfigs": [
diff --git
a/inlong-sort-standalone/sort-standalone-source/src/test/java/common.properties
b/inlong-sort-standalone/sort-standalone-source/src/test/java/common.properties
index 512d53f179..5f79465039 100644
---
a/inlong-sort-standalone/sort-standalone-source/src/test/java/common.properties
+++
b/inlong-sort-standalone/sort-standalone-source/src/test/java/common.properties
@@ -25,6 +25,7 @@
sortChannel.type=org.apache.inlong.sort.standalone.channel.BufferQueueChannel
sortSink.type=org.apache.inlong.sort.standalone.sink.elasticsearch.EsSink
sortSource.type=org.apache.inlong.sort.standalone.source.readapi.ReadApiSource
sortClusterConfig.type=org.apache.inlong.sort.standalone.config.loader.ClassResourceSortClusterConfigLoader
+sortConfig.type=org.apache.inlong.sort.standalone.config.loader.v2.ClassResourceSortConfigLoader
indexRequestHandler=org.apache.inlong.sort.standalone.sink.elasticsearch.DefaultEvent2IndexRequestHandler
diff --git
a/inlong-sort-standalone/sort-standalone-source/src/test/resources/SortClusterConfig.conf
b/inlong-sort-standalone/sort-standalone-source/src/test/resources/SortConfig.conf
similarity index 99%
rename from
inlong-sort-standalone/sort-standalone-source/src/test/resources/SortClusterConfig.conf
rename to
inlong-sort-standalone/sort-standalone-source/src/test/resources/SortConfig.conf
index 637698dfa7..82b9cfe05a 100644
---
a/inlong-sort-standalone/sort-standalone-source/src/test/resources/SortClusterConfig.conf
+++
b/inlong-sort-standalone/sort-standalone-source/src/test/resources/SortConfig.conf
@@ -20,7 +20,7 @@
"tasks": [
{
"sortTaskName": "sid_es_es-rmrv7g7a_v3",
- "clusters": [
+ "clusterTagConfigs": [
{
"clusterTag": "default_cluster",
"mqClusterConfigs": [
diff --git
a/inlong-sort-standalone/sort-standalone-source/src/test/resources/common.properties
b/inlong-sort-standalone/sort-standalone-source/src/test/resources/common.properties
index 512d53f179..5f79465039 100644
---
a/inlong-sort-standalone/sort-standalone-source/src/test/resources/common.properties
+++
b/inlong-sort-standalone/sort-standalone-source/src/test/resources/common.properties
@@ -25,6 +25,7 @@
sortChannel.type=org.apache.inlong.sort.standalone.channel.BufferQueueChannel
sortSink.type=org.apache.inlong.sort.standalone.sink.elasticsearch.EsSink
sortSource.type=org.apache.inlong.sort.standalone.source.readapi.ReadApiSource
sortClusterConfig.type=org.apache.inlong.sort.standalone.config.loader.ClassResourceSortClusterConfigLoader
+sortConfig.type=org.apache.inlong.sort.standalone.config.loader.v2.ClassResourceSortConfigLoader
indexRequestHandler=org.apache.inlong.sort.standalone.sink.elasticsearch.DefaultEvent2IndexRequestHandler