This is an automated email from the ASF dual-hosted git repository.
luchunliang pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/inlong.git
The following commit(s) were added to refs/heads/master by this push:
new a5a2a7014d [INLONG-11939][Sort] Support that TaskConfig merge issue
preventing configuration changes from taking effect (#11941)
a5a2a7014d is described below
commit a5a2a7014dd0de11b208f8d2a628228559626aea
Author: ChunLiang Lu <[email protected]>
AuthorDate: Wed Jul 23 14:05:13 2025 +0800
[INLONG-11939][Sort] Support that TaskConfig merge issue preventing
configuration changes from taking effect (#11941)
---
.../inlong/sort/standalone/sink/SinkContext.java | 18 ++++++++++++++++--
.../sort/standalone/sink/cls/ClsSinkContext.java | 9 +++++----
.../standalone/sink/elasticsearch/EsSinkContext.java | 9 ++++-----
.../sink/http/DefaultEvent2HttpRequestHandler.java | 3 ++-
.../sort/standalone/sink/http/HttpSinkContext.java | 9 +++++----
.../sink/kafka/KafkaFederationSinkContext.java | 4 +++-
.../sink/pulsar/PulsarFederationSinkContext.java | 4 +++-
7 files changed, 38 insertions(+), 18 deletions(-)
diff --git
a/inlong-sort-standalone/sort-standalone-source/src/main/java/org/apache/inlong/sort/standalone/sink/SinkContext.java
b/inlong-sort-standalone/sort-standalone-source/src/main/java/org/apache/inlong/sort/standalone/sink/SinkContext.java
index f77b2a7a50..ba898c0f1d 100644
---
a/inlong-sort-standalone/sort-standalone-source/src/main/java/org/apache/inlong/sort/standalone/sink/SinkContext.java
+++
b/inlong-sort-standalone/sort-standalone-source/src/main/java/org/apache/inlong/sort/standalone/sink/SinkContext.java
@@ -40,6 +40,7 @@ import org.apache.inlong.sort.standalone.utils.BufferQueue;
import org.apache.inlong.sort.standalone.utils.InlongLoggerFactory;
import com.google.common.collect.ImmutableMap;
+import com.google.gson.Gson;
import org.apache.commons.lang3.StringUtils;
import org.apache.flume.Channel;
import org.apache.flume.Context;
@@ -64,9 +65,12 @@ public class SinkContext {
protected final String taskName;
protected final String sinkName;
protected final Context sinkContext;
+ protected Gson gson = new Gson();
protected TaskConfig taskConfig;
+ protected String taskConfigJson;
@Deprecated
protected SortTaskConfig sortTaskConfig;
+ protected String sortTaskConfigJson;
protected final Channel channel;
protected final int maxThreads;
protected final long processInterval;
@@ -117,15 +121,25 @@ public class SinkContext {
reloadTimer.schedule(task, new Date(System.currentTimeMillis() +
reloadInterval), reloadInterval);
}
+ @SuppressWarnings("deprecation")
public void reload() {
try {
- this.sortTaskConfig =
SortClusterConfigHolder.getTaskConfig(taskName);
- this.taskConfig = SortConfigHolder.getTaskConfig(taskName);
+ TaskConfig newTaskConfig =
SortConfigHolder.getTaskConfig(taskName);
+ SortTaskConfig newSortTaskConfig =
SortClusterConfigHolder.getTaskConfig(taskName);
+ this.replaceConfig(newTaskConfig, newSortTaskConfig);
} catch (Throwable e) {
LOG.error("failed to stop sink context", e);
}
}
+ @SuppressWarnings("deprecation")
+ protected void replaceConfig(TaskConfig newTaskConfig, SortTaskConfig
newSortTaskConfig) {
+ this.taskConfig = newTaskConfig;
+ this.taskConfigJson = gson.toJson(newTaskConfig);
+ this.sortTaskConfig = newSortTaskConfig;
+ this.sortTaskConfigJson = gson.toJson(newSortTaskConfig);
+ }
+
public String getClusterId() {
return clusterId;
}
diff --git
a/inlong-sort-standalone/sort-standalone-source/src/main/java/org/apache/inlong/sort/standalone/sink/cls/ClsSinkContext.java
b/inlong-sort-standalone/sort-standalone-source/src/main/java/org/apache/inlong/sort/standalone/sink/cls/ClsSinkContext.java
index 12ce29fbe5..c63a7a86af 100644
---
a/inlong-sort-standalone/sort-standalone-source/src/main/java/org/apache/inlong/sort/standalone/sink/cls/ClsSinkContext.java
+++
b/inlong-sort-standalone/sort-standalone-source/src/main/java/org/apache/inlong/sort/standalone/sink/cls/ClsSinkContext.java
@@ -51,6 +51,7 @@ import
com.tencentcloudapi.cls.producer.errors.ProducerException;
import com.tencentcloudapi.cls.producer.util.NetworkUtils;
import lombok.Getter;
import org.apache.commons.lang3.ClassUtils;
+import org.apache.commons.lang3.StringUtils;
import org.apache.flume.Channel;
import org.apache.flume.Context;
import org.slf4j.Logger;
@@ -118,8 +119,9 @@ public class ClsSinkContext extends SinkContext {
TaskConfig newTaskConfig =
SortConfigHolder.getTaskConfig(taskName);
SortTaskConfig newSortTaskConfig =
SortClusterConfigHolder.getTaskConfig(taskName);
- if ((newTaskConfig == null || newTaskConfig.equals(taskConfig))
- && (newSortTaskConfig == null ||
newSortTaskConfig.equals(sortTaskConfig))) {
+ if ((newTaskConfig == null ||
StringUtils.equals(this.taskConfigJson, gson.toJson(newTaskConfig)))
+ && (newSortTaskConfig == null
+ || StringUtils.equals(this.sortTaskConfigJson,
gson.toJson(newSortTaskConfig)))) {
return;
}
LOG.info("get new SortTaskConfig:taskName:{}", taskName);
@@ -130,8 +132,7 @@ public class ClsSinkContext extends SinkContext {
}
}
- this.taskConfig = newTaskConfig;
- this.sortTaskConfig = newSortTaskConfig;
+ this.replaceConfig(newTaskConfig, newSortTaskConfig);
Map<String, ClsIdConfig> fromTaskConfig =
reloadIdParamsFromTaskConfig(taskConfig, clsNodeConfig);
Map<String, ClsIdConfig> fromSortTaskConfig =
reloadIdParamsFromSortTaskConfig(sortTaskConfig);
diff --git
a/inlong-sort-standalone/sort-standalone-source/src/main/java/org/apache/inlong/sort/standalone/sink/elasticsearch/EsSinkContext.java
b/inlong-sort-standalone/sort-standalone-source/src/main/java/org/apache/inlong/sort/standalone/sink/elasticsearch/EsSinkContext.java
index 0a07ff3c31..0ec73c2976 100644
---
a/inlong-sort-standalone/sort-standalone-source/src/main/java/org/apache/inlong/sort/standalone/sink/elasticsearch/EsSinkContext.java
+++
b/inlong-sort-standalone/sort-standalone-source/src/main/java/org/apache/inlong/sort/standalone/sink/elasticsearch/EsSinkContext.java
@@ -150,8 +150,9 @@ public class EsSinkContext extends SinkContext {
takeCounter.getAndSet(0), backCounter.getAndSet(0));
TaskConfig newTaskConfig =
SortConfigHolder.getTaskConfig(taskName);
SortTaskConfig newSortTaskConfig =
SortClusterConfigHolder.getTaskConfig(taskName);
- if ((newTaskConfig == null || newTaskConfig.equals(taskConfig))
- && (newSortTaskConfig == null ||
newSortTaskConfig.equals(sortTaskConfig))) {
+ if ((newTaskConfig == null ||
StringUtils.equals(this.taskConfigJson, gson.toJson(newTaskConfig)))
+ && (newSortTaskConfig == null
+ || StringUtils.equals(this.sortTaskConfigJson,
gson.toJson(newSortTaskConfig)))) {
return;
}
LOG.info("get new SortTaskConfig:taskName:{}", taskName);
@@ -163,9 +164,7 @@ public class EsSinkContext extends SinkContext {
}
}
- this.taskConfig = newTaskConfig;
- this.sortTaskConfig = newSortTaskConfig;
-
+ this.replaceConfig(newTaskConfig, newSortTaskConfig);
// change current config
Map<String, EsIdConfig> fromTaskConfig =
reloadIdParamsFromTaskConfig(taskConfig);
Map<String, TransformProcessor<String, Map<String, Object>>>
transformProcessor =
diff --git
a/inlong-sort-standalone/sort-standalone-source/src/main/java/org/apache/inlong/sort/standalone/sink/http/DefaultEvent2HttpRequestHandler.java
b/inlong-sort-standalone/sort-standalone-source/src/main/java/org/apache/inlong/sort/standalone/sink/http/DefaultEvent2HttpRequestHandler.java
index c4b004f261..6d20db9a4b 100644
---
a/inlong-sort-standalone/sort-standalone-source/src/main/java/org/apache/inlong/sort/standalone/sink/http/DefaultEvent2HttpRequestHandler.java
+++
b/inlong-sort-standalone/sort-standalone-source/src/main/java/org/apache/inlong/sort/standalone/sink/http/DefaultEvent2HttpRequestHandler.java
@@ -173,7 +173,8 @@ public class DefaultEvent2HttpRequestHandler implements
IEvent2HttpRequestHandle
List<Map<String, Object>> results = new ArrayList<>();
for (Map<String, String> fieldMap : fieldMaps) {
Map<String, Object> result = new ConcurrentHashMap<>();
- result.putAll(fieldMap);
+ results.add(result);
+ fieldMap.forEach((k, v) -> result.put(k, String.valueOf(v)));
if (!result.containsKey(KEY_FTIME)) {
result.put(KEY_FTIME, ftime);
}
diff --git
a/inlong-sort-standalone/sort-standalone-source/src/main/java/org/apache/inlong/sort/standalone/sink/http/HttpSinkContext.java
b/inlong-sort-standalone/sort-standalone-source/src/main/java/org/apache/inlong/sort/standalone/sink/http/HttpSinkContext.java
index 1e9fbf16bf..7408181650 100644
---
a/inlong-sort-standalone/sort-standalone-source/src/main/java/org/apache/inlong/sort/standalone/sink/http/HttpSinkContext.java
+++
b/inlong-sort-standalone/sort-standalone-source/src/main/java/org/apache/inlong/sort/standalone/sink/http/HttpSinkContext.java
@@ -49,6 +49,7 @@ import com.fasterxml.jackson.databind.ObjectMapper;
import com.google.common.collect.ImmutableMap;
import lombok.Getter;
import org.apache.commons.lang3.ClassUtils;
+import org.apache.commons.lang3.StringUtils;
import org.apache.flume.Channel;
import org.apache.flume.Context;
import org.slf4j.Logger;
@@ -125,8 +126,9 @@ public class HttpSinkContext extends SinkContext {
takeCounter.getAndSet(0), backCounter.getAndSet(0));
TaskConfig newTaskConfig =
SortConfigHolder.getTaskConfig(taskName);
SortTaskConfig newSortTaskConfig =
SortClusterConfigHolder.getTaskConfig(taskName);
- if ((newTaskConfig == null || newTaskConfig.equals(taskConfig))
- && (newSortTaskConfig == null ||
newSortTaskConfig.equals(sortTaskConfig))) {
+ if ((newTaskConfig == null ||
StringUtils.equals(this.taskConfigJson, gson.toJson(newTaskConfig)))
+ && (newSortTaskConfig == null
+ || StringUtils.equals(this.sortTaskConfigJson,
gson.toJson(newSortTaskConfig)))) {
return;
}
LOG.info("get new SortTaskConfig:taskName:{}", taskName);
@@ -138,8 +140,7 @@ public class HttpSinkContext extends SinkContext {
}
}
- this.taskConfig = newTaskConfig;
- this.sortTaskConfig = newSortTaskConfig;
+ this.replaceConfig(newTaskConfig, newSortTaskConfig);
// change current config
Map<String, HttpIdConfig> fromTaskConfig =
reloadIdParamsFromTaskConfig(taskConfig);
diff --git
a/inlong-sort-standalone/sort-standalone-source/src/main/java/org/apache/inlong/sort/standalone/sink/kafka/KafkaFederationSinkContext.java
b/inlong-sort-standalone/sort-standalone-source/src/main/java/org/apache/inlong/sort/standalone/sink/kafka/KafkaFederationSinkContext.java
index 8fe9dc5833..1405086f50 100644
---
a/inlong-sort-standalone/sort-standalone-source/src/main/java/org/apache/inlong/sort/standalone/sink/kafka/KafkaFederationSinkContext.java
+++
b/inlong-sort-standalone/sort-standalone-source/src/main/java/org/apache/inlong/sort/standalone/sink/kafka/KafkaFederationSinkContext.java
@@ -89,7 +89,9 @@ public class KafkaFederationSinkContext extends SinkContext {
CacheClusterConfig clusterConfig = new CacheClusterConfig();
clusterConfig.setClusterName(this.taskName);
- clusterConfig.setParams(this.sortTaskConfig.getSinkParams());
+ if (this.sortTaskConfig != null) {
+ clusterConfig.setParams(this.sortTaskConfig.getSinkParams());
+ }
this.cacheClusterConfig = clusterConfig;
Map<String, KafkaIdConfig> fromTaskConfig =
fromTaskConfig(taskConfig);
diff --git
a/inlong-sort-standalone/sort-standalone-source/src/main/java/org/apache/inlong/sort/standalone/sink/pulsar/PulsarFederationSinkContext.java
b/inlong-sort-standalone/sort-standalone-source/src/main/java/org/apache/inlong/sort/standalone/sink/pulsar/PulsarFederationSinkContext.java
index 74284e3210..d17139335e 100644
---
a/inlong-sort-standalone/sort-standalone-source/src/main/java/org/apache/inlong/sort/standalone/sink/pulsar/PulsarFederationSinkContext.java
+++
b/inlong-sort-standalone/sort-standalone-source/src/main/java/org/apache/inlong/sort/standalone/sink/pulsar/PulsarFederationSinkContext.java
@@ -83,7 +83,9 @@ public class PulsarFederationSinkContext extends SinkContext {
CacheClusterConfig clusterConfig = new CacheClusterConfig();
clusterConfig.setClusterName(this.taskName);
- clusterConfig.setParams(this.sortTaskConfig.getSinkParams());
+ if (this.sortTaskConfig != null) {
+ clusterConfig.setParams(this.sortTaskConfig.getSinkParams());
+ }
this.cacheClusterConfig = clusterConfig;
Map<String, PulsarIdConfig> fromTaskConfig =
fromTaskConfig(taskConfig);