This is an automated email from the ASF dual-hosted git repository.
dockerzhang pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/inlong.git
The following commit(s) were added to refs/heads/master by this push:
new 0113074789 [INLONG-10272][Sort] Unified configuration check utils
support check latest config (#10273)
0113074789 is described below
commit 0113074789aeb0d0fae0311bd4200f6ab7d7ff80
Author: vernedeng <[email protected]>
AuthorDate: Sun May 26 19:34:51 2024 +0800
[INLONG-10272][Sort] Unified configuration check utils support check latest
config (#10273)
---
.../inlong/common/pojo/sort/SortClusterConfig.java | 52 +++++++++++-----------
.../apache/inlong/common/pojo/sort/SortConfig.java | 26 ++++++-----
.../inlong/common/pojo/sort/SortTaskConfig.java | 51 ++++++++++++---------
.../common/pojo/sort/dataflow/DataFlowConfig.java | 12 +++--
.../pojo/sort/dataflow/dataType/KvConfig.java | 1 +
.../common/pojo/sort/mq/MqClusterConfig.java | 21 ++++++---
.../apache/inlong/common/util/SortConfigUtil.java | 48 ++++++++++++++++++--
.../apache/inlong/sdk/sort/entity/InLongTopic.java | 6 +--
.../sort/fetcher/tube/TubeSingleTopicFetcher.java | 2 +-
.../sdk/sort/impl/QueryConsumeConfigImpl.java | 4 +-
.../loader/ClassResourceQueryConsumeConfig.java | 3 +-
11 files changed, 151 insertions(+), 75 deletions(-)
diff --git
a/inlong-common/src/main/java/org/apache/inlong/common/pojo/sort/SortClusterConfig.java
b/inlong-common/src/main/java/org/apache/inlong/common/pojo/sort/SortClusterConfig.java
index fc2fe06b5e..90541919eb 100644
---
a/inlong-common/src/main/java/org/apache/inlong/common/pojo/sort/SortClusterConfig.java
+++
b/inlong-common/src/main/java/org/apache/inlong/common/pojo/sort/SortClusterConfig.java
@@ -41,25 +41,27 @@ public class SortClusterConfig implements Serializable {
private List<MqClusterConfig> mqClusterConfigs;
private List<DataFlowConfig> dataFlowConfigs;
- public static List<SortClusterConfig> batchCheckDelete(
- List<SortClusterConfig> last,
- List<SortClusterConfig> current) {
- return SortConfigUtil.batchCheckDeleteRecursive(last, current,
- SortClusterConfig::getClusterTag,
SortClusterConfig::checkDelete);
+ public static SortClusterConfig checkDelete(SortClusterConfig last,
SortClusterConfig current) {
+ return check(last, current, MqClusterConfig::batchCheckLast,
DataFlowConfig::batchCheckDelete);
}
- public static List<SortClusterConfig> batchCheckUpdate(
- List<SortClusterConfig> last,
- List<SortClusterConfig> current) {
- return SortConfigUtil.batchCheckUpdateRecursive(last, current,
- SortClusterConfig::getClusterTag,
SortClusterConfig::checkUpdate);
+ public static SortClusterConfig checkNew(SortClusterConfig last,
SortClusterConfig current) {
+ return check(last, current, MqClusterConfig::batchCheckLatest,
DataFlowConfig::batchCheckNew);
+ }
+
+ public static SortClusterConfig checkUpdate(SortClusterConfig last,
SortClusterConfig current) {
+ return check(last, current, MqClusterConfig::batchCheckLatest,
DataFlowConfig::batchCheckUpdate);
+ }
+
+ public static SortClusterConfig checkLatest(SortClusterConfig last,
SortClusterConfig current) {
+ return check(last, current, MqClusterConfig::batchCheckLatest,
DataFlowConfig::batchCheckLatest);
}
- public static List<SortClusterConfig> batchCheckNoUpdate(
+ public static List<SortClusterConfig> batchCheckDelete(
List<SortClusterConfig> last,
List<SortClusterConfig> current) {
- return SortConfigUtil.batchCheckNoUpdateRecursive(last, current,
- SortClusterConfig::getClusterTag,
SortClusterConfig::checkNoUpdate);
+ return SortConfigUtil.batchCheckDeleteRecursive(last, current,
+ SortClusterConfig::getClusterTag,
SortClusterConfig::checkDelete);
}
public static List<SortClusterConfig> batchCheckNew(
@@ -69,20 +71,18 @@ public class SortClusterConfig implements Serializable {
SortClusterConfig::getClusterTag, SortClusterConfig::checkNew);
}
- public static SortClusterConfig checkDelete(SortClusterConfig last,
SortClusterConfig current) {
- return check(last, current, MqClusterConfig::batchCheckDelete,
DataFlowConfig::batchCheckDelete);
- }
-
- public static SortClusterConfig checkUpdate(SortClusterConfig last,
SortClusterConfig current) {
- return check(last, current, MqClusterConfig::batchCheckUpdate,
DataFlowConfig::batchCheckUpdate);
- }
-
- public static SortClusterConfig checkNoUpdate(SortClusterConfig last,
SortClusterConfig current) {
- return check(last, current, MqClusterConfig::batchCheckNoUpdate,
DataFlowConfig::batchCheckNoUpdate);
+ public static List<SortClusterConfig> batchCheckUpdate(
+ List<SortClusterConfig> last,
+ List<SortClusterConfig> current) {
+ return SortConfigUtil.batchCheckUpdateRecursive(last, current,
+ SortClusterConfig::getClusterTag,
SortClusterConfig::checkUpdate);
}
- public static SortClusterConfig checkNew(SortClusterConfig last,
SortClusterConfig current) {
- return check(last, current, MqClusterConfig::batchCheckNew,
DataFlowConfig::batchCheckNew);
+ public static List<SortClusterConfig> batchCheckLatest(
+ List<SortClusterConfig> last,
+ List<SortClusterConfig> current) {
+ return SortConfigUtil.batchCheckLatestRecursive(last, current,
+ SortClusterConfig::getClusterTag,
SortClusterConfig::checkLatest);
}
public static SortClusterConfig check(
@@ -95,7 +95,7 @@ public class SortClusterConfig implements Serializable {
List<DataFlowConfig> checkDataflows = flowCheckFunction
.apply(last.getDataFlowConfigs(),
current.getDataFlowConfigs());
- if (CollectionUtils.isNotEmpty(checkCluster) &&
CollectionUtils.isNotEmpty(checkDataflows)) {
+ if (CollectionUtils.isEmpty(checkDataflows)) {
return null;
}
diff --git
a/inlong-common/src/main/java/org/apache/inlong/common/pojo/sort/SortConfig.java
b/inlong-common/src/main/java/org/apache/inlong/common/pojo/sort/SortConfig.java
index d2289001bb..5f0b6c0b6d 100644
---
a/inlong-common/src/main/java/org/apache/inlong/common/pojo/sort/SortConfig.java
+++
b/inlong-common/src/main/java/org/apache/inlong/common/pojo/sort/SortConfig.java
@@ -36,28 +36,34 @@ public class SortConfig implements Serializable {
private String sortClusterName;
private List<SortTaskConfig> tasks;
- public static SortConfig checkDelete(SortConfig last, SortConfig current) {
+ public static SortConfig checkLatest(SortConfig last, SortConfig current) {
if (last == null) {
- return null;
+ return current;
}
if (current == null) {
- return last;
+ return null;
}
- return check(last, current, SortTaskConfig::checkDeleteBatch);
+ return SortConfig.builder()
+ .sortClusterName(current.getSortClusterName())
+ .tasks(SortTaskConfig.batchCheckLatest(last.getTasks(),
current.getTasks()))
+ .build();
}
- public static SortConfig checkUpdate(SortConfig last, SortConfig current) {
- if (last == null || current == null) {
+ public static SortConfig checkDelete(SortConfig last, SortConfig current) {
+ if (last == null) {
return null;
}
- return check(last, current, SortTaskConfig::checkUpdateBatch);
+ if (current == null) {
+ return last;
+ }
+ return check(last, current, SortTaskConfig::batchCheckDelete);
}
- public static SortConfig checkNoUpdate(SortConfig last, SortConfig
current) {
+ public static SortConfig checkUpdate(SortConfig last, SortConfig current) {
if (last == null || current == null) {
return null;
}
- return check(last, current, SortTaskConfig::checkNoUpdateBatch);
+ return check(last, current, SortTaskConfig::batchCheckUpdate);
}
public static SortConfig checkNew(SortConfig last, SortConfig current) {
@@ -67,7 +73,7 @@ public class SortConfig implements Serializable {
if (current == null) {
return null;
}
- return check(last, current, SortTaskConfig::checkNewBatch);
+ return check(last, current, SortTaskConfig::batchCheckNew);
}
public static SortConfig check(
diff --git
a/inlong-common/src/main/java/org/apache/inlong/common/pojo/sort/SortTaskConfig.java
b/inlong-common/src/main/java/org/apache/inlong/common/pojo/sort/SortTaskConfig.java
index 9dcfe837d9..107efcec80 100644
---
a/inlong-common/src/main/java/org/apache/inlong/common/pojo/sort/SortTaskConfig.java
+++
b/inlong-common/src/main/java/org/apache/inlong/common/pojo/sort/SortTaskConfig.java
@@ -40,49 +40,59 @@ public class SortTaskConfig implements Serializable {
private List<SortClusterConfig> clusters;
private NodeConfig nodeConfig;
- public static List<SortTaskConfig> checkDeleteBatch(List<SortTaskConfig>
last, List<SortTaskConfig> current) {
+ public static List<SortTaskConfig> batchCheckDelete(List<SortTaskConfig>
last, List<SortTaskConfig> current) {
return SortConfigUtil.batchCheckDeleteRecursive(last, current,
SortTaskConfig::getSortTaskName, SortTaskConfig::checkDelete);
}
- public static List<SortTaskConfig> checkUpdateBatch(List<SortTaskConfig>
last, List<SortTaskConfig> current) {
+ public static List<SortTaskConfig> batchCheckUpdate(List<SortTaskConfig>
last, List<SortTaskConfig> current) {
return SortConfigUtil.batchCheckUpdateRecursive(last, current,
SortTaskConfig::getSortTaskName, SortTaskConfig::checkUpdate);
}
- public static List<SortTaskConfig> checkNoUpdateBatch(List<SortTaskConfig>
last, List<SortTaskConfig> current) {
- return SortConfigUtil.batchCheckNoUpdateRecursive(last, current,
- SortTaskConfig::getSortTaskName,
SortTaskConfig::checkNoUpdate);
- }
-
- public static List<SortTaskConfig> checkNewBatch(List<SortTaskConfig>
last, List<SortTaskConfig> current) {
+ public static List<SortTaskConfig> batchCheckNew(List<SortTaskConfig>
last, List<SortTaskConfig> current) {
return SortConfigUtil.batchCheckNewRecursive(last, current,
SortTaskConfig::getSortTaskName, SortTaskConfig::checkNew);
}
+ public static List<SortTaskConfig> batchCheckLatest(List<SortTaskConfig>
latest, List<SortTaskConfig> current) {
+ return SortConfigUtil.batchCheckLatestRecursive(latest, current,
+ SortTaskConfig::getSortTaskName, SortTaskConfig::checkLatest);
+ }
+
public static SortTaskConfig checkDelete(SortTaskConfig last,
SortTaskConfig current) {
return check(last, current, SortClusterConfig::batchCheckDelete,
+ (lastNode, currentNode) -> lastNode);
+ }
+
+ public static SortTaskConfig checkUpdate(SortTaskConfig last,
SortTaskConfig current) {
+ return check(last, current, SortClusterConfig::batchCheckUpdate,
(lastNode, currentNode) -> {
if (lastNode == null || currentNode == null) {
return null;
}
- return lastNode.getVersion() >= currentNode.getVersion() ?
lastNode : currentNode;
+ return lastNode.getVersion() < currentNode.getVersion() ?
null : currentNode;
});
}
- public static SortTaskConfig checkUpdate(SortTaskConfig last,
SortTaskConfig current) {
- return check(last, current, SortClusterConfig::batchCheckUpdate,
- (lastNode, currentNode) -> lastNode.getVersion() <
currentNode.getVersion() ? null : currentNode);
- }
-
- public static SortTaskConfig checkNoUpdate(SortTaskConfig last,
SortTaskConfig current) {
- return check(last, current, SortClusterConfig::batchCheckNoUpdate,
- (lastNode, currentNode) -> lastNode.getVersion() >=
currentNode.getVersion() ? lastNode : null);
- }
-
public static SortTaskConfig checkNew(SortTaskConfig last, SortTaskConfig
current) {
return check(last, current, SortClusterConfig::batchCheckNew,
- (lastNode, currentNode) -> lastNode.getVersion() >=
currentNode.getVersion() ? lastNode : currentNode);
+ (lastNode, currentNode) -> {
+ if (lastNode == null || currentNode == null) {
+ return null;
+ }
+ return lastNode.getVersion() >= currentNode.getVersion() ?
lastNode : currentNode;
+ });
+ }
+
+ public static SortTaskConfig checkLatest(SortTaskConfig last,
SortTaskConfig current) {
+ return check(last, current, SortClusterConfig::batchCheckLatest,
+ (lastNode, currentNode) -> {
+ if (lastNode == null || currentNode == null) {
+ return null;
+ }
+ return lastNode.getVersion() >= currentNode.getVersion() ?
lastNode : currentNode;
+ });
}
public static SortTaskConfig check(
@@ -100,6 +110,7 @@ public class SortTaskConfig implements Serializable {
return SortTaskConfig
.builder()
+ .sortTaskName(last.getSortTaskName())
.clusters(checkCluster)
.nodeConfig(checkNode)
.build();
diff --git
a/inlong-common/src/main/java/org/apache/inlong/common/pojo/sort/dataflow/DataFlowConfig.java
b/inlong-common/src/main/java/org/apache/inlong/common/pojo/sort/dataflow/DataFlowConfig.java
index 321bf1239e..bfbe6302e4 100644
---
a/inlong-common/src/main/java/org/apache/inlong/common/pojo/sort/dataflow/DataFlowConfig.java
+++
b/inlong-common/src/main/java/org/apache/inlong/common/pojo/sort/dataflow/DataFlowConfig.java
@@ -18,7 +18,6 @@
package org.apache.inlong.common.pojo.sort.dataflow;
import org.apache.inlong.common.pojo.sort.dataflow.sink.SinkConfig;
-import org.apache.inlong.common.util.ListUtil;
import org.apache.inlong.common.util.SortConfigUtil;
import lombok.AllArgsConstructor;
@@ -43,10 +42,10 @@ public class DataFlowConfig implements Serializable {
private String inlongStreamId;
private SourceConfig sourceConfig;
private SinkConfig sinkConfig;
- private Map<String, String> properties;
+ private Map<String, Object> properties;
public static List<DataFlowConfig> batchCheckDelete(List<DataFlowConfig>
last, List<DataFlowConfig> current) {
- return ListUtil.subtract(last, current, DataFlowConfig::getDataflowId);
+ return SortConfigUtil.checkDelete(last, current,
DataFlowConfig::getDataflowId);
}
public static List<DataFlowConfig> batchCheckUpdate(List<DataFlowConfig>
last, List<DataFlowConfig> current) {
@@ -58,7 +57,12 @@ public class DataFlowConfig implements Serializable {
}
public static List<DataFlowConfig> batchCheckNew(List<DataFlowConfig>
last, List<DataFlowConfig> current) {
- return ListUtil.subtract(current, last, DataFlowConfig::getDataflowId);
+ return SortConfigUtil.checkNew(last, current,
DataFlowConfig::getDataflowId);
+ }
+
+ public static List<DataFlowConfig> batchCheckLatest(List<DataFlowConfig>
last, List<DataFlowConfig> current) {
+ return SortConfigUtil.checkLatest(last, current,
+ DataFlowConfig::getDataflowId, DataFlowConfig::getVersion);
}
}
diff --git
a/inlong-common/src/main/java/org/apache/inlong/common/pojo/sort/dataflow/dataType/KvConfig.java
b/inlong-common/src/main/java/org/apache/inlong/common/pojo/sort/dataflow/dataType/KvConfig.java
index d595d6681a..06f52ca933 100644
---
a/inlong-common/src/main/java/org/apache/inlong/common/pojo/sort/dataflow/dataType/KvConfig.java
+++
b/inlong-common/src/main/java/org/apache/inlong/common/pojo/sort/dataflow/dataType/KvConfig.java
@@ -25,4 +25,5 @@ public class KvConfig implements DataTypeConfig {
private char entrySplitter;
private char kvSplitter;
private Character escapeChar;
+ private Character lineSeparator;
}
diff --git
a/inlong-common/src/main/java/org/apache/inlong/common/pojo/sort/mq/MqClusterConfig.java
b/inlong-common/src/main/java/org/apache/inlong/common/pojo/sort/mq/MqClusterConfig.java
index f23ce3c6af..599d2b661f 100644
---
a/inlong-common/src/main/java/org/apache/inlong/common/pojo/sort/mq/MqClusterConfig.java
+++
b/inlong-common/src/main/java/org/apache/inlong/common/pojo/sort/mq/MqClusterConfig.java
@@ -18,7 +18,6 @@
package org.apache.inlong.common.pojo.sort.mq;
import org.apache.inlong.common.constant.MQType;
-import org.apache.inlong.common.util.ListUtil;
import org.apache.inlong.common.util.SortConfigUtil;
import lombok.Data;
@@ -40,20 +39,30 @@ public abstract class MqClusterConfig implements
Serializable {
private String clusterName;
public static List<MqClusterConfig> batchCheckDelete(List<MqClusterConfig>
last, List<MqClusterConfig> current) {
- return ListUtil.subtract(last, current,
MqClusterConfig::getClusterName);
+ return SortConfigUtil.checkDelete(last, current,
MqClusterConfig::getClusterName);
}
public static List<MqClusterConfig> batchCheckUpdate(List<MqClusterConfig>
last, List<MqClusterConfig> current) {
- return SortConfigUtil.checkUpdate(last, current,
MqClusterConfig::getClusterName, MqClusterConfig::getVersion);
+ return SortConfigUtil.checkUpdate(last, current,
+ MqClusterConfig::getClusterName, MqClusterConfig::getVersion);
}
public static List<MqClusterConfig>
batchCheckNoUpdate(List<MqClusterConfig> last, List<MqClusterConfig> current) {
- return SortConfigUtil.checkNoUpdate(last, current,
MqClusterConfig::getClusterName,
- MqClusterConfig::getVersion);
+ return SortConfigUtil.checkNoUpdate(last, current,
+ MqClusterConfig::getClusterName, MqClusterConfig::getVersion);
}
public static List<MqClusterConfig> batchCheckNew(List<MqClusterConfig>
last, List<MqClusterConfig> current) {
- return ListUtil.subtract(current, last,
MqClusterConfig::getClusterName);
+ return SortConfigUtil.checkNew(last, current,
MqClusterConfig::getClusterName);
+ }
+
+ public static List<MqClusterConfig> batchCheckLatest(List<MqClusterConfig>
last, List<MqClusterConfig> current) {
+ return SortConfigUtil.checkLatest(last, current,
+ MqClusterConfig::getClusterName, MqClusterConfig::getVersion);
+ }
+
+ public static List<MqClusterConfig> batchCheckLast(List<MqClusterConfig>
last, List<MqClusterConfig> current) {
+ return last;
}
}
diff --git
a/inlong-common/src/main/java/org/apache/inlong/common/util/SortConfigUtil.java
b/inlong-common/src/main/java/org/apache/inlong/common/util/SortConfigUtil.java
index 79098bf24d..9e99a17df5 100644
---
a/inlong-common/src/main/java/org/apache/inlong/common/util/SortConfigUtil.java
+++
b/inlong-common/src/main/java/org/apache/inlong/common/util/SortConfigUtil.java
@@ -19,6 +19,7 @@ package org.apache.inlong.common.util;
import org.apache.commons.collections.CollectionUtils;
+import java.util.Collection;
import java.util.Comparator;
import java.util.List;
import java.util.Map;
@@ -26,6 +27,7 @@ import java.util.Objects;
import java.util.function.BiFunction;
import java.util.function.Function;
import java.util.stream.Collectors;
+import java.util.stream.Stream;
public class SortConfigUtil {
@@ -87,6 +89,30 @@ public class SortConfigUtil {
.collect(Collectors.toList());
}
+ /**
+ * Check and return list of elements which are the latest version by
compare the version
+ *
+ * @param last Last elements list
+ * @param current Current elements list
+ * @param keyExtractor Map key extract function
+ * @param versionExtractor Compare key extractor
+ * @return List of elements which are the latest ones
+ * @param <T> Element type
+ * @param <R> Map key type
+ * @param <N> Compare key type
+ */
+ public static <T, R, N extends Comparable<? super N>> List<T> checkLatest(
+ List<T> last, List<T> current,
+ Function<T, R> keyExtractor,
+ Function<T, N> versionExtractor) {
+
+ return Stream.of(checkUpdate(last, current, keyExtractor,
versionExtractor),
+ checkNoUpdate(last, current, keyExtractor, versionExtractor),
+ checkNew(last, current, keyExtractor))
+ .flatMap(Collection::stream)
+ .collect(Collectors.toList());
+ }
+
/**
* Check and return list of elements which have not been updated by
compare the specified key
*
@@ -215,12 +241,28 @@ public class SortConfigUtil {
return ListUtil.union(newInner, newByKey);
}
+ public static <T, R, N extends Comparable<? super N>> List<T>
batchCheckLatestRecursive(
+ List<T> last, List<T> current,
+ Function<T, R> keyExtractor,
+ BiFunction<T, T, T> singleCheckLatestFunction) {
+ if (CollectionUtils.isEmpty(last)) {
+ return current;
+ }
+ if (CollectionUtils.isEmpty(current)) {
+ return null;
+ }
+
+ List<T> newByKey = checkNew(current, last, keyExtractor);
+ List<T> latestInner = batchCheckRecursive(last, current, keyExtractor,
singleCheckLatestFunction);
+ return ListUtil.union(latestInner, newByKey);
+ }
+
/**
* Batch check and return a list of elements base on a specified check
function
* @param last Elements of last check
* @param current Elements of current
* @param keyExtractor Key extractor of elements
- * @param innerCheckFunction The single element check function
+ * @param singleCheckFunction The single element check function
* @return A list of elements
* @param <T> Element type
* @param <R> Key type
@@ -228,7 +270,7 @@ public class SortConfigUtil {
private static <T, R> List<T> batchCheckRecursive(
List<T> last, List<T> current,
Function<T, R> keyExtractor,
- BiFunction<T, T, T> innerCheckFunction) {
+ BiFunction<T, T, T> singleCheckFunction) {
List<R> intersectionKey = ListUtil.intersectionKey(last, current,
keyExtractor);
if (CollectionUtils.isEmpty(intersectionKey)) {
@@ -238,7 +280,7 @@ public class SortConfigUtil {
Map<R, T> lastMap = ListUtil.toMap(last, keyExtractor);
Map<R, T> currentMap = ListUtil.toMap(current, keyExtractor);
return intersectionKey.stream()
- .map(tag -> innerCheckFunction.apply(lastMap.get(tag),
currentMap.get(tag)))
+ .map(tag -> singleCheckFunction.apply(lastMap.get(tag),
currentMap.get(tag)))
.filter(Objects::nonNull)
.collect(Collectors.toList());
}
diff --git
a/inlong-sdk/sort-sdk/src/main/java/org/apache/inlong/sdk/sort/entity/InLongTopic.java
b/inlong-sdk/sort-sdk/src/main/java/org/apache/inlong/sdk/sort/entity/InLongTopic.java
index a2a8fbb97d..c90a710d07 100644
---
a/inlong-sdk/sort-sdk/src/main/java/org/apache/inlong/sdk/sort/entity/InLongTopic.java
+++
b/inlong-sdk/sort-sdk/src/main/java/org/apache/inlong/sdk/sort/entity/InLongTopic.java
@@ -27,7 +27,7 @@ public class InLongTopic {
private int partitionId;
// pulsar,kafka,tube
private String topicType;
- private Map<String, String> properties;
+ private Map<String, Object> properties;
public String getTopic() {
return topic;
@@ -61,11 +61,11 @@ public class InLongTopic {
this.topicType = topicType;
}
- public Map<String, String> getProperties() {
+ public Map<String, Object> getProperties() {
return properties;
}
- public void setProperties(Map<String, String> properties) {
+ public void setProperties(Map<String, Object> properties) {
this.properties = properties;
}
diff --git
a/inlong-sdk/sort-sdk/src/main/java/org/apache/inlong/sdk/sort/fetcher/tube/TubeSingleTopicFetcher.java
b/inlong-sdk/sort-sdk/src/main/java/org/apache/inlong/sdk/sort/fetcher/tube/TubeSingleTopicFetcher.java
index 294a99ef04..d5792d849b 100644
---
a/inlong-sdk/sort-sdk/src/main/java/org/apache/inlong/sdk/sort/fetcher/tube/TubeSingleTopicFetcher.java
+++
b/inlong-sdk/sort-sdk/src/main/java/org/apache/inlong/sdk/sort/fetcher/tube/TubeSingleTopicFetcher.java
@@ -78,7 +78,7 @@ public class TubeSingleTopicFetcher extends
SingleTopicFetcher {
TreeSet<String> filters = null;
if (topic.getProperties() != null &&
topic.getProperties().containsKey(
SysConstants.TUBE_TOPIC_FILTER_KEY)) {
- String filterStr =
topic.getProperties().get(SysConstants.TUBE_TOPIC_FILTER_KEY);
+ String filterStr =
topic.getProperties().get(SysConstants.TUBE_TOPIC_FILTER_KEY).toString();
String[] filterArray = filterStr.split(" ");
filters = new TreeSet<>(Arrays.asList(filterArray));
}
diff --git
a/inlong-sdk/sort-sdk/src/main/java/org/apache/inlong/sdk/sort/impl/QueryConsumeConfigImpl.java
b/inlong-sdk/sort-sdk/src/main/java/org/apache/inlong/sdk/sort/impl/QueryConsumeConfigImpl.java
index 32c74013ba..333080a8e5 100644
---
a/inlong-sdk/sort-sdk/src/main/java/org/apache/inlong/sdk/sort/impl/QueryConsumeConfigImpl.java
+++
b/inlong-sdk/sort-sdk/src/main/java/org/apache/inlong/sdk/sort/impl/QueryConsumeConfigImpl.java
@@ -42,6 +42,7 @@ import org.slf4j.LoggerFactory;
import java.text.MessageFormat;
import java.util.ArrayList;
+import java.util.HashMap;
import java.util.List;
import java.util.Map;
@@ -181,7 +182,8 @@ public class QueryConsumeConfigImpl implements
QueryConsumeConfig {
topic.setInLongCluster(cacheZoneCluster);
topic.setTopic(topicInfo.getTopic());
topic.setTopicType(cacheZone.getZoneType());
- topic.setProperties(topicInfo.getTopicProperties());
+ Map<String, Object> properties = new
HashMap<>(topicInfo.getTopicProperties());
+ topic.setProperties(properties);
newGroupTopics.add(topic);
}
}
diff --git
a/inlong-sort-standalone/sort-standalone-common/src/main/java/org/apache/inlong/sort/standalone/config/loader/ClassResourceQueryConsumeConfig.java
b/inlong-sort-standalone/sort-standalone-common/src/main/java/org/apache/inlong/sort/standalone/config/loader/ClassResourceQueryConsumeConfig.java
index f3269e0a37..3ccb7d3141 100644
---
a/inlong-sort-standalone/sort-standalone-common/src/main/java/org/apache/inlong/sort/standalone/config/loader/ClassResourceQueryConsumeConfig.java
+++
b/inlong-sort-standalone/sort-standalone-common/src/main/java/org/apache/inlong/sort/standalone/config/loader/ClassResourceQueryConsumeConfig.java
@@ -77,7 +77,8 @@ public class ClassResourceQueryConsumeConfig implements
QueryConsumeConfig {
topic.setInLongCluster(cacheZoneCluster);
topic.setTopic(topicInfo.getTopic());
topic.setTopicType(cacheZone.getZoneType());
- topic.setProperties(topicInfo.getTopicProperties());
+ Map<String, Object> properties = new
HashMap<>(topicInfo.getTopicProperties());
+ topic.setProperties(properties);
topics.add(topic);
}
}