This is an automated email from the ASF dual-hosted git repository.

dockerzhang pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/inlong.git


The following commit(s) were added to refs/heads/master by this push:
     new 0e2fa1f9b [INLONG-7389][Manager][Sort] Add audit id info for source (#7390)
0e2fa1f9b is described below

commit 0e2fa1f9be9ce5be76ef033c9746782c93a035c4
Author: fuweng11 <[email protected]>
AuthorDate: Wed Mar 8 18:27:51 2023 +0800

    [INLONG-7389][Manager][Sort] Add audit id info for source (#7390)
---
 .../service/resource/sort/DefaultSortConfigOperator.java     | 12 ++++++++++++
 .../sort/protocol/node/extract/KafkaExtractNodeTest.java     | 12 +++++++++++-
 2 files changed, 23 insertions(+), 1 deletion(-)

diff --git a/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/resource/sort/DefaultSortConfigOperator.java b/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/resource/sort/DefaultSortConfigOperator.java
index 2664cee1e..22cf7fd23 100644
--- a/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/resource/sort/DefaultSortConfigOperator.java
+++ b/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/resource/sort/DefaultSortConfigOperator.java
@@ -31,6 +31,7 @@ import org.apache.inlong.manager.pojo.source.StreamSource;
 import org.apache.inlong.manager.pojo.stream.InlongStreamInfo;
 import org.apache.inlong.manager.pojo.stream.StreamField;
 import org.apache.inlong.manager.pojo.transform.TransformResponse;
+import org.apache.inlong.manager.service.core.AuditService;
 import org.apache.inlong.manager.service.sink.StreamSinkService;
 import org.apache.inlong.manager.service.source.StreamSourceService;
 import org.apache.inlong.manager.service.transform.StreamTransformService;
@@ -67,6 +68,8 @@ public class DefaultSortConfigOperator implements SortConfigOperator {
     private StreamTransformService transformService;
     @Autowired
     private StreamSinkService sinkService;
+    @Autowired
+    private AuditService auditService;
 
     @Override
     public Boolean accept(Integer enableZk) {
@@ -121,6 +124,15 @@ public class DefaultSortConfigOperator implements SortConfigOperator {
             // build a stream info from the nodes and relations
             List<StreamSource> sources = sourceMap.get(streamId);
             List<StreamSink> sinks = sinkMap.get(streamId);
+            // get audit list by sink type
+            List<String> auditIds = new ArrayList<>();
+            for (StreamSink sink : sinks) {
+                auditIds.add(auditService.getAuditId(sink.getSinkType(), false));
+            }
+            for (StreamSource source : sources) {
+                Map<String, Object> properties = source.getProperties();
+                properties.putIfAbsent("metrics.audit.key", String.join("&", auditIds));
+            }
             List<NodeRelation> relations;
 
             if (InlongConstants.STANDARD_MODE.equals(groupInfo.getLightweight())) {
diff --git a/inlong-sort/sort-common/src/test/java/org/apache/inlong/sort/protocol/node/extract/KafkaExtractNodeTest.java b/inlong-sort/sort-common/src/test/java/org/apache/inlong/sort/protocol/node/extract/KafkaExtractNodeTest.java
index cda33fae9..fae44c3d4 100644
--- a/inlong-sort/sort-common/src/test/java/org/apache/inlong/sort/protocol/node/extract/KafkaExtractNodeTest.java
+++ b/inlong-sort/sort-common/src/test/java/org/apache/inlong/sort/protocol/node/extract/KafkaExtractNodeTest.java
@@ -31,10 +31,12 @@ import org.apache.inlong.sort.protocol.node.format.RawFormat;
 import org.junit.Assert;
 import org.junit.Test;
 
+import java.util.ArrayList;
 import java.util.Arrays;
 import java.util.HashMap;
 import java.util.List;
 import java.util.Map;
+
 import static org.junit.Assert.assertEquals;
 
 /**
@@ -42,6 +44,8 @@ import static org.junit.Assert.assertEquals;
  */
 public class KafkaExtractNodeTest extends SerializeBaseTest<KafkaExtractNode> {
 
+    public static final String AUDIT_ID_SORT_INPUT = "7";
+    public static final String AUDIT_ID_SORT_OUTPUT = "8";
     private final ObjectMapper objectMapper = new ObjectMapper();
 
     @Override
@@ -49,7 +53,12 @@ public class KafkaExtractNodeTest extends SerializeBaseTest<KafkaExtractNode> {
         List<FieldInfo> fields = Arrays.asList(
                 new FieldInfo("name", new StringFormatInfo()),
                 new FieldInfo("age", new IntFormatInfo()));
-        return new KafkaExtractNode("1", "kafka_input", fields, null, null, "workerCsv",
+        Map<String, String> properties = new HashMap<>();
+        List<String> auditIds = new ArrayList<>();
+        auditIds.add(AUDIT_ID_SORT_INPUT);
+        auditIds.add(AUDIT_ID_SORT_OUTPUT);
+        properties.putIfAbsent("metrics.audit.key", String.join("&", auditIds));
+        return new KafkaExtractNode("1", "kafka_input", fields, null, properties, "workerCsv",
                 "localhost:9092", new CsvFormat(), KafkaScanStartupMode.EARLIEST_OFFSET, null, "groupId", null, null);
     }
 
@@ -128,6 +137,7 @@ public class KafkaExtractNodeTest extends SerializeBaseTest<KafkaExtractNode> {
         assertEquals("inlong-msg", options.get("format"));
         assertEquals("csv", options.get("inlong-msg.inner.format"));
         assertEquals("true", options.get("inlong-msg.csv.ignore-parse-errors"));
+        assertEquals("7&8", options.get("metrics.audit.key"));
 
         kafkaNode.setFormat(new CsvFormat());
         Map<String, String> csvOptions = kafkaNode.tableOptions();

Reply via email to