This is an automated email from the ASF dual-hosted git repository.
dockerzhang pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/inlong.git
The following commit(s) were added to refs/heads/master by this push:
new 88595f8dbc [INLONG-10555][Sort] SortStandalone support report the
difference between two configuration (#10556)
88595f8dbc is described below
commit 88595f8dbcacdb79b878f366c506b42fde2d6244
Author: vernedeng <[email protected]>
AuthorDate: Wed Jul 3 21:50:59 2024 +0800
[INLONG-10555][Sort] SortStandalone support report the difference between
two configuration (#10556)
---
.../config/holder/CommonPropertiesHolder.java | 12 +++++
.../loader/SortConfigQueryConsumeConfig.java | 6 ---
.../pojo/IdConfig.java} | 30 +++++-------
.../metrics/SortConfigMetricListener.java | 11 +++--
.../metrics/SortConfigMetricReporter.java | 56 ++++++++++++++++++++--
.../inlong/sort/standalone/sink/SinkContext.java | 3 +-
.../sort/standalone/sink/cls/ClsIdConfig.java | 11 ++---
.../sort/standalone/sink/cls/ClsSinkContext.java | 2 +
.../standalone/sink/elasticsearch/EsIdConfig.java | 11 +++--
.../sink/elasticsearch/EsSinkContext.java | 2 +
.../sink/kafka/KafkaFederationSinkContext.java | 2 +
.../sort/standalone/sink/kafka/KafkaIdConfig.java | 11 +++--
.../sink/pulsar/PulsarFederationSinkContext.java | 2 +
.../standalone/sink/pulsar/PulsarIdConfig.java | 11 +++--
14 files changed, 115 insertions(+), 55 deletions(-)
diff --git
a/inlong-sort-standalone/sort-standalone-common/src/main/java/org/apache/inlong/sort/standalone/config/holder/CommonPropertiesHolder.java
b/inlong-sort-standalone/sort-standalone-common/src/main/java/org/apache/inlong/sort/standalone/config/holder/CommonPropertiesHolder.java
index 1d8447121c..3cd897a56c 100644
---
a/inlong-sort-standalone/sort-standalone-common/src/main/java/org/apache/inlong/sort/standalone/config/holder/CommonPropertiesHolder.java
+++
b/inlong-sort-standalone/sort-standalone-common/src/main/java/org/apache/inlong/sort/standalone/config/holder/CommonPropertiesHolder.java
@@ -179,6 +179,14 @@ public class CommonPropertiesHolder {
return defaultValue;
}
+ public static Boolean getBoolean(String key, Boolean defaultValue) {
+ String value = get().get(key);
+ if (value != null) {
+ return Boolean.valueOf(value.trim());
+ }
+ return defaultValue;
+ }
+
/**
* Gets value mapped to key, returning null if unmapped.
* <p>
@@ -220,4 +228,8 @@ public class CommonPropertiesHolder {
return ackPolicy;
}
+ public static boolean useUnifiedConfiguration() {
+ return getBoolean(KEY_USE_UNIFIED_CONFIGURATION, false);
+ }
+
}
diff --git
a/inlong-sort-standalone/sort-standalone-common/src/main/java/org/apache/inlong/sort/standalone/config/loader/SortConfigQueryConsumeConfig.java
b/inlong-sort-standalone/sort-standalone-common/src/main/java/org/apache/inlong/sort/standalone/config/loader/SortConfigQueryConsumeConfig.java
index 079044f020..57f5059c16 100644
---
a/inlong-sort-standalone/sort-standalone-common/src/main/java/org/apache/inlong/sort/standalone/config/loader/SortConfigQueryConsumeConfig.java
+++
b/inlong-sort-standalone/sort-standalone-common/src/main/java/org/apache/inlong/sort/standalone/config/loader/SortConfigQueryConsumeConfig.java
@@ -41,12 +41,6 @@ import java.util.stream.Collectors;
@Slf4j
public class SortConfigQueryConsumeConfig implements QueryConsumeConfig {
- private List<InLongTopic> subscribedTopic = new ArrayList<>();
-
- public void reload() {
-
- }
-
@Override
public ConsumeConfig queryCurrentConsumeConfig(String sortTaskId) {
TaskConfig taskConfig = SortConfigHolder.getTaskConfig(sortTaskId);
diff --git
a/inlong-sort-standalone/sort-standalone-common/src/main/java/org/apache/inlong/sort/standalone/metrics/SortConfigMetricListener.java
b/inlong-sort-standalone/sort-standalone-common/src/main/java/org/apache/inlong/sort/standalone/config/pojo/IdConfig.java
similarity index 52%
copy from
inlong-sort-standalone/sort-standalone-common/src/main/java/org/apache/inlong/sort/standalone/metrics/SortConfigMetricListener.java
copy to
inlong-sort-standalone/sort-standalone-common/src/main/java/org/apache/inlong/sort/standalone/config/pojo/IdConfig.java
index d30d6ad0d5..ef87c0d03a 100644
---
a/inlong-sort-standalone/sort-standalone-common/src/main/java/org/apache/inlong/sort/standalone/metrics/SortConfigMetricListener.java
+++
b/inlong-sort-standalone/sort-standalone-common/src/main/java/org/apache/inlong/sort/standalone/config/pojo/IdConfig.java
@@ -15,25 +15,19 @@
* limitations under the License.
*/
-package org.apache.inlong.sort.standalone.metrics;
+package org.apache.inlong.sort.standalone.config.pojo;
-import org.apache.inlong.common.pojo.sort.SortConfig;
+import lombok.AllArgsConstructor;
+import lombok.Data;
+import lombok.NoArgsConstructor;
+import lombok.experimental.SuperBuilder;
-import org.apache.flume.conf.Configurable;
+@NoArgsConstructor
+@AllArgsConstructor
+@Data
+@SuperBuilder
+public abstract class IdConfig {
-import java.util.Collection;
-
-public interface SortConfigMetricListener extends Configurable {
-
- void reportOffline(SortConfig sortConfig);
- void reportOnline(SortConfig sortConfig);
- void reportUpdate(SortConfig sortConfig);
- void reportParseFail(String dataflowId);
- void reportRequestConfigFail();
- void reportDecompressFail();
- void reportCheckFail();
- void reportRequestNoUpdate();
- void reportRequestUpdate();
- void reportMissInSortClusterConfig(Collection<String> dataflows);
- void reportMissInSortConfig(Collection<String> dataflows);
+ protected String inlongGroupId;
+ protected String inlongStreamId;
}
diff --git
a/inlong-sort-standalone/sort-standalone-common/src/main/java/org/apache/inlong/sort/standalone/metrics/SortConfigMetricListener.java
b/inlong-sort-standalone/sort-standalone-common/src/main/java/org/apache/inlong/sort/standalone/metrics/SortConfigMetricListener.java
index d30d6ad0d5..41897077fd 100644
---
a/inlong-sort-standalone/sort-standalone-common/src/main/java/org/apache/inlong/sort/standalone/metrics/SortConfigMetricListener.java
+++
b/inlong-sort-standalone/sort-standalone-common/src/main/java/org/apache/inlong/sort/standalone/metrics/SortConfigMetricListener.java
@@ -21,8 +21,6 @@ import org.apache.inlong.common.pojo.sort.SortConfig;
import org.apache.flume.conf.Configurable;
-import java.util.Collection;
-
public interface SortConfigMetricListener extends Configurable {
void reportOffline(SortConfig sortConfig);
@@ -34,6 +32,11 @@ public interface SortConfigMetricListener extends
Configurable {
void reportCheckFail();
void reportRequestNoUpdate();
void reportRequestUpdate();
- void reportMissInSortClusterConfig(Collection<String> dataflows);
- void reportMissInSortConfig(Collection<String> dataflows);
+ void reportMissInSortClusterConfig(String cluster, String task, String
group, String stream);
+ void reportMissInSortConfig(String cluster, String task, String group,
String stream);
+ void reportClusterDiff(String cluster, String task, String group, String
stream);
+ void reportSourceDiff(String sortClusterName, String sortTaskName, String
topic, String mqClusterName);
+ void reportSourceMissInSortClusterConfig(String sortClusterName, String
sortTaskName,
+ String topic, String mqClusterName);
+ void reportSourceMissInSortConfig(String sortClusterName, String
sortTaskName, String topic, String mqClusterName);
}
diff --git
a/inlong-sort-standalone/sort-standalone-common/src/main/java/org/apache/inlong/sort/standalone/metrics/SortConfigMetricReporter.java
b/inlong-sort-standalone/sort-standalone-common/src/main/java/org/apache/inlong/sort/standalone/metrics/SortConfigMetricReporter.java
index 396cadbc87..001759d8ea 100644
---
a/inlong-sort-standalone/sort-standalone-common/src/main/java/org/apache/inlong/sort/standalone/metrics/SortConfigMetricReporter.java
+++
b/inlong-sort-standalone/sort-standalone-common/src/main/java/org/apache/inlong/sort/standalone/metrics/SortConfigMetricReporter.java
@@ -19,8 +19,10 @@ package org.apache.inlong.sort.standalone.metrics;
import org.apache.inlong.common.pojo.sort.SortConfig;
import org.apache.inlong.sort.standalone.config.holder.CommonPropertiesHolder;
+import org.apache.inlong.sort.standalone.config.pojo.IdConfig;
import lombok.extern.slf4j.Slf4j;
+import org.apache.commons.collections.CollectionUtils;
import org.apache.commons.lang3.ClassUtils;
import org.apache.commons.lang3.StringUtils;
import org.apache.flume.Context;
@@ -120,11 +122,57 @@ public class SortConfigMetricReporter {
listeners.forEach(SortConfigMetricListener::reportRequestUpdate);
}
- public static void reportMissInSortClusterConfig(Collection<String>
dataflows) {
- listeners.forEach(listener ->
listener.reportMissInSortClusterConfig(dataflows));
+ public static void reportClusterDiff(
+ String sortClusterName,
+ String sortTaskName,
+ Map<String, ? extends IdConfig> fromTaskConfig,
+ Map<String, ? extends IdConfig> fromSortTaskConfig) {
+ Collection<String> intersection =
CollectionUtils.intersection(fromTaskConfig.keySet(),
+ fromSortTaskConfig.keySet());
+ List<IdConfig> diff = intersection.stream()
+ .filter(k ->
!fromTaskConfig.get(k).equals(fromSortTaskConfig.get(k)))
+ .map(fromSortTaskConfig::get)
+ .collect(Collectors.toList());
+ // report diff
+ diff.forEach(idConfig -> {
+ listeners.forEach(listener ->
listener.reportClusterDiff(sortClusterName, sortTaskName,
+ idConfig.getInlongGroupId(),
idConfig.getInlongStreamId()));
+ });
+
+ // report miss in sort cluster config
+ fromTaskConfig.forEach((k, v) -> {
+ if (!intersection.contains(k)) {
+ listeners.forEach(listener ->
listener.reportMissInSortClusterConfig(sortClusterName, sortTaskName,
+ v.getInlongGroupId(), v.getInlongStreamId()));
+ }
+ });
+
+ // report miss in sort config
+ fromSortTaskConfig.forEach((k, v) -> {
+ if (!intersection.contains(k)) {
+ listeners.forEach(listener ->
listener.reportMissInSortConfig(sortClusterName, sortTaskName,
+ v.getInlongGroupId(), v.getInlongStreamId()));
+ }
+ });
+ }
+
+ public static void reportSourceDiff(
+ String sortClusterName, String sortTaskName,
+ String topic, String mqClusterName) {
+ listeners.forEach(listener ->
listener.reportSourceDiff(sortClusterName, sortTaskName, topic, mqClusterName));
+ }
+
+ public static void reportSourceMissInSortClusterConfig(
+ String sortClusterName, String sortTaskName,
+ String topic, String mqClusterName) {
+ listeners.forEach(listener ->
listener.reportSourceMissInSortClusterConfig(sortClusterName, sortTaskName,
topic,
+ mqClusterName));
}
- public static void reportMissInSortConfig(Collection<String> dataflows) {
- listeners.forEach(listener ->
listener.reportMissInSortConfig(dataflows));
+ public static void reportSourceMissInSortConfig(
+ String sortClusterName, String sortTaskName,
+ String topic, String mqClusterName) {
+ listeners.forEach(
+ listener ->
listener.reportSourceMissInSortConfig(sortClusterName, sortTaskName, topic,
mqClusterName));
}
}
diff --git
a/inlong-sort-standalone/sort-standalone-source/src/main/java/org/apache/inlong/sort/standalone/sink/SinkContext.java
b/inlong-sort-standalone/sort-standalone-source/src/main/java/org/apache/inlong/sort/standalone/sink/SinkContext.java
index 36e3d11254..f7f7d9b71d 100644
---
a/inlong-sort-standalone/sort-standalone-source/src/main/java/org/apache/inlong/sort/standalone/sink/SinkContext.java
+++
b/inlong-sort-standalone/sort-standalone-source/src/main/java/org/apache/inlong/sort/standalone/sink/SinkContext.java
@@ -73,8 +73,7 @@ public class SinkContext {
this.processInterval = sinkContext.getInteger(KEY_PROCESSINTERVAL,
100);
this.reloadInterval = sinkContext.getLong(KEY_RELOADINTERVAL, 60000L);
this.metricItemSet = new SortMetricItemSet(sinkName);
- this.unifiedConfiguration =
sinkContext.getBoolean(CommonPropertiesHolder.KEY_USE_UNIFIED_CONFIGURATION,
- false);
+ this.unifiedConfiguration =
CommonPropertiesHolder.useUnifiedConfiguration();
MetricRegister.register(this.metricItemSet);
}
diff --git
a/inlong-sort-standalone/sort-standalone-source/src/main/java/org/apache/inlong/sort/standalone/sink/cls/ClsIdConfig.java
b/inlong-sort-standalone/sort-standalone-source/src/main/java/org/apache/inlong/sort/standalone/sink/cls/ClsIdConfig.java
index 3c167be300..cbf402a83f 100644
---
a/inlong-sort-standalone/sort-standalone-source/src/main/java/org/apache/inlong/sort/standalone/sink/cls/ClsIdConfig.java
+++
b/inlong-sort-standalone/sort-standalone-source/src/main/java/org/apache/inlong/sort/standalone/sink/cls/ClsIdConfig.java
@@ -21,12 +21,13 @@ import
org.apache.inlong.common.pojo.sort.dataflow.DataFlowConfig;
import org.apache.inlong.common.pojo.sort.dataflow.field.FieldConfig;
import org.apache.inlong.common.pojo.sort.dataflow.sink.ClsSinkConfig;
import org.apache.inlong.common.pojo.sort.node.ClsNodeConfig;
+import org.apache.inlong.sort.standalone.config.pojo.IdConfig;
import lombok.AllArgsConstructor;
-import lombok.Builder;
import lombok.Data;
import lombok.EqualsAndHashCode;
import lombok.NoArgsConstructor;
+import lombok.experimental.SuperBuilder;
import java.util.List;
import java.util.stream.Collectors;
@@ -34,15 +35,13 @@ import java.util.stream.Collectors;
/**
* Cls config of each uid.
*/
+@EqualsAndHashCode(callSuper = true)
@Data
-@Builder
@NoArgsConstructor
@AllArgsConstructor
-@EqualsAndHashCode
-public class ClsIdConfig {
+@SuperBuilder
+public class ClsIdConfig extends IdConfig {
- private String inlongGroupId;
- private String inlongStreamId;
private String separator = "|";
private String endpoint;
private String secretId;
diff --git
a/inlong-sort-standalone/sort-standalone-source/src/main/java/org/apache/inlong/sort/standalone/sink/cls/ClsSinkContext.java
b/inlong-sort-standalone/sort-standalone-source/src/main/java/org/apache/inlong/sort/standalone/sink/cls/ClsSinkContext.java
index 09c8742c04..2c58cbf7ac 100644
---
a/inlong-sort-standalone/sort-standalone-source/src/main/java/org/apache/inlong/sort/standalone/sink/cls/ClsSinkContext.java
+++
b/inlong-sort-standalone/sort-standalone-source/src/main/java/org/apache/inlong/sort/standalone/sink/cls/ClsSinkContext.java
@@ -26,6 +26,7 @@ import
org.apache.inlong.sort.standalone.config.holder.CommonPropertiesHolder;
import org.apache.inlong.sort.standalone.config.holder.SortClusterConfigHolder;
import org.apache.inlong.sort.standalone.config.holder.v2.SortConfigHolder;
import org.apache.inlong.sort.standalone.config.pojo.InlongId;
+import org.apache.inlong.sort.standalone.metrics.SortConfigMetricReporter;
import org.apache.inlong.sort.standalone.metrics.SortMetricItem;
import org.apache.inlong.sort.standalone.metrics.audit.AuditUtils;
import org.apache.inlong.sort.standalone.sink.SinkContext;
@@ -117,6 +118,7 @@ public class ClsSinkContext extends SinkContext {
Map<String, ClsIdConfig> fromTaskConfig =
reloadIdParamsFromTaskConfig(taskConfig, clsNodeConfig);
Map<String, ClsIdConfig> fromSortTaskConfig =
reloadIdParamsFromSortTaskConfig(sortTaskConfig);
+ SortConfigMetricReporter.reportClusterDiff(clusterId, taskName,
fromTaskConfig, fromSortTaskConfig);
idConfigMap = unifiedConfiguration ? fromTaskConfig :
fromSortTaskConfig;
this.reloadClients(idConfigMap);
this.reloadHandler();
diff --git
a/inlong-sort-standalone/sort-standalone-source/src/main/java/org/apache/inlong/sort/standalone/sink/elasticsearch/EsIdConfig.java
b/inlong-sort-standalone/sort-standalone-source/src/main/java/org/apache/inlong/sort/standalone/sink/elasticsearch/EsIdConfig.java
index 60d5d447dd..cc98ab57d6 100644
---
a/inlong-sort-standalone/sort-standalone-source/src/main/java/org/apache/inlong/sort/standalone/sink/elasticsearch/EsIdConfig.java
+++
b/inlong-sort-standalone/sort-standalone-source/src/main/java/org/apache/inlong/sort/standalone/sink/elasticsearch/EsIdConfig.java
@@ -20,22 +20,25 @@ package
org.apache.inlong.sort.standalone.sink.elasticsearch;
import org.apache.inlong.common.pojo.sort.dataflow.DataFlowConfig;
import org.apache.inlong.common.pojo.sort.dataflow.field.FieldConfig;
import org.apache.inlong.common.pojo.sort.dataflow.sink.EsSinkConfig;
+import org.apache.inlong.sort.standalone.config.pojo.IdConfig;
import lombok.AllArgsConstructor;
-import lombok.Builder;
import lombok.Data;
+import lombok.EqualsAndHashCode;
import lombok.NoArgsConstructor;
+import lombok.experimental.SuperBuilder;
import java.text.SimpleDateFormat;
import java.util.Date;
import java.util.List;
import java.util.stream.Collectors;
+@EqualsAndHashCode(callSuper = true)
@Data
-@Builder
@NoArgsConstructor
@AllArgsConstructor
-public class EsIdConfig {
+@SuperBuilder
+public class EsIdConfig extends IdConfig {
public static final String PATTERN_DAY = "{yyyyMMdd}";
public static final String PATTERN_HOUR = "{yyyyMMddHH}";
@@ -62,8 +65,6 @@ public class EsIdConfig {
}
};
- private String inlongGroupId;
- private String inlongStreamId;
private String separator = "|";
private String indexNamePattern;
private String fieldNames;
diff --git
a/inlong-sort-standalone/sort-standalone-source/src/main/java/org/apache/inlong/sort/standalone/sink/elasticsearch/EsSinkContext.java
b/inlong-sort-standalone/sort-standalone-source/src/main/java/org/apache/inlong/sort/standalone/sink/elasticsearch/EsSinkContext.java
index 66ad584427..9da300057b 100644
---
a/inlong-sort-standalone/sort-standalone-source/src/main/java/org/apache/inlong/sort/standalone/sink/elasticsearch/EsSinkContext.java
+++
b/inlong-sort-standalone/sort-standalone-source/src/main/java/org/apache/inlong/sort/standalone/sink/elasticsearch/EsSinkContext.java
@@ -26,6 +26,7 @@ import
org.apache.inlong.sort.standalone.config.holder.CommonPropertiesHolder;
import org.apache.inlong.sort.standalone.config.holder.SortClusterConfigHolder;
import org.apache.inlong.sort.standalone.config.holder.v2.SortConfigHolder;
import org.apache.inlong.sort.standalone.config.pojo.InlongId;
+import org.apache.inlong.sort.standalone.metrics.SortConfigMetricReporter;
import org.apache.inlong.sort.standalone.metrics.SortMetricItem;
import org.apache.inlong.sort.standalone.metrics.audit.AuditUtils;
import org.apache.inlong.sort.standalone.sink.SinkContext;
@@ -159,6 +160,7 @@ public class EsSinkContext extends SinkContext {
idConfigMap = fromSortTaskConfig;
reloadClientsFromSortTaskConfig(sortTaskConfig);
}
+ SortConfigMetricReporter.reportClusterDiff(clusterId, taskName,
fromTaskConfig, fromSortTaskConfig);
// log
LOG.info("end to get
SortTaskConfig:taskName:{}:newIdConfigMap:{}", taskName,
objectMapper.writeValueAsString(idConfigMap));
diff --git
a/inlong-sort-standalone/sort-standalone-source/src/main/java/org/apache/inlong/sort/standalone/sink/kafka/KafkaFederationSinkContext.java
b/inlong-sort-standalone/sort-standalone-source/src/main/java/org/apache/inlong/sort/standalone/sink/kafka/KafkaFederationSinkContext.java
index 6e9a478f60..739b214b1e 100644
---
a/inlong-sort-standalone/sort-standalone-source/src/main/java/org/apache/inlong/sort/standalone/sink/kafka/KafkaFederationSinkContext.java
+++
b/inlong-sort-standalone/sort-standalone-source/src/main/java/org/apache/inlong/sort/standalone/sink/kafka/KafkaFederationSinkContext.java
@@ -26,6 +26,7 @@ import
org.apache.inlong.sort.standalone.config.holder.CommonPropertiesHolder;
import org.apache.inlong.sort.standalone.config.holder.SortClusterConfigHolder;
import org.apache.inlong.sort.standalone.config.holder.v2.SortConfigHolder;
import org.apache.inlong.sort.standalone.config.pojo.InlongId;
+import org.apache.inlong.sort.standalone.metrics.SortConfigMetricReporter;
import org.apache.inlong.sort.standalone.metrics.SortMetricItem;
import org.apache.inlong.sort.standalone.metrics.audit.AuditUtils;
import org.apache.inlong.sort.standalone.sink.SinkContext;
@@ -83,6 +84,7 @@ public class KafkaFederationSinkContext extends SinkContext {
Map<String, KafkaIdConfig> fromTaskConfig =
fromTaskConfig(taskConfig);
Map<String, KafkaIdConfig> fromSortTaskConfig =
fromSortTaskConfig(sortTaskConfig);
+ SortConfigMetricReporter.reportClusterDiff(clusterId, taskName,
fromTaskConfig, fromSortTaskConfig);
this.idConfigMap = unifiedConfiguration ? fromTaskConfig :
fromSortTaskConfig;
} catch (Throwable e) {
LOG.error(e.getMessage(), e);
diff --git
a/inlong-sort-standalone/sort-standalone-source/src/main/java/org/apache/inlong/sort/standalone/sink/kafka/KafkaIdConfig.java
b/inlong-sort-standalone/sort-standalone-source/src/main/java/org/apache/inlong/sort/standalone/sink/kafka/KafkaIdConfig.java
index e7b7b17ea4..ecbd8ad6f3 100644
---
a/inlong-sort-standalone/sort-standalone-source/src/main/java/org/apache/inlong/sort/standalone/sink/kafka/KafkaIdConfig.java
+++
b/inlong-sort-standalone/sort-standalone-source/src/main/java/org/apache/inlong/sort/standalone/sink/kafka/KafkaIdConfig.java
@@ -20,28 +20,29 @@ package org.apache.inlong.sort.standalone.sink.kafka;
import org.apache.inlong.common.enums.DataTypeEnum;
import org.apache.inlong.common.pojo.sort.dataflow.DataFlowConfig;
import org.apache.inlong.common.pojo.sort.dataflow.sink.KafkaSinkConfig;
+import org.apache.inlong.sort.standalone.config.pojo.IdConfig;
import org.apache.inlong.sort.standalone.config.pojo.InlongId;
import org.apache.inlong.sort.standalone.utils.Constants;
import lombok.AllArgsConstructor;
-import lombok.Builder;
import lombok.Data;
+import lombok.EqualsAndHashCode;
import lombok.NoArgsConstructor;
+import lombok.experimental.SuperBuilder;
import java.util.Map;
+@EqualsAndHashCode(callSuper = true)
@Data
-@Builder
@NoArgsConstructor
@AllArgsConstructor
-public class KafkaIdConfig {
+@SuperBuilder
+public class KafkaIdConfig extends IdConfig {
public static final String KEY_DATA_TYPE = "dataType";
public static final String KEY_SEPARATOR = "separator";
public static final String DEFAULT_SEPARATOR = "|";
- private String inlongGroupId;
- private String inlongStreamId;
private String uid;
private String separator = "|";
private String topic;
diff --git
a/inlong-sort-standalone/sort-standalone-source/src/main/java/org/apache/inlong/sort/standalone/sink/pulsar/PulsarFederationSinkContext.java
b/inlong-sort-standalone/sort-standalone-source/src/main/java/org/apache/inlong/sort/standalone/sink/pulsar/PulsarFederationSinkContext.java
index a31d9f90ba..f5fe9c5b96 100644
---
a/inlong-sort-standalone/sort-standalone-source/src/main/java/org/apache/inlong/sort/standalone/sink/pulsar/PulsarFederationSinkContext.java
+++
b/inlong-sort-standalone/sort-standalone-source/src/main/java/org/apache/inlong/sort/standalone/sink/pulsar/PulsarFederationSinkContext.java
@@ -26,6 +26,7 @@ import
org.apache.inlong.sort.standalone.config.holder.CommonPropertiesHolder;
import org.apache.inlong.sort.standalone.config.holder.SortClusterConfigHolder;
import org.apache.inlong.sort.standalone.config.holder.v2.SortConfigHolder;
import org.apache.inlong.sort.standalone.config.pojo.InlongId;
+import org.apache.inlong.sort.standalone.metrics.SortConfigMetricReporter;
import org.apache.inlong.sort.standalone.metrics.SortMetricItem;
import org.apache.inlong.sort.standalone.metrics.audit.AuditUtils;
import org.apache.inlong.sort.standalone.sink.SinkContext;
@@ -77,6 +78,7 @@ public class PulsarFederationSinkContext extends SinkContext {
Map<String, PulsarIdConfig> fromTaskConfig =
fromTaskConfig(taskConfig);
Map<String, PulsarIdConfig> fromSortTaskConfig =
fromSortTaskConfig(sortTaskConfig);
+ SortConfigMetricReporter.reportClusterDiff(clusterId, taskName,
fromTaskConfig, fromSortTaskConfig);
idConfigMap = unifiedConfiguration ? fromTaskConfig :
fromSortTaskConfig;
} catch (Throwable e) {
LOG.error(e.getMessage(), e);
diff --git
a/inlong-sort-standalone/sort-standalone-source/src/main/java/org/apache/inlong/sort/standalone/sink/pulsar/PulsarIdConfig.java
b/inlong-sort-standalone/sort-standalone-source/src/main/java/org/apache/inlong/sort/standalone/sink/pulsar/PulsarIdConfig.java
index 69b042506d..c8fc0c33b2 100644
---
a/inlong-sort-standalone/sort-standalone-source/src/main/java/org/apache/inlong/sort/standalone/sink/pulsar/PulsarIdConfig.java
+++
b/inlong-sort-standalone/sort-standalone-source/src/main/java/org/apache/inlong/sort/standalone/sink/pulsar/PulsarIdConfig.java
@@ -20,21 +20,24 @@ package org.apache.inlong.sort.standalone.sink.pulsar;
import org.apache.inlong.common.enums.DataTypeEnum;
import org.apache.inlong.common.pojo.sort.dataflow.DataFlowConfig;
import org.apache.inlong.common.pojo.sort.dataflow.sink.PulsarSinkConfig;
+import org.apache.inlong.sort.standalone.config.pojo.IdConfig;
import org.apache.inlong.sort.standalone.config.pojo.InlongId;
import org.apache.inlong.sort.standalone.utils.Constants;
import lombok.AllArgsConstructor;
-import lombok.Builder;
import lombok.Data;
+import lombok.EqualsAndHashCode;
import lombok.NoArgsConstructor;
+import lombok.experimental.SuperBuilder;
import java.util.Map;
+@EqualsAndHashCode(callSuper = true)
@Data
-@Builder
@NoArgsConstructor
@AllArgsConstructor
-public class PulsarIdConfig {
+@SuperBuilder
+public class PulsarIdConfig extends IdConfig {
public static final String KEY_DATA_TYPE = "dataType";
public static final String KEY_SEPARATOR = "separator";
@@ -42,8 +45,6 @@ public class PulsarIdConfig {
private static final String DEFAULT_INLONG_STREAM = "1";
- private String inlongGroupId;
- private String inlongStreamId;
private String uid;
private String separator = "|";
private String topic;