This is an automated email from the ASF dual-hosted git repository.

luchunliang pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/inlong.git


The following commit(s) were added to refs/heads/master by this push:
     new a5a2a7014d [INLONG-11939][Sort] Support that TaskConfig merge issue 
preventing configuration changes from taking effect (#11941)
a5a2a7014d is described below

commit a5a2a7014dd0de11b208f8d2a628228559626aea
Author: ChunLiang Lu <[email protected]>
AuthorDate: Wed Jul 23 14:05:13 2025 +0800

    [INLONG-11939][Sort] Support that TaskConfig merge issue preventing 
configuration changes from taking effect (#11941)
---
 .../inlong/sort/standalone/sink/SinkContext.java       | 18 ++++++++++++++++--
 .../sort/standalone/sink/cls/ClsSinkContext.java       |  9 +++++----
 .../standalone/sink/elasticsearch/EsSinkContext.java   |  9 ++++-----
 .../sink/http/DefaultEvent2HttpRequestHandler.java     |  3 ++-
 .../sort/standalone/sink/http/HttpSinkContext.java     |  9 +++++----
 .../sink/kafka/KafkaFederationSinkContext.java         |  4 +++-
 .../sink/pulsar/PulsarFederationSinkContext.java       |  4 +++-
 7 files changed, 38 insertions(+), 18 deletions(-)

diff --git 
a/inlong-sort-standalone/sort-standalone-source/src/main/java/org/apache/inlong/sort/standalone/sink/SinkContext.java
 
b/inlong-sort-standalone/sort-standalone-source/src/main/java/org/apache/inlong/sort/standalone/sink/SinkContext.java
index f77b2a7a50..ba898c0f1d 100644
--- 
a/inlong-sort-standalone/sort-standalone-source/src/main/java/org/apache/inlong/sort/standalone/sink/SinkContext.java
+++ 
b/inlong-sort-standalone/sort-standalone-source/src/main/java/org/apache/inlong/sort/standalone/sink/SinkContext.java
@@ -40,6 +40,7 @@ import org.apache.inlong.sort.standalone.utils.BufferQueue;
 import org.apache.inlong.sort.standalone.utils.InlongLoggerFactory;
 
 import com.google.common.collect.ImmutableMap;
+import com.google.gson.Gson;
 import org.apache.commons.lang3.StringUtils;
 import org.apache.flume.Channel;
 import org.apache.flume.Context;
@@ -64,9 +65,12 @@ public class SinkContext {
     protected final String taskName;
     protected final String sinkName;
     protected final Context sinkContext;
+    protected Gson gson = new Gson();
     protected TaskConfig taskConfig;
+    protected String taskConfigJson;
     @Deprecated
     protected SortTaskConfig sortTaskConfig;
+    protected String sortTaskConfigJson;
     protected final Channel channel;
     protected final int maxThreads;
     protected final long processInterval;
@@ -117,15 +121,25 @@ public class SinkContext {
         reloadTimer.schedule(task, new Date(System.currentTimeMillis() + 
reloadInterval), reloadInterval);
     }
 
+    @SuppressWarnings("deprecation")
     public void reload() {
         try {
-            this.sortTaskConfig = 
SortClusterConfigHolder.getTaskConfig(taskName);
-            this.taskConfig = SortConfigHolder.getTaskConfig(taskName);
+            TaskConfig newTaskConfig = 
SortConfigHolder.getTaskConfig(taskName);
+            SortTaskConfig newSortTaskConfig = 
SortClusterConfigHolder.getTaskConfig(taskName);
+            this.replaceConfig(newTaskConfig, newSortTaskConfig);
         } catch (Throwable e) {
             LOG.error("failed to stop sink context", e);
         }
     }
 
+    @SuppressWarnings("deprecation")
+    protected void replaceConfig(TaskConfig newTaskConfig, SortTaskConfig 
newSortTaskConfig) {
+        this.taskConfig = newTaskConfig;
+        this.taskConfigJson = gson.toJson(newTaskConfig);
+        this.sortTaskConfig = newSortTaskConfig;
+        this.sortTaskConfigJson = gson.toJson(newSortTaskConfig);
+    }
+
     public String getClusterId() {
         return clusterId;
     }
diff --git 
a/inlong-sort-standalone/sort-standalone-source/src/main/java/org/apache/inlong/sort/standalone/sink/cls/ClsSinkContext.java
 
b/inlong-sort-standalone/sort-standalone-source/src/main/java/org/apache/inlong/sort/standalone/sink/cls/ClsSinkContext.java
index 12ce29fbe5..c63a7a86af 100644
--- 
a/inlong-sort-standalone/sort-standalone-source/src/main/java/org/apache/inlong/sort/standalone/sink/cls/ClsSinkContext.java
+++ 
b/inlong-sort-standalone/sort-standalone-source/src/main/java/org/apache/inlong/sort/standalone/sink/cls/ClsSinkContext.java
@@ -51,6 +51,7 @@ import 
com.tencentcloudapi.cls.producer.errors.ProducerException;
 import com.tencentcloudapi.cls.producer.util.NetworkUtils;
 import lombok.Getter;
 import org.apache.commons.lang3.ClassUtils;
+import org.apache.commons.lang3.StringUtils;
 import org.apache.flume.Channel;
 import org.apache.flume.Context;
 import org.slf4j.Logger;
@@ -118,8 +119,9 @@ public class ClsSinkContext extends SinkContext {
 
             TaskConfig newTaskConfig = 
SortConfigHolder.getTaskConfig(taskName);
             SortTaskConfig newSortTaskConfig = 
SortClusterConfigHolder.getTaskConfig(taskName);
-            if ((newTaskConfig == null || newTaskConfig.equals(taskConfig))
-                    && (newSortTaskConfig == null || 
newSortTaskConfig.equals(sortTaskConfig))) {
+            if ((newTaskConfig == null || 
StringUtils.equals(this.taskConfigJson, gson.toJson(newTaskConfig)))
+                    && (newSortTaskConfig == null
+                            || StringUtils.equals(this.sortTaskConfigJson, 
gson.toJson(newSortTaskConfig)))) {
                 return;
             }
             LOG.info("get new SortTaskConfig:taskName:{}", taskName);
@@ -130,8 +132,7 @@ public class ClsSinkContext extends SinkContext {
                 }
             }
 
-            this.taskConfig = newTaskConfig;
-            this.sortTaskConfig = newSortTaskConfig;
+            this.replaceConfig(newTaskConfig, newSortTaskConfig);
 
             Map<String, ClsIdConfig> fromTaskConfig = 
reloadIdParamsFromTaskConfig(taskConfig, clsNodeConfig);
             Map<String, ClsIdConfig> fromSortTaskConfig = 
reloadIdParamsFromSortTaskConfig(sortTaskConfig);
diff --git 
a/inlong-sort-standalone/sort-standalone-source/src/main/java/org/apache/inlong/sort/standalone/sink/elasticsearch/EsSinkContext.java
 
b/inlong-sort-standalone/sort-standalone-source/src/main/java/org/apache/inlong/sort/standalone/sink/elasticsearch/EsSinkContext.java
index 0a07ff3c31..0ec73c2976 100644
--- 
a/inlong-sort-standalone/sort-standalone-source/src/main/java/org/apache/inlong/sort/standalone/sink/elasticsearch/EsSinkContext.java
+++ 
b/inlong-sort-standalone/sort-standalone-source/src/main/java/org/apache/inlong/sort/standalone/sink/elasticsearch/EsSinkContext.java
@@ -150,8 +150,9 @@ public class EsSinkContext extends SinkContext {
                     takeCounter.getAndSet(0), backCounter.getAndSet(0));
             TaskConfig newTaskConfig = 
SortConfigHolder.getTaskConfig(taskName);
             SortTaskConfig newSortTaskConfig = 
SortClusterConfigHolder.getTaskConfig(taskName);
-            if ((newTaskConfig == null || newTaskConfig.equals(taskConfig))
-                    && (newSortTaskConfig == null || 
newSortTaskConfig.equals(sortTaskConfig))) {
+            if ((newTaskConfig == null || 
StringUtils.equals(this.taskConfigJson, gson.toJson(newTaskConfig)))
+                    && (newSortTaskConfig == null
+                            || StringUtils.equals(this.sortTaskConfigJson, 
gson.toJson(newSortTaskConfig)))) {
                 return;
             }
             LOG.info("get new SortTaskConfig:taskName:{}", taskName);
@@ -163,9 +164,7 @@ public class EsSinkContext extends SinkContext {
                 }
             }
 
-            this.taskConfig = newTaskConfig;
-            this.sortTaskConfig = newSortTaskConfig;
-
+            this.replaceConfig(newTaskConfig, newSortTaskConfig);
             // change current config
             Map<String, EsIdConfig> fromTaskConfig = 
reloadIdParamsFromTaskConfig(taskConfig);
             Map<String, TransformProcessor<String, Map<String, Object>>> 
transformProcessor =
diff --git 
a/inlong-sort-standalone/sort-standalone-source/src/main/java/org/apache/inlong/sort/standalone/sink/http/DefaultEvent2HttpRequestHandler.java
 
b/inlong-sort-standalone/sort-standalone-source/src/main/java/org/apache/inlong/sort/standalone/sink/http/DefaultEvent2HttpRequestHandler.java
index c4b004f261..6d20db9a4b 100644
--- 
a/inlong-sort-standalone/sort-standalone-source/src/main/java/org/apache/inlong/sort/standalone/sink/http/DefaultEvent2HttpRequestHandler.java
+++ 
b/inlong-sort-standalone/sort-standalone-source/src/main/java/org/apache/inlong/sort/standalone/sink/http/DefaultEvent2HttpRequestHandler.java
@@ -173,7 +173,8 @@ public class DefaultEvent2HttpRequestHandler implements 
IEvent2HttpRequestHandle
         List<Map<String, Object>> results = new ArrayList<>();
         for (Map<String, String> fieldMap : fieldMaps) {
             Map<String, Object> result = new ConcurrentHashMap<>();
-            result.putAll(fieldMap);
+            results.add(result);
+            fieldMap.forEach((k, v) -> result.put(k, String.valueOf(v)));
             if (!result.containsKey(KEY_FTIME)) {
                 result.put(KEY_FTIME, ftime);
             }
diff --git 
a/inlong-sort-standalone/sort-standalone-source/src/main/java/org/apache/inlong/sort/standalone/sink/http/HttpSinkContext.java
 
b/inlong-sort-standalone/sort-standalone-source/src/main/java/org/apache/inlong/sort/standalone/sink/http/HttpSinkContext.java
index 1e9fbf16bf..7408181650 100644
--- 
a/inlong-sort-standalone/sort-standalone-source/src/main/java/org/apache/inlong/sort/standalone/sink/http/HttpSinkContext.java
+++ 
b/inlong-sort-standalone/sort-standalone-source/src/main/java/org/apache/inlong/sort/standalone/sink/http/HttpSinkContext.java
@@ -49,6 +49,7 @@ import com.fasterxml.jackson.databind.ObjectMapper;
 import com.google.common.collect.ImmutableMap;
 import lombok.Getter;
 import org.apache.commons.lang3.ClassUtils;
+import org.apache.commons.lang3.StringUtils;
 import org.apache.flume.Channel;
 import org.apache.flume.Context;
 import org.slf4j.Logger;
@@ -125,8 +126,9 @@ public class HttpSinkContext extends SinkContext {
                     takeCounter.getAndSet(0), backCounter.getAndSet(0));
             TaskConfig newTaskConfig = 
SortConfigHolder.getTaskConfig(taskName);
             SortTaskConfig newSortTaskConfig = 
SortClusterConfigHolder.getTaskConfig(taskName);
-            if ((newTaskConfig == null || newTaskConfig.equals(taskConfig))
-                    && (newSortTaskConfig == null || 
newSortTaskConfig.equals(sortTaskConfig))) {
+            if ((newTaskConfig == null || 
StringUtils.equals(this.taskConfigJson, gson.toJson(newTaskConfig)))
+                    && (newSortTaskConfig == null
+                            || StringUtils.equals(this.sortTaskConfigJson, 
gson.toJson(newSortTaskConfig)))) {
                 return;
             }
             LOG.info("get new SortTaskConfig:taskName:{}", taskName);
@@ -138,8 +140,7 @@ public class HttpSinkContext extends SinkContext {
                 }
             }
 
-            this.taskConfig = newTaskConfig;
-            this.sortTaskConfig = newSortTaskConfig;
+            this.replaceConfig(newTaskConfig, newSortTaskConfig);
 
             // change current config
             Map<String, HttpIdConfig> fromTaskConfig = 
reloadIdParamsFromTaskConfig(taskConfig);
diff --git 
a/inlong-sort-standalone/sort-standalone-source/src/main/java/org/apache/inlong/sort/standalone/sink/kafka/KafkaFederationSinkContext.java
 
b/inlong-sort-standalone/sort-standalone-source/src/main/java/org/apache/inlong/sort/standalone/sink/kafka/KafkaFederationSinkContext.java
index 8fe9dc5833..1405086f50 100644
--- 
a/inlong-sort-standalone/sort-standalone-source/src/main/java/org/apache/inlong/sort/standalone/sink/kafka/KafkaFederationSinkContext.java
+++ 
b/inlong-sort-standalone/sort-standalone-source/src/main/java/org/apache/inlong/sort/standalone/sink/kafka/KafkaFederationSinkContext.java
@@ -89,7 +89,9 @@ public class KafkaFederationSinkContext extends SinkContext {
 
             CacheClusterConfig clusterConfig = new CacheClusterConfig();
             clusterConfig.setClusterName(this.taskName);
-            clusterConfig.setParams(this.sortTaskConfig.getSinkParams());
+            if (this.sortTaskConfig != null) {
+                clusterConfig.setParams(this.sortTaskConfig.getSinkParams());
+            }
             this.cacheClusterConfig = clusterConfig;
 
             Map<String, KafkaIdConfig> fromTaskConfig = 
fromTaskConfig(taskConfig);
diff --git 
a/inlong-sort-standalone/sort-standalone-source/src/main/java/org/apache/inlong/sort/standalone/sink/pulsar/PulsarFederationSinkContext.java
 
b/inlong-sort-standalone/sort-standalone-source/src/main/java/org/apache/inlong/sort/standalone/sink/pulsar/PulsarFederationSinkContext.java
index 74284e3210..d17139335e 100644
--- 
a/inlong-sort-standalone/sort-standalone-source/src/main/java/org/apache/inlong/sort/standalone/sink/pulsar/PulsarFederationSinkContext.java
+++ 
b/inlong-sort-standalone/sort-standalone-source/src/main/java/org/apache/inlong/sort/standalone/sink/pulsar/PulsarFederationSinkContext.java
@@ -83,7 +83,9 @@ public class PulsarFederationSinkContext extends SinkContext {
 
             CacheClusterConfig clusterConfig = new CacheClusterConfig();
             clusterConfig.setClusterName(this.taskName);
-            clusterConfig.setParams(this.sortTaskConfig.getSinkParams());
+            if (this.sortTaskConfig != null) {
+                clusterConfig.setParams(this.sortTaskConfig.getSinkParams());
+            }
             this.cacheClusterConfig = clusterConfig;
 
             Map<String, PulsarIdConfig> fromTaskConfig = 
fromTaskConfig(taskConfig);

Reply via email to