This is an automated email from the ASF dual-hosted git repository.

dockerzhang pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/inlong.git


The following commit(s) were added to refs/heads/master by this push:
     new 0113074789 [INLONG-10272][Sort] Unified configuration check utils 
support check latest config (#10273)
0113074789 is described below

commit 0113074789aeb0d0fae0311bd4200f6ab7d7ff80
Author: vernedeng <[email protected]>
AuthorDate: Sun May 26 19:34:51 2024 +0800

    [INLONG-10272][Sort] Unified configuration check utils support check latest 
config (#10273)
---
 .../inlong/common/pojo/sort/SortClusterConfig.java | 52 +++++++++++-----------
 .../apache/inlong/common/pojo/sort/SortConfig.java | 26 ++++++-----
 .../inlong/common/pojo/sort/SortTaskConfig.java    | 51 ++++++++++++---------
 .../common/pojo/sort/dataflow/DataFlowConfig.java  | 12 +++--
 .../pojo/sort/dataflow/dataType/KvConfig.java      |  1 +
 .../common/pojo/sort/mq/MqClusterConfig.java       | 21 ++++++---
 .../apache/inlong/common/util/SortConfigUtil.java  | 48 ++++++++++++++++++--
 .../apache/inlong/sdk/sort/entity/InLongTopic.java |  6 +--
 .../sort/fetcher/tube/TubeSingleTopicFetcher.java  |  2 +-
 .../sdk/sort/impl/QueryConsumeConfigImpl.java      |  4 +-
 .../loader/ClassResourceQueryConsumeConfig.java    |  3 +-
 11 files changed, 151 insertions(+), 75 deletions(-)

diff --git 
a/inlong-common/src/main/java/org/apache/inlong/common/pojo/sort/SortClusterConfig.java
 
b/inlong-common/src/main/java/org/apache/inlong/common/pojo/sort/SortClusterConfig.java
index fc2fe06b5e..90541919eb 100644
--- 
a/inlong-common/src/main/java/org/apache/inlong/common/pojo/sort/SortClusterConfig.java
+++ 
b/inlong-common/src/main/java/org/apache/inlong/common/pojo/sort/SortClusterConfig.java
@@ -41,25 +41,27 @@ public class SortClusterConfig implements Serializable {
     private List<MqClusterConfig> mqClusterConfigs;
     private List<DataFlowConfig> dataFlowConfigs;
 
-    public static List<SortClusterConfig> batchCheckDelete(
-            List<SortClusterConfig> last,
-            List<SortClusterConfig> current) {
-        return SortConfigUtil.batchCheckDeleteRecursive(last, current,
-                SortClusterConfig::getClusterTag, 
SortClusterConfig::checkDelete);
+    public static SortClusterConfig checkDelete(SortClusterConfig last, 
SortClusterConfig current) {
+        return check(last, current, MqClusterConfig::batchCheckLast, 
DataFlowConfig::batchCheckDelete);
     }
 
-    public static List<SortClusterConfig> batchCheckUpdate(
-            List<SortClusterConfig> last,
-            List<SortClusterConfig> current) {
-        return SortConfigUtil.batchCheckUpdateRecursive(last, current,
-                SortClusterConfig::getClusterTag, 
SortClusterConfig::checkUpdate);
+    public static SortClusterConfig checkNew(SortClusterConfig last, 
SortClusterConfig current) {
+        return check(last, current, MqClusterConfig::batchCheckLatest, 
DataFlowConfig::batchCheckNew);
+    }
+
+    public static SortClusterConfig checkUpdate(SortClusterConfig last, 
SortClusterConfig current) {
+        return check(last, current, MqClusterConfig::batchCheckLatest, 
DataFlowConfig::batchCheckUpdate);
+    }
+
+    public static SortClusterConfig checkLatest(SortClusterConfig last, 
SortClusterConfig current) {
+        return check(last, current, MqClusterConfig::batchCheckLatest, 
DataFlowConfig::batchCheckLatest);
     }
 
-    public static List<SortClusterConfig> batchCheckNoUpdate(
+    public static List<SortClusterConfig> batchCheckDelete(
             List<SortClusterConfig> last,
             List<SortClusterConfig> current) {
-        return SortConfigUtil.batchCheckNoUpdateRecursive(last, current,
-                SortClusterConfig::getClusterTag, 
SortClusterConfig::checkNoUpdate);
+        return SortConfigUtil.batchCheckDeleteRecursive(last, current,
+                SortClusterConfig::getClusterTag, 
SortClusterConfig::checkDelete);
     }
 
     public static List<SortClusterConfig> batchCheckNew(
@@ -69,20 +71,18 @@ public class SortClusterConfig implements Serializable {
                 SortClusterConfig::getClusterTag, SortClusterConfig::checkNew);
     }
 
-    public static SortClusterConfig checkDelete(SortClusterConfig last, 
SortClusterConfig current) {
-        return check(last, current, MqClusterConfig::batchCheckDelete, 
DataFlowConfig::batchCheckDelete);
-    }
-
-    public static SortClusterConfig checkUpdate(SortClusterConfig last, 
SortClusterConfig current) {
-        return check(last, current, MqClusterConfig::batchCheckUpdate, 
DataFlowConfig::batchCheckUpdate);
-    }
-
-    public static SortClusterConfig checkNoUpdate(SortClusterConfig last, 
SortClusterConfig current) {
-        return check(last, current, MqClusterConfig::batchCheckNoUpdate, 
DataFlowConfig::batchCheckNoUpdate);
+    public static List<SortClusterConfig> batchCheckUpdate(
+            List<SortClusterConfig> last,
+            List<SortClusterConfig> current) {
+        return SortConfigUtil.batchCheckUpdateRecursive(last, current,
+                SortClusterConfig::getClusterTag, 
SortClusterConfig::checkUpdate);
     }
 
-    public static SortClusterConfig checkNew(SortClusterConfig last, 
SortClusterConfig current) {
-        return check(last, current, MqClusterConfig::batchCheckNew, 
DataFlowConfig::batchCheckNew);
+    public static List<SortClusterConfig> batchCheckLatest(
+            List<SortClusterConfig> last,
+            List<SortClusterConfig> current) {
+        return SortConfigUtil.batchCheckLatestRecursive(last, current,
+                SortClusterConfig::getClusterTag, 
SortClusterConfig::checkLatest);
     }
 
     public static SortClusterConfig check(
@@ -95,7 +95,7 @@ public class SortClusterConfig implements Serializable {
         List<DataFlowConfig> checkDataflows = flowCheckFunction
                 .apply(last.getDataFlowConfigs(), 
current.getDataFlowConfigs());
 
-        if (CollectionUtils.isNotEmpty(checkCluster) && 
CollectionUtils.isNotEmpty(checkDataflows)) {
+        if (CollectionUtils.isEmpty(checkDataflows)) {
             return null;
         }
 
diff --git 
a/inlong-common/src/main/java/org/apache/inlong/common/pojo/sort/SortConfig.java
 
b/inlong-common/src/main/java/org/apache/inlong/common/pojo/sort/SortConfig.java
index d2289001bb..5f0b6c0b6d 100644
--- 
a/inlong-common/src/main/java/org/apache/inlong/common/pojo/sort/SortConfig.java
+++ 
b/inlong-common/src/main/java/org/apache/inlong/common/pojo/sort/SortConfig.java
@@ -36,28 +36,34 @@ public class SortConfig implements Serializable {
     private String sortClusterName;
     private List<SortTaskConfig> tasks;
 
-    public static SortConfig checkDelete(SortConfig last, SortConfig current) {
+    public static SortConfig checkLatest(SortConfig last, SortConfig current) {
         if (last == null) {
-            return null;
+            return current;
         }
         if (current == null) {
-            return last;
+            return null;
         }
-        return check(last, current, SortTaskConfig::checkDeleteBatch);
+        return SortConfig.builder()
+                .sortClusterName(current.getSortClusterName())
+                .tasks(SortTaskConfig.batchCheckLatest(last.getTasks(), 
current.getTasks()))
+                .build();
     }
 
-    public static SortConfig checkUpdate(SortConfig last, SortConfig current) {
-        if (last == null || current == null) {
+    public static SortConfig checkDelete(SortConfig last, SortConfig current) {
+        if (last == null) {
             return null;
         }
-        return check(last, current, SortTaskConfig::checkUpdateBatch);
+        if (current == null) {
+            return last;
+        }
+        return check(last, current, SortTaskConfig::batchCheckDelete);
     }
 
-    public static SortConfig checkNoUpdate(SortConfig last, SortConfig 
current) {
+    public static SortConfig checkUpdate(SortConfig last, SortConfig current) {
         if (last == null || current == null) {
             return null;
         }
-        return check(last, current, SortTaskConfig::checkNoUpdateBatch);
+        return check(last, current, SortTaskConfig::batchCheckUpdate);
     }
 
     public static SortConfig checkNew(SortConfig last, SortConfig current) {
@@ -67,7 +73,7 @@ public class SortConfig implements Serializable {
         if (current == null) {
             return null;
         }
-        return check(last, current, SortTaskConfig::checkNewBatch);
+        return check(last, current, SortTaskConfig::batchCheckNew);
     }
 
     public static SortConfig check(
diff --git 
a/inlong-common/src/main/java/org/apache/inlong/common/pojo/sort/SortTaskConfig.java
 
b/inlong-common/src/main/java/org/apache/inlong/common/pojo/sort/SortTaskConfig.java
index 9dcfe837d9..107efcec80 100644
--- 
a/inlong-common/src/main/java/org/apache/inlong/common/pojo/sort/SortTaskConfig.java
+++ 
b/inlong-common/src/main/java/org/apache/inlong/common/pojo/sort/SortTaskConfig.java
@@ -40,49 +40,59 @@ public class SortTaskConfig implements Serializable {
     private List<SortClusterConfig> clusters;
     private NodeConfig nodeConfig;
 
-    public static List<SortTaskConfig> checkDeleteBatch(List<SortTaskConfig> 
last, List<SortTaskConfig> current) {
+    public static List<SortTaskConfig> batchCheckDelete(List<SortTaskConfig> 
last, List<SortTaskConfig> current) {
         return SortConfigUtil.batchCheckDeleteRecursive(last, current,
                 SortTaskConfig::getSortTaskName, SortTaskConfig::checkDelete);
     }
 
-    public static List<SortTaskConfig> checkUpdateBatch(List<SortTaskConfig> 
last, List<SortTaskConfig> current) {
+    public static List<SortTaskConfig> batchCheckUpdate(List<SortTaskConfig> 
last, List<SortTaskConfig> current) {
         return SortConfigUtil.batchCheckUpdateRecursive(last, current,
                 SortTaskConfig::getSortTaskName, SortTaskConfig::checkUpdate);
     }
 
-    public static List<SortTaskConfig> checkNoUpdateBatch(List<SortTaskConfig> 
last, List<SortTaskConfig> current) {
-        return SortConfigUtil.batchCheckNoUpdateRecursive(last, current,
-                SortTaskConfig::getSortTaskName, 
SortTaskConfig::checkNoUpdate);
-    }
-
-    public static List<SortTaskConfig> checkNewBatch(List<SortTaskConfig> 
last, List<SortTaskConfig> current) {
+    public static List<SortTaskConfig> batchCheckNew(List<SortTaskConfig> 
last, List<SortTaskConfig> current) {
         return SortConfigUtil.batchCheckNewRecursive(last, current,
                 SortTaskConfig::getSortTaskName, SortTaskConfig::checkNew);
     }
 
+    public static List<SortTaskConfig> batchCheckLatest(List<SortTaskConfig> 
latest, List<SortTaskConfig> current) {
+        return SortConfigUtil.batchCheckLatestRecursive(latest, current,
+                SortTaskConfig::getSortTaskName, SortTaskConfig::checkLatest);
+    }
+
     public static SortTaskConfig checkDelete(SortTaskConfig last, 
SortTaskConfig current) {
         return check(last, current, SortClusterConfig::batchCheckDelete,
+                (lastNode, currentNode) -> lastNode);
+    }
+
+    public static SortTaskConfig checkUpdate(SortTaskConfig last, 
SortTaskConfig current) {
+        return check(last, current, SortClusterConfig::batchCheckUpdate,
                 (lastNode, currentNode) -> {
                     if (lastNode == null || currentNode == null) {
                         return null;
                     }
-                    return lastNode.getVersion() >= currentNode.getVersion() ? 
lastNode : currentNode;
+                    return lastNode.getVersion() < currentNode.getVersion() ? 
null : currentNode;
                 });
     }
 
-    public static SortTaskConfig checkUpdate(SortTaskConfig last, 
SortTaskConfig current) {
-        return check(last, current, SortClusterConfig::batchCheckUpdate,
-                (lastNode, currentNode) -> lastNode.getVersion() < 
currentNode.getVersion() ? null : currentNode);
-    }
-
-    public static SortTaskConfig checkNoUpdate(SortTaskConfig last, 
SortTaskConfig current) {
-        return check(last, current, SortClusterConfig::batchCheckNoUpdate,
-                (lastNode, currentNode) -> lastNode.getVersion() >= 
currentNode.getVersion() ? lastNode : null);
-    }
-
     public static SortTaskConfig checkNew(SortTaskConfig last, SortTaskConfig 
current) {
         return check(last, current, SortClusterConfig::batchCheckNew,
-                (lastNode, currentNode) -> lastNode.getVersion() >= 
currentNode.getVersion() ? lastNode : currentNode);
+                (lastNode, currentNode) -> {
+                    if (lastNode == null || currentNode == null) {
+                        return null;
+                    }
+                    return lastNode.getVersion() >= currentNode.getVersion() ? 
lastNode : currentNode;
+                });
+    }
+
+    public static SortTaskConfig checkLatest(SortTaskConfig last, 
SortTaskConfig current) {
+        return check(last, current, SortClusterConfig::batchCheckLatest,
+                (lastNode, currentNode) -> {
+                    if (lastNode == null || currentNode == null) {
+                        return null;
+                    }
+                    return lastNode.getVersion() >= currentNode.getVersion() ? 
lastNode : currentNode;
+                });
     }
 
     public static SortTaskConfig check(
@@ -100,6 +110,7 @@ public class SortTaskConfig implements Serializable {
 
         return SortTaskConfig
                 .builder()
+                .sortTaskName(last.getSortTaskName())
                 .clusters(checkCluster)
                 .nodeConfig(checkNode)
                 .build();
diff --git 
a/inlong-common/src/main/java/org/apache/inlong/common/pojo/sort/dataflow/DataFlowConfig.java
 
b/inlong-common/src/main/java/org/apache/inlong/common/pojo/sort/dataflow/DataFlowConfig.java
index 321bf1239e..bfbe6302e4 100644
--- 
a/inlong-common/src/main/java/org/apache/inlong/common/pojo/sort/dataflow/DataFlowConfig.java
+++ 
b/inlong-common/src/main/java/org/apache/inlong/common/pojo/sort/dataflow/DataFlowConfig.java
@@ -18,7 +18,6 @@
 package org.apache.inlong.common.pojo.sort.dataflow;
 
 import org.apache.inlong.common.pojo.sort.dataflow.sink.SinkConfig;
-import org.apache.inlong.common.util.ListUtil;
 import org.apache.inlong.common.util.SortConfigUtil;
 
 import lombok.AllArgsConstructor;
@@ -43,10 +42,10 @@ public class DataFlowConfig implements Serializable {
     private String inlongStreamId;
     private SourceConfig sourceConfig;
     private SinkConfig sinkConfig;
-    private Map<String, String> properties;
+    private Map<String, Object> properties;
 
     public static List<DataFlowConfig> batchCheckDelete(List<DataFlowConfig> 
last, List<DataFlowConfig> current) {
-        return ListUtil.subtract(last, current, DataFlowConfig::getDataflowId);
+        return SortConfigUtil.checkDelete(last, current, 
DataFlowConfig::getDataflowId);
     }
 
     public static List<DataFlowConfig> batchCheckUpdate(List<DataFlowConfig> 
last, List<DataFlowConfig> current) {
@@ -58,7 +57,12 @@ public class DataFlowConfig implements Serializable {
     }
 
     public static List<DataFlowConfig> batchCheckNew(List<DataFlowConfig> 
last, List<DataFlowConfig> current) {
-        return ListUtil.subtract(current, last, DataFlowConfig::getDataflowId);
+        return SortConfigUtil.checkNew(last, current, 
DataFlowConfig::getDataflowId);
+    }
+
+    public static List<DataFlowConfig> batchCheckLatest(List<DataFlowConfig> 
last, List<DataFlowConfig> current) {
+        return SortConfigUtil.checkLatest(last, current,
+                DataFlowConfig::getDataflowId, DataFlowConfig::getVersion);
     }
 
 }
diff --git 
a/inlong-common/src/main/java/org/apache/inlong/common/pojo/sort/dataflow/dataType/KvConfig.java
 
b/inlong-common/src/main/java/org/apache/inlong/common/pojo/sort/dataflow/dataType/KvConfig.java
index d595d6681a..06f52ca933 100644
--- 
a/inlong-common/src/main/java/org/apache/inlong/common/pojo/sort/dataflow/dataType/KvConfig.java
+++ 
b/inlong-common/src/main/java/org/apache/inlong/common/pojo/sort/dataflow/dataType/KvConfig.java
@@ -25,4 +25,5 @@ public class KvConfig implements DataTypeConfig {
     private char entrySplitter;
     private char kvSplitter;
     private Character escapeChar;
+    private Character lineSeparator;
 }
diff --git 
a/inlong-common/src/main/java/org/apache/inlong/common/pojo/sort/mq/MqClusterConfig.java
 
b/inlong-common/src/main/java/org/apache/inlong/common/pojo/sort/mq/MqClusterConfig.java
index f23ce3c6af..599d2b661f 100644
--- 
a/inlong-common/src/main/java/org/apache/inlong/common/pojo/sort/mq/MqClusterConfig.java
+++ 
b/inlong-common/src/main/java/org/apache/inlong/common/pojo/sort/mq/MqClusterConfig.java
@@ -18,7 +18,6 @@
 package org.apache.inlong.common.pojo.sort.mq;
 
 import org.apache.inlong.common.constant.MQType;
-import org.apache.inlong.common.util.ListUtil;
 import org.apache.inlong.common.util.SortConfigUtil;
 
 import lombok.Data;
@@ -40,20 +39,30 @@ public abstract class MqClusterConfig implements 
Serializable {
     private String clusterName;
 
     public static List<MqClusterConfig> batchCheckDelete(List<MqClusterConfig> 
last, List<MqClusterConfig> current) {
-        return ListUtil.subtract(last, current, 
MqClusterConfig::getClusterName);
+        return SortConfigUtil.checkDelete(last, current, 
MqClusterConfig::getClusterName);
     }
 
     public static List<MqClusterConfig> batchCheckUpdate(List<MqClusterConfig> 
last, List<MqClusterConfig> current) {
-        return SortConfigUtil.checkUpdate(last, current, 
MqClusterConfig::getClusterName, MqClusterConfig::getVersion);
+        return SortConfigUtil.checkUpdate(last, current,
+                MqClusterConfig::getClusterName, MqClusterConfig::getVersion);
     }
 
     public static List<MqClusterConfig> 
batchCheckNoUpdate(List<MqClusterConfig> last, List<MqClusterConfig> current) {
-        return SortConfigUtil.checkNoUpdate(last, current, 
MqClusterConfig::getClusterName,
-                MqClusterConfig::getVersion);
+        return SortConfigUtil.checkNoUpdate(last, current,
+                MqClusterConfig::getClusterName, MqClusterConfig::getVersion);
     }
 
     public static List<MqClusterConfig> batchCheckNew(List<MqClusterConfig> 
last, List<MqClusterConfig> current) {
-        return ListUtil.subtract(current, last, 
MqClusterConfig::getClusterName);
+        return SortConfigUtil.checkNew(last, current, 
MqClusterConfig::getClusterName);
+    }
+
+    public static List<MqClusterConfig> batchCheckLatest(List<MqClusterConfig> 
last, List<MqClusterConfig> current) {
+        return SortConfigUtil.checkLatest(last, current,
+                MqClusterConfig::getClusterName, MqClusterConfig::getVersion);
+    }
+
+    public static List<MqClusterConfig> batchCheckLast(List<MqClusterConfig> 
last, List<MqClusterConfig> current) {
+        return last;
     }
 
 }
diff --git 
a/inlong-common/src/main/java/org/apache/inlong/common/util/SortConfigUtil.java 
b/inlong-common/src/main/java/org/apache/inlong/common/util/SortConfigUtil.java
index 79098bf24d..9e99a17df5 100644
--- 
a/inlong-common/src/main/java/org/apache/inlong/common/util/SortConfigUtil.java
+++ 
b/inlong-common/src/main/java/org/apache/inlong/common/util/SortConfigUtil.java
@@ -19,6 +19,7 @@ package org.apache.inlong.common.util;
 
 import org.apache.commons.collections.CollectionUtils;
 
+import java.util.Collection;
 import java.util.Comparator;
 import java.util.List;
 import java.util.Map;
@@ -26,6 +27,7 @@ import java.util.Objects;
 import java.util.function.BiFunction;
 import java.util.function.Function;
 import java.util.stream.Collectors;
+import java.util.stream.Stream;
 
 public class SortConfigUtil {
 
@@ -87,6 +89,30 @@ public class SortConfigUtil {
                 .collect(Collectors.toList());
     }
 
+    /**
+     * Check and return list of elements which are the latest version by 
compare the version
+     *
+     * @param last Last elements list
+     * @param current Current elements list
+     * @param keyExtractor Map key extract function
+     * @param versionExtractor Compare key extractor
+     * @return List of elements which are the latest ones
+     * @param <T> Element type
+     * @param <R> Map key type
+     * @param <N> Compare key type
+     */
+    public static <T, R, N extends Comparable<? super N>> List<T> checkLatest(
+            List<T> last, List<T> current,
+            Function<T, R> keyExtractor,
+            Function<T, N> versionExtractor) {
+
+        return Stream.of(checkUpdate(last, current, keyExtractor, 
versionExtractor),
+                checkNoUpdate(last, current, keyExtractor, versionExtractor),
+                checkNew(last, current, keyExtractor))
+                .flatMap(Collection::stream)
+                .collect(Collectors.toList());
+    }
+
     /**
      * Check and return list of elements which have not been updated by 
compare the specified key
      *
@@ -215,12 +241,28 @@ public class SortConfigUtil {
         return ListUtil.union(newInner, newByKey);
     }
 
+    public static <T, R, N extends Comparable<? super N>> List<T> 
batchCheckLatestRecursive(
+            List<T> last, List<T> current,
+            Function<T, R> keyExtractor,
+            BiFunction<T, T, T> singleCheckLatestFunction) {
+        if (CollectionUtils.isEmpty(last)) {
+            return current;
+        }
+        if (CollectionUtils.isEmpty(current)) {
+            return null;
+        }
+
+        List<T> newByKey = checkNew(current, last, keyExtractor);
+        List<T> latestInner = batchCheckRecursive(last, current, keyExtractor, 
singleCheckLatestFunction);
+        return ListUtil.union(latestInner, newByKey);
+    }
+
     /**
      * Batch check and return a list of elements base on a specified check 
function
      * @param last Elements of last check
      * @param current Elements of current
      * @param keyExtractor Key extractor of elements
-     * @param innerCheckFunction The single element check function
+     * @param singleCheckFunction The single element check function
      * @return A list of elements
      * @param <T> Element type
      * @param <R> Key type
@@ -228,7 +270,7 @@ public class SortConfigUtil {
     private static <T, R> List<T> batchCheckRecursive(
             List<T> last, List<T> current,
             Function<T, R> keyExtractor,
-            BiFunction<T, T, T> innerCheckFunction) {
+            BiFunction<T, T, T> singleCheckFunction) {
 
         List<R> intersectionKey = ListUtil.intersectionKey(last, current, 
keyExtractor);
         if (CollectionUtils.isEmpty(intersectionKey)) {
@@ -238,7 +280,7 @@ public class SortConfigUtil {
         Map<R, T> lastMap = ListUtil.toMap(last, keyExtractor);
         Map<R, T> currentMap = ListUtil.toMap(current, keyExtractor);
         return intersectionKey.stream()
-                .map(tag -> innerCheckFunction.apply(lastMap.get(tag), 
currentMap.get(tag)))
+                .map(tag -> singleCheckFunction.apply(lastMap.get(tag), 
currentMap.get(tag)))
                 .filter(Objects::nonNull)
                 .collect(Collectors.toList());
     }
diff --git 
a/inlong-sdk/sort-sdk/src/main/java/org/apache/inlong/sdk/sort/entity/InLongTopic.java
 
b/inlong-sdk/sort-sdk/src/main/java/org/apache/inlong/sdk/sort/entity/InLongTopic.java
index a2a8fbb97d..c90a710d07 100644
--- 
a/inlong-sdk/sort-sdk/src/main/java/org/apache/inlong/sdk/sort/entity/InLongTopic.java
+++ 
b/inlong-sdk/sort-sdk/src/main/java/org/apache/inlong/sdk/sort/entity/InLongTopic.java
@@ -27,7 +27,7 @@ public class InLongTopic {
     private int partitionId;
     // pulsar,kafka,tube
     private String topicType;
-    private Map<String, String> properties;
+    private Map<String, Object> properties;
 
     public String getTopic() {
         return topic;
@@ -61,11 +61,11 @@ public class InLongTopic {
         this.topicType = topicType;
     }
 
-    public Map<String, String> getProperties() {
+    public Map<String, Object> getProperties() {
         return properties;
     }
 
-    public void setProperties(Map<String, String> properties) {
+    public void setProperties(Map<String, Object> properties) {
         this.properties = properties;
     }
 
diff --git 
a/inlong-sdk/sort-sdk/src/main/java/org/apache/inlong/sdk/sort/fetcher/tube/TubeSingleTopicFetcher.java
 
b/inlong-sdk/sort-sdk/src/main/java/org/apache/inlong/sdk/sort/fetcher/tube/TubeSingleTopicFetcher.java
index 294a99ef04..d5792d849b 100644
--- 
a/inlong-sdk/sort-sdk/src/main/java/org/apache/inlong/sdk/sort/fetcher/tube/TubeSingleTopicFetcher.java
+++ 
b/inlong-sdk/sort-sdk/src/main/java/org/apache/inlong/sdk/sort/fetcher/tube/TubeSingleTopicFetcher.java
@@ -78,7 +78,7 @@ public class TubeSingleTopicFetcher extends 
SingleTopicFetcher {
                 TreeSet<String> filters = null;
                 if (topic.getProperties() != null && 
topic.getProperties().containsKey(
                         SysConstants.TUBE_TOPIC_FILTER_KEY)) {
-                    String filterStr = 
topic.getProperties().get(SysConstants.TUBE_TOPIC_FILTER_KEY);
+                    String filterStr = 
topic.getProperties().get(SysConstants.TUBE_TOPIC_FILTER_KEY).toString();
                     String[] filterArray = filterStr.split(" ");
                     filters = new TreeSet<>(Arrays.asList(filterArray));
                 }
diff --git 
a/inlong-sdk/sort-sdk/src/main/java/org/apache/inlong/sdk/sort/impl/QueryConsumeConfigImpl.java
 
b/inlong-sdk/sort-sdk/src/main/java/org/apache/inlong/sdk/sort/impl/QueryConsumeConfigImpl.java
index 32c74013ba..333080a8e5 100644
--- 
a/inlong-sdk/sort-sdk/src/main/java/org/apache/inlong/sdk/sort/impl/QueryConsumeConfigImpl.java
+++ 
b/inlong-sdk/sort-sdk/src/main/java/org/apache/inlong/sdk/sort/impl/QueryConsumeConfigImpl.java
@@ -42,6 +42,7 @@ import org.slf4j.LoggerFactory;
 
 import java.text.MessageFormat;
 import java.util.ArrayList;
+import java.util.HashMap;
 import java.util.List;
 import java.util.Map;
 
@@ -181,7 +182,8 @@ public class QueryConsumeConfigImpl implements 
QueryConsumeConfig {
                 topic.setInLongCluster(cacheZoneCluster);
                 topic.setTopic(topicInfo.getTopic());
                 topic.setTopicType(cacheZone.getZoneType());
-                topic.setProperties(topicInfo.getTopicProperties());
+                Map<String, Object> properties = new 
HashMap<>(topicInfo.getTopicProperties());
+                topic.setProperties(properties);
                 newGroupTopics.add(topic);
             }
         }
diff --git 
a/inlong-sort-standalone/sort-standalone-common/src/main/java/org/apache/inlong/sort/standalone/config/loader/ClassResourceQueryConsumeConfig.java
 
b/inlong-sort-standalone/sort-standalone-common/src/main/java/org/apache/inlong/sort/standalone/config/loader/ClassResourceQueryConsumeConfig.java
index f3269e0a37..3ccb7d3141 100644
--- 
a/inlong-sort-standalone/sort-standalone-common/src/main/java/org/apache/inlong/sort/standalone/config/loader/ClassResourceQueryConsumeConfig.java
+++ 
b/inlong-sort-standalone/sort-standalone-common/src/main/java/org/apache/inlong/sort/standalone/config/loader/ClassResourceQueryConsumeConfig.java
@@ -77,7 +77,8 @@ public class ClassResourceQueryConsumeConfig implements 
QueryConsumeConfig {
                     topic.setInLongCluster(cacheZoneCluster);
                     topic.setTopic(topicInfo.getTopic());
                     topic.setTopicType(cacheZone.getZoneType());
-                    topic.setProperties(topicInfo.getTopicProperties());
+                    Map<String, Object> properties = new 
HashMap<>(topicInfo.getTopicProperties());
+                    topic.setProperties(properties);
                     topics.add(topic);
                 }
             }

Reply via email to