This is an automated email from the ASF dual-hosted git repository.

dockerzhang pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/inlong.git


The following commit(s) were added to refs/heads/master by this push:
     new 88595f8dbc [INLONG-10555][Sort] SortStandalone support report the 
difference between two configuration (#10556)
88595f8dbc is described below

commit 88595f8dbcacdb79b878f366c506b42fde2d6244
Author: vernedeng <[email protected]>
AuthorDate: Wed Jul 3 21:50:59 2024 +0800

    [INLONG-10555][Sort] SortStandalone support report the difference between 
two configuration (#10556)
---
 .../config/holder/CommonPropertiesHolder.java      | 12 +++++
 .../loader/SortConfigQueryConsumeConfig.java       |  6 ---
 .../pojo/IdConfig.java}                            | 30 +++++-------
 .../metrics/SortConfigMetricListener.java          | 11 +++--
 .../metrics/SortConfigMetricReporter.java          | 56 ++++++++++++++++++++--
 .../inlong/sort/standalone/sink/SinkContext.java   |  3 +-
 .../sort/standalone/sink/cls/ClsIdConfig.java      | 11 ++---
 .../sort/standalone/sink/cls/ClsSinkContext.java   |  2 +
 .../standalone/sink/elasticsearch/EsIdConfig.java  | 11 +++--
 .../sink/elasticsearch/EsSinkContext.java          |  2 +
 .../sink/kafka/KafkaFederationSinkContext.java     |  2 +
 .../sort/standalone/sink/kafka/KafkaIdConfig.java  | 11 +++--
 .../sink/pulsar/PulsarFederationSinkContext.java   |  2 +
 .../standalone/sink/pulsar/PulsarIdConfig.java     | 11 +++--
 14 files changed, 115 insertions(+), 55 deletions(-)

diff --git 
a/inlong-sort-standalone/sort-standalone-common/src/main/java/org/apache/inlong/sort/standalone/config/holder/CommonPropertiesHolder.java
 
b/inlong-sort-standalone/sort-standalone-common/src/main/java/org/apache/inlong/sort/standalone/config/holder/CommonPropertiesHolder.java
index 1d8447121c..3cd897a56c 100644
--- 
a/inlong-sort-standalone/sort-standalone-common/src/main/java/org/apache/inlong/sort/standalone/config/holder/CommonPropertiesHolder.java
+++ 
b/inlong-sort-standalone/sort-standalone-common/src/main/java/org/apache/inlong/sort/standalone/config/holder/CommonPropertiesHolder.java
@@ -179,6 +179,14 @@ public class CommonPropertiesHolder {
         return defaultValue;
     }
 
+    public static Boolean getBoolean(String key, Boolean defaultValue) {
+        String value = get().get(key);
+        if (value != null) {
+            return Boolean.valueOf(value.trim());
+        }
+        return defaultValue;
+    }
+
     /**
      * Gets value mapped to key, returning null if unmapped.
      * <p>
@@ -220,4 +228,8 @@ public class CommonPropertiesHolder {
         return ackPolicy;
     }
 
+    public static boolean useUnifiedConfiguration() {
+        return getBoolean(KEY_USE_UNIFIED_CONFIGURATION, false);
+    }
+
 }
diff --git 
a/inlong-sort-standalone/sort-standalone-common/src/main/java/org/apache/inlong/sort/standalone/config/loader/SortConfigQueryConsumeConfig.java
 
b/inlong-sort-standalone/sort-standalone-common/src/main/java/org/apache/inlong/sort/standalone/config/loader/SortConfigQueryConsumeConfig.java
index 079044f020..57f5059c16 100644
--- 
a/inlong-sort-standalone/sort-standalone-common/src/main/java/org/apache/inlong/sort/standalone/config/loader/SortConfigQueryConsumeConfig.java
+++ 
b/inlong-sort-standalone/sort-standalone-common/src/main/java/org/apache/inlong/sort/standalone/config/loader/SortConfigQueryConsumeConfig.java
@@ -41,12 +41,6 @@ import java.util.stream.Collectors;
 @Slf4j
 public class SortConfigQueryConsumeConfig implements QueryConsumeConfig {
 
-    private List<InLongTopic> subscribedTopic = new ArrayList<>();
-
-    public void reload() {
-
-    }
-
     @Override
     public ConsumeConfig queryCurrentConsumeConfig(String sortTaskId) {
         TaskConfig taskConfig = SortConfigHolder.getTaskConfig(sortTaskId);
diff --git 
a/inlong-sort-standalone/sort-standalone-common/src/main/java/org/apache/inlong/sort/standalone/metrics/SortConfigMetricListener.java
 
b/inlong-sort-standalone/sort-standalone-common/src/main/java/org/apache/inlong/sort/standalone/config/pojo/IdConfig.java
similarity index 52%
copy from 
inlong-sort-standalone/sort-standalone-common/src/main/java/org/apache/inlong/sort/standalone/metrics/SortConfigMetricListener.java
copy to 
inlong-sort-standalone/sort-standalone-common/src/main/java/org/apache/inlong/sort/standalone/config/pojo/IdConfig.java
index d30d6ad0d5..ef87c0d03a 100644
--- 
a/inlong-sort-standalone/sort-standalone-common/src/main/java/org/apache/inlong/sort/standalone/metrics/SortConfigMetricListener.java
+++ 
b/inlong-sort-standalone/sort-standalone-common/src/main/java/org/apache/inlong/sort/standalone/config/pojo/IdConfig.java
@@ -15,25 +15,19 @@
  * limitations under the License.
  */
 
-package org.apache.inlong.sort.standalone.metrics;
+package org.apache.inlong.sort.standalone.config.pojo;
 
-import org.apache.inlong.common.pojo.sort.SortConfig;
+import lombok.AllArgsConstructor;
+import lombok.Data;
+import lombok.NoArgsConstructor;
+import lombok.experimental.SuperBuilder;
 
-import org.apache.flume.conf.Configurable;
+@NoArgsConstructor
+@AllArgsConstructor
+@Data
+@SuperBuilder
+public abstract class IdConfig {
 
-import java.util.Collection;
-
-public interface SortConfigMetricListener extends Configurable {
-
-    void reportOffline(SortConfig sortConfig);
-    void reportOnline(SortConfig sortConfig);
-    void reportUpdate(SortConfig sortConfig);
-    void reportParseFail(String dataflowId);
-    void reportRequestConfigFail();
-    void reportDecompressFail();
-    void reportCheckFail();
-    void reportRequestNoUpdate();
-    void reportRequestUpdate();
-    void reportMissInSortClusterConfig(Collection<String> dataflows);
-    void reportMissInSortConfig(Collection<String> dataflows);
+    protected String inlongGroupId;
+    protected String inlongStreamId;
 }
diff --git 
a/inlong-sort-standalone/sort-standalone-common/src/main/java/org/apache/inlong/sort/standalone/metrics/SortConfigMetricListener.java
 
b/inlong-sort-standalone/sort-standalone-common/src/main/java/org/apache/inlong/sort/standalone/metrics/SortConfigMetricListener.java
index d30d6ad0d5..41897077fd 100644
--- 
a/inlong-sort-standalone/sort-standalone-common/src/main/java/org/apache/inlong/sort/standalone/metrics/SortConfigMetricListener.java
+++ 
b/inlong-sort-standalone/sort-standalone-common/src/main/java/org/apache/inlong/sort/standalone/metrics/SortConfigMetricListener.java
@@ -21,8 +21,6 @@ import org.apache.inlong.common.pojo.sort.SortConfig;
 
 import org.apache.flume.conf.Configurable;
 
-import java.util.Collection;
-
 public interface SortConfigMetricListener extends Configurable {
 
     void reportOffline(SortConfig sortConfig);
@@ -34,6 +32,11 @@ public interface SortConfigMetricListener extends 
Configurable {
     void reportCheckFail();
     void reportRequestNoUpdate();
     void reportRequestUpdate();
-    void reportMissInSortClusterConfig(Collection<String> dataflows);
-    void reportMissInSortConfig(Collection<String> dataflows);
+    void reportMissInSortClusterConfig(String cluster, String task, String 
group, String stream);
+    void reportMissInSortConfig(String cluster, String task, String group, 
String stream);
+    void reportClusterDiff(String cluster, String task, String group, String 
stream);
+    void reportSourceDiff(String sortClusterName, String sortTaskName, String 
topic, String mqClusterName);
+    void reportSourceMissInSortClusterConfig(String sortClusterName, String 
sortTaskName,
+            String topic, String mqClusterName);
+    void reportSourceMissInSortConfig(String sortClusterName, String 
sortTaskName, String topic, String mqClusterName);
 }
diff --git 
a/inlong-sort-standalone/sort-standalone-common/src/main/java/org/apache/inlong/sort/standalone/metrics/SortConfigMetricReporter.java
 
b/inlong-sort-standalone/sort-standalone-common/src/main/java/org/apache/inlong/sort/standalone/metrics/SortConfigMetricReporter.java
index 396cadbc87..001759d8ea 100644
--- 
a/inlong-sort-standalone/sort-standalone-common/src/main/java/org/apache/inlong/sort/standalone/metrics/SortConfigMetricReporter.java
+++ 
b/inlong-sort-standalone/sort-standalone-common/src/main/java/org/apache/inlong/sort/standalone/metrics/SortConfigMetricReporter.java
@@ -19,8 +19,10 @@ package org.apache.inlong.sort.standalone.metrics;
 
 import org.apache.inlong.common.pojo.sort.SortConfig;
 import org.apache.inlong.sort.standalone.config.holder.CommonPropertiesHolder;
+import org.apache.inlong.sort.standalone.config.pojo.IdConfig;
 
 import lombok.extern.slf4j.Slf4j;
+import org.apache.commons.collections.CollectionUtils;
 import org.apache.commons.lang3.ClassUtils;
 import org.apache.commons.lang3.StringUtils;
 import org.apache.flume.Context;
@@ -120,11 +122,57 @@ public class SortConfigMetricReporter {
         listeners.forEach(SortConfigMetricListener::reportRequestUpdate);
     }
 
-    public static void reportMissInSortClusterConfig(Collection<String> 
dataflows) {
-        listeners.forEach(listener -> 
listener.reportMissInSortClusterConfig(dataflows));
+    public static void reportClusterDiff(
+            String sortClusterName,
+            String sortTaskName,
+            Map<String, ? extends IdConfig> fromTaskConfig,
+            Map<String, ? extends IdConfig> fromSortTaskConfig) {
+        Collection<String> intersection = 
CollectionUtils.intersection(fromTaskConfig.keySet(),
+                fromSortTaskConfig.keySet());
+        List<IdConfig> diff = intersection.stream()
+                .filter(k -> 
!fromTaskConfig.get(k).equals(fromSortTaskConfig.get(k)))
+                .map(fromSortTaskConfig::get)
+                .collect(Collectors.toList());
+        // report diff
+        diff.forEach(idConfig -> {
+            listeners.forEach(listener -> 
listener.reportClusterDiff(sortClusterName, sortTaskName,
+                    idConfig.getInlongGroupId(), 
idConfig.getInlongStreamId()));
+        });
+
+        // report miss in sort cluster config
+        fromTaskConfig.forEach((k, v) -> {
+            if (!intersection.contains(k)) {
+                listeners.forEach(listener -> 
listener.reportMissInSortClusterConfig(sortClusterName, sortTaskName,
+                        v.getInlongGroupId(), v.getInlongStreamId()));
+            }
+        });
+
+        // report miss in sort config
+        fromSortTaskConfig.forEach((k, v) -> {
+            if (!intersection.contains(k)) {
+                listeners.forEach(listener -> 
listener.reportMissInSortConfig(sortClusterName, sortTaskName,
+                        v.getInlongGroupId(), v.getInlongStreamId()));
+            }
+        });
+    }
+
+    public static void reportSourceDiff(
+            String sortClusterName, String sortTaskName,
+            String topic, String mqClusterName) {
+        listeners.forEach(listener -> 
listener.reportSourceDiff(sortClusterName, sortTaskName, topic, mqClusterName));
+    }
+
+    public static void reportSourceMissInSortClusterConfig(
+            String sortClusterName, String sortTaskName,
+            String topic, String mqClusterName) {
+        listeners.forEach(listener -> 
listener.reportSourceMissInSortClusterConfig(sortClusterName, sortTaskName, 
topic,
+                mqClusterName));
     }
 
-    public static void reportMissInSortConfig(Collection<String> dataflows) {
-        listeners.forEach(listener -> 
listener.reportMissInSortConfig(dataflows));
+    public static void reportSourceMissInSortConfig(
+            String sortClusterName, String sortTaskName,
+            String topic, String mqClusterName) {
+        listeners.forEach(
+                listener -> 
listener.reportSourceMissInSortConfig(sortClusterName, sortTaskName, topic, 
mqClusterName));
     }
 }
diff --git 
a/inlong-sort-standalone/sort-standalone-source/src/main/java/org/apache/inlong/sort/standalone/sink/SinkContext.java
 
b/inlong-sort-standalone/sort-standalone-source/src/main/java/org/apache/inlong/sort/standalone/sink/SinkContext.java
index 36e3d11254..f7f7d9b71d 100644
--- 
a/inlong-sort-standalone/sort-standalone-source/src/main/java/org/apache/inlong/sort/standalone/sink/SinkContext.java
+++ 
b/inlong-sort-standalone/sort-standalone-source/src/main/java/org/apache/inlong/sort/standalone/sink/SinkContext.java
@@ -73,8 +73,7 @@ public class SinkContext {
         this.processInterval = sinkContext.getInteger(KEY_PROCESSINTERVAL, 
100);
         this.reloadInterval = sinkContext.getLong(KEY_RELOADINTERVAL, 60000L);
         this.metricItemSet = new SortMetricItemSet(sinkName);
-        this.unifiedConfiguration = 
sinkContext.getBoolean(CommonPropertiesHolder.KEY_USE_UNIFIED_CONFIGURATION,
-                false);
+        this.unifiedConfiguration = 
CommonPropertiesHolder.useUnifiedConfiguration();
         MetricRegister.register(this.metricItemSet);
     }
 
diff --git 
a/inlong-sort-standalone/sort-standalone-source/src/main/java/org/apache/inlong/sort/standalone/sink/cls/ClsIdConfig.java
 
b/inlong-sort-standalone/sort-standalone-source/src/main/java/org/apache/inlong/sort/standalone/sink/cls/ClsIdConfig.java
index 3c167be300..cbf402a83f 100644
--- 
a/inlong-sort-standalone/sort-standalone-source/src/main/java/org/apache/inlong/sort/standalone/sink/cls/ClsIdConfig.java
+++ 
b/inlong-sort-standalone/sort-standalone-source/src/main/java/org/apache/inlong/sort/standalone/sink/cls/ClsIdConfig.java
@@ -21,12 +21,13 @@ import 
org.apache.inlong.common.pojo.sort.dataflow.DataFlowConfig;
 import org.apache.inlong.common.pojo.sort.dataflow.field.FieldConfig;
 import org.apache.inlong.common.pojo.sort.dataflow.sink.ClsSinkConfig;
 import org.apache.inlong.common.pojo.sort.node.ClsNodeConfig;
+import org.apache.inlong.sort.standalone.config.pojo.IdConfig;
 
 import lombok.AllArgsConstructor;
-import lombok.Builder;
 import lombok.Data;
 import lombok.EqualsAndHashCode;
 import lombok.NoArgsConstructor;
+import lombok.experimental.SuperBuilder;
 
 import java.util.List;
 import java.util.stream.Collectors;
@@ -34,15 +35,13 @@ import java.util.stream.Collectors;
 /**
  * Cls config of each uid.
  */
+@EqualsAndHashCode(callSuper = true)
 @Data
-@Builder
 @NoArgsConstructor
 @AllArgsConstructor
-@EqualsAndHashCode
-public class ClsIdConfig {
+@SuperBuilder
+public class ClsIdConfig extends IdConfig {
 
-    private String inlongGroupId;
-    private String inlongStreamId;
     private String separator = "|";
     private String endpoint;
     private String secretId;
diff --git 
a/inlong-sort-standalone/sort-standalone-source/src/main/java/org/apache/inlong/sort/standalone/sink/cls/ClsSinkContext.java
 
b/inlong-sort-standalone/sort-standalone-source/src/main/java/org/apache/inlong/sort/standalone/sink/cls/ClsSinkContext.java
index 09c8742c04..2c58cbf7ac 100644
--- 
a/inlong-sort-standalone/sort-standalone-source/src/main/java/org/apache/inlong/sort/standalone/sink/cls/ClsSinkContext.java
+++ 
b/inlong-sort-standalone/sort-standalone-source/src/main/java/org/apache/inlong/sort/standalone/sink/cls/ClsSinkContext.java
@@ -26,6 +26,7 @@ import 
org.apache.inlong.sort.standalone.config.holder.CommonPropertiesHolder;
 import org.apache.inlong.sort.standalone.config.holder.SortClusterConfigHolder;
 import org.apache.inlong.sort.standalone.config.holder.v2.SortConfigHolder;
 import org.apache.inlong.sort.standalone.config.pojo.InlongId;
+import org.apache.inlong.sort.standalone.metrics.SortConfigMetricReporter;
 import org.apache.inlong.sort.standalone.metrics.SortMetricItem;
 import org.apache.inlong.sort.standalone.metrics.audit.AuditUtils;
 import org.apache.inlong.sort.standalone.sink.SinkContext;
@@ -117,6 +118,7 @@ public class ClsSinkContext extends SinkContext {
 
             Map<String, ClsIdConfig> fromTaskConfig = 
reloadIdParamsFromTaskConfig(taskConfig, clsNodeConfig);
             Map<String, ClsIdConfig> fromSortTaskConfig = 
reloadIdParamsFromSortTaskConfig(sortTaskConfig);
+            SortConfigMetricReporter.reportClusterDiff(clusterId, taskName, 
fromTaskConfig, fromSortTaskConfig);
             idConfigMap = unifiedConfiguration ? fromTaskConfig : 
fromSortTaskConfig;
             this.reloadClients(idConfigMap);
             this.reloadHandler();
diff --git 
a/inlong-sort-standalone/sort-standalone-source/src/main/java/org/apache/inlong/sort/standalone/sink/elasticsearch/EsIdConfig.java
 
b/inlong-sort-standalone/sort-standalone-source/src/main/java/org/apache/inlong/sort/standalone/sink/elasticsearch/EsIdConfig.java
index 60d5d447dd..cc98ab57d6 100644
--- 
a/inlong-sort-standalone/sort-standalone-source/src/main/java/org/apache/inlong/sort/standalone/sink/elasticsearch/EsIdConfig.java
+++ 
b/inlong-sort-standalone/sort-standalone-source/src/main/java/org/apache/inlong/sort/standalone/sink/elasticsearch/EsIdConfig.java
@@ -20,22 +20,25 @@ package 
org.apache.inlong.sort.standalone.sink.elasticsearch;
 import org.apache.inlong.common.pojo.sort.dataflow.DataFlowConfig;
 import org.apache.inlong.common.pojo.sort.dataflow.field.FieldConfig;
 import org.apache.inlong.common.pojo.sort.dataflow.sink.EsSinkConfig;
+import org.apache.inlong.sort.standalone.config.pojo.IdConfig;
 
 import lombok.AllArgsConstructor;
-import lombok.Builder;
 import lombok.Data;
+import lombok.EqualsAndHashCode;
 import lombok.NoArgsConstructor;
+import lombok.experimental.SuperBuilder;
 
 import java.text.SimpleDateFormat;
 import java.util.Date;
 import java.util.List;
 import java.util.stream.Collectors;
 
+@EqualsAndHashCode(callSuper = true)
 @Data
-@Builder
 @NoArgsConstructor
 @AllArgsConstructor
-public class EsIdConfig {
+@SuperBuilder
+public class EsIdConfig extends IdConfig {
 
     public static final String PATTERN_DAY = "{yyyyMMdd}";
     public static final String PATTERN_HOUR = "{yyyyMMddHH}";
@@ -62,8 +65,6 @@ public class EsIdConfig {
         }
     };
 
-    private String inlongGroupId;
-    private String inlongStreamId;
     private String separator = "|";
     private String indexNamePattern;
     private String fieldNames;
diff --git 
a/inlong-sort-standalone/sort-standalone-source/src/main/java/org/apache/inlong/sort/standalone/sink/elasticsearch/EsSinkContext.java
 
b/inlong-sort-standalone/sort-standalone-source/src/main/java/org/apache/inlong/sort/standalone/sink/elasticsearch/EsSinkContext.java
index 66ad584427..9da300057b 100644
--- 
a/inlong-sort-standalone/sort-standalone-source/src/main/java/org/apache/inlong/sort/standalone/sink/elasticsearch/EsSinkContext.java
+++ 
b/inlong-sort-standalone/sort-standalone-source/src/main/java/org/apache/inlong/sort/standalone/sink/elasticsearch/EsSinkContext.java
@@ -26,6 +26,7 @@ import 
org.apache.inlong.sort.standalone.config.holder.CommonPropertiesHolder;
 import org.apache.inlong.sort.standalone.config.holder.SortClusterConfigHolder;
 import org.apache.inlong.sort.standalone.config.holder.v2.SortConfigHolder;
 import org.apache.inlong.sort.standalone.config.pojo.InlongId;
+import org.apache.inlong.sort.standalone.metrics.SortConfigMetricReporter;
 import org.apache.inlong.sort.standalone.metrics.SortMetricItem;
 import org.apache.inlong.sort.standalone.metrics.audit.AuditUtils;
 import org.apache.inlong.sort.standalone.sink.SinkContext;
@@ -159,6 +160,7 @@ public class EsSinkContext extends SinkContext {
                 idConfigMap = fromSortTaskConfig;
                 reloadClientsFromSortTaskConfig(sortTaskConfig);
             }
+            SortConfigMetricReporter.reportClusterDiff(clusterId, taskName, 
fromTaskConfig, fromSortTaskConfig);
             // log
             LOG.info("end to get 
SortTaskConfig:taskName:{}:newIdConfigMap:{}", taskName,
                     objectMapper.writeValueAsString(idConfigMap));
diff --git 
a/inlong-sort-standalone/sort-standalone-source/src/main/java/org/apache/inlong/sort/standalone/sink/kafka/KafkaFederationSinkContext.java
 
b/inlong-sort-standalone/sort-standalone-source/src/main/java/org/apache/inlong/sort/standalone/sink/kafka/KafkaFederationSinkContext.java
index 6e9a478f60..739b214b1e 100644
--- 
a/inlong-sort-standalone/sort-standalone-source/src/main/java/org/apache/inlong/sort/standalone/sink/kafka/KafkaFederationSinkContext.java
+++ 
b/inlong-sort-standalone/sort-standalone-source/src/main/java/org/apache/inlong/sort/standalone/sink/kafka/KafkaFederationSinkContext.java
@@ -26,6 +26,7 @@ import 
org.apache.inlong.sort.standalone.config.holder.CommonPropertiesHolder;
 import org.apache.inlong.sort.standalone.config.holder.SortClusterConfigHolder;
 import org.apache.inlong.sort.standalone.config.holder.v2.SortConfigHolder;
 import org.apache.inlong.sort.standalone.config.pojo.InlongId;
+import org.apache.inlong.sort.standalone.metrics.SortConfigMetricReporter;
 import org.apache.inlong.sort.standalone.metrics.SortMetricItem;
 import org.apache.inlong.sort.standalone.metrics.audit.AuditUtils;
 import org.apache.inlong.sort.standalone.sink.SinkContext;
@@ -83,6 +84,7 @@ public class KafkaFederationSinkContext extends SinkContext {
 
             Map<String, KafkaIdConfig> fromTaskConfig = 
fromTaskConfig(taskConfig);
             Map<String, KafkaIdConfig> fromSortTaskConfig = 
fromSortTaskConfig(sortTaskConfig);
+            SortConfigMetricReporter.reportClusterDiff(clusterId, taskName, 
fromTaskConfig, fromSortTaskConfig);
             this.idConfigMap = unifiedConfiguration ? fromTaskConfig : 
fromSortTaskConfig;
         } catch (Throwable e) {
             LOG.error(e.getMessage(), e);
diff --git 
a/inlong-sort-standalone/sort-standalone-source/src/main/java/org/apache/inlong/sort/standalone/sink/kafka/KafkaIdConfig.java
 
b/inlong-sort-standalone/sort-standalone-source/src/main/java/org/apache/inlong/sort/standalone/sink/kafka/KafkaIdConfig.java
index e7b7b17ea4..ecbd8ad6f3 100644
--- 
a/inlong-sort-standalone/sort-standalone-source/src/main/java/org/apache/inlong/sort/standalone/sink/kafka/KafkaIdConfig.java
+++ 
b/inlong-sort-standalone/sort-standalone-source/src/main/java/org/apache/inlong/sort/standalone/sink/kafka/KafkaIdConfig.java
@@ -20,28 +20,29 @@ package org.apache.inlong.sort.standalone.sink.kafka;
 import org.apache.inlong.common.enums.DataTypeEnum;
 import org.apache.inlong.common.pojo.sort.dataflow.DataFlowConfig;
 import org.apache.inlong.common.pojo.sort.dataflow.sink.KafkaSinkConfig;
+import org.apache.inlong.sort.standalone.config.pojo.IdConfig;
 import org.apache.inlong.sort.standalone.config.pojo.InlongId;
 import org.apache.inlong.sort.standalone.utils.Constants;
 
 import lombok.AllArgsConstructor;
-import lombok.Builder;
 import lombok.Data;
+import lombok.EqualsAndHashCode;
 import lombok.NoArgsConstructor;
+import lombok.experimental.SuperBuilder;
 
 import java.util.Map;
 
+@EqualsAndHashCode(callSuper = true)
 @Data
-@Builder
 @NoArgsConstructor
 @AllArgsConstructor
-public class KafkaIdConfig {
+@SuperBuilder
+public class KafkaIdConfig extends IdConfig {
 
     public static final String KEY_DATA_TYPE = "dataType";
     public static final String KEY_SEPARATOR = "separator";
     public static final String DEFAULT_SEPARATOR = "|";
 
-    private String inlongGroupId;
-    private String inlongStreamId;
     private String uid;
     private String separator = "|";
     private String topic;
diff --git 
a/inlong-sort-standalone/sort-standalone-source/src/main/java/org/apache/inlong/sort/standalone/sink/pulsar/PulsarFederationSinkContext.java
 
b/inlong-sort-standalone/sort-standalone-source/src/main/java/org/apache/inlong/sort/standalone/sink/pulsar/PulsarFederationSinkContext.java
index a31d9f90ba..f5fe9c5b96 100644
--- 
a/inlong-sort-standalone/sort-standalone-source/src/main/java/org/apache/inlong/sort/standalone/sink/pulsar/PulsarFederationSinkContext.java
+++ 
b/inlong-sort-standalone/sort-standalone-source/src/main/java/org/apache/inlong/sort/standalone/sink/pulsar/PulsarFederationSinkContext.java
@@ -26,6 +26,7 @@ import 
org.apache.inlong.sort.standalone.config.holder.CommonPropertiesHolder;
 import org.apache.inlong.sort.standalone.config.holder.SortClusterConfigHolder;
 import org.apache.inlong.sort.standalone.config.holder.v2.SortConfigHolder;
 import org.apache.inlong.sort.standalone.config.pojo.InlongId;
+import org.apache.inlong.sort.standalone.metrics.SortConfigMetricReporter;
 import org.apache.inlong.sort.standalone.metrics.SortMetricItem;
 import org.apache.inlong.sort.standalone.metrics.audit.AuditUtils;
 import org.apache.inlong.sort.standalone.sink.SinkContext;
@@ -77,6 +78,7 @@ public class PulsarFederationSinkContext extends SinkContext {
 
             Map<String, PulsarIdConfig> fromTaskConfig = 
fromTaskConfig(taskConfig);
             Map<String, PulsarIdConfig> fromSortTaskConfig = 
fromSortTaskConfig(sortTaskConfig);
+            SortConfigMetricReporter.reportClusterDiff(clusterId, taskName, 
fromTaskConfig, fromSortTaskConfig);
             idConfigMap = unifiedConfiguration ? fromTaskConfig : 
fromSortTaskConfig;
         } catch (Throwable e) {
             LOG.error(e.getMessage(), e);
diff --git 
a/inlong-sort-standalone/sort-standalone-source/src/main/java/org/apache/inlong/sort/standalone/sink/pulsar/PulsarIdConfig.java
 
b/inlong-sort-standalone/sort-standalone-source/src/main/java/org/apache/inlong/sort/standalone/sink/pulsar/PulsarIdConfig.java
index 69b042506d..c8fc0c33b2 100644
--- 
a/inlong-sort-standalone/sort-standalone-source/src/main/java/org/apache/inlong/sort/standalone/sink/pulsar/PulsarIdConfig.java
+++ 
b/inlong-sort-standalone/sort-standalone-source/src/main/java/org/apache/inlong/sort/standalone/sink/pulsar/PulsarIdConfig.java
@@ -20,21 +20,24 @@ package org.apache.inlong.sort.standalone.sink.pulsar;
 import org.apache.inlong.common.enums.DataTypeEnum;
 import org.apache.inlong.common.pojo.sort.dataflow.DataFlowConfig;
 import org.apache.inlong.common.pojo.sort.dataflow.sink.PulsarSinkConfig;
+import org.apache.inlong.sort.standalone.config.pojo.IdConfig;
 import org.apache.inlong.sort.standalone.config.pojo.InlongId;
 import org.apache.inlong.sort.standalone.utils.Constants;
 
 import lombok.AllArgsConstructor;
-import lombok.Builder;
 import lombok.Data;
+import lombok.EqualsAndHashCode;
 import lombok.NoArgsConstructor;
+import lombok.experimental.SuperBuilder;
 
 import java.util.Map;
 
+@EqualsAndHashCode(callSuper = true)
 @Data
-@Builder
 @NoArgsConstructor
 @AllArgsConstructor
-public class PulsarIdConfig {
+@SuperBuilder
+public class PulsarIdConfig extends IdConfig {
 
     public static final String KEY_DATA_TYPE = "dataType";
     public static final String KEY_SEPARATOR = "separator";
@@ -42,8 +45,6 @@ public class PulsarIdConfig {
 
     private static final String DEFAULT_INLONG_STREAM = "1";
 
-    private String inlongGroupId;
-    private String inlongStreamId;
     private String uid;
     private String separator = "|";
     private String topic;

Reply via email to