This is an automated email from the ASF dual-hosted git repository.
dockerzhang pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/inlong.git
The following commit(s) were added to refs/heads/master by this push:
new 0e2fa1f9b [INLONG-7389][Manager][Sort] Add audit id info for source (#7390)
0e2fa1f9b is described below
commit 0e2fa1f9be9ce5be76ef033c9746782c93a035c4
Author: fuweng11 <[email protected]>
AuthorDate: Wed Mar 8 18:27:51 2023 +0800
[INLONG-7389][Manager][Sort] Add audit id info for source (#7390)
---
.../service/resource/sort/DefaultSortConfigOperator.java | 12 ++++++++++++
.../sort/protocol/node/extract/KafkaExtractNodeTest.java | 12 +++++++++++-
2 files changed, 23 insertions(+), 1 deletion(-)
diff --git a/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/resource/sort/DefaultSortConfigOperator.java b/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/resource/sort/DefaultSortConfigOperator.java
index 2664cee1e..22cf7fd23 100644
--- a/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/resource/sort/DefaultSortConfigOperator.java
+++ b/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/resource/sort/DefaultSortConfigOperator.java
@@ -31,6 +31,7 @@ import org.apache.inlong.manager.pojo.source.StreamSource;
import org.apache.inlong.manager.pojo.stream.InlongStreamInfo;
import org.apache.inlong.manager.pojo.stream.StreamField;
import org.apache.inlong.manager.pojo.transform.TransformResponse;
+import org.apache.inlong.manager.service.core.AuditService;
import org.apache.inlong.manager.service.sink.StreamSinkService;
import org.apache.inlong.manager.service.source.StreamSourceService;
import org.apache.inlong.manager.service.transform.StreamTransformService;
@@ -67,6 +68,8 @@ public class DefaultSortConfigOperator implements SortConfigOperator {
private StreamTransformService transformService;
@Autowired
private StreamSinkService sinkService;
+ @Autowired
+ private AuditService auditService;
@Override
public Boolean accept(Integer enableZk) {
@@ -121,6 +124,15 @@ public class DefaultSortConfigOperator implements SortConfigOperator {
// build a stream info from the nodes and relations
List<StreamSource> sources = sourceMap.get(streamId);
List<StreamSink> sinks = sinkMap.get(streamId);
+ // get audit list by sink type
+ List<String> auditIds = new ArrayList<>();
+ for (StreamSink sink : sinks) {
+ auditIds.add(auditService.getAuditId(sink.getSinkType(),
false));
+ }
+ for (StreamSource source : sources) {
+ Map<String, Object> properties = source.getProperties();
+ properties.putIfAbsent("metrics.audit.key", String.join("&",
auditIds));
+ }
List<NodeRelation> relations;
if
(InlongConstants.STANDARD_MODE.equals(groupInfo.getLightweight())) {
diff --git a/inlong-sort/sort-common/src/test/java/org/apache/inlong/sort/protocol/node/extract/KafkaExtractNodeTest.java b/inlong-sort/sort-common/src/test/java/org/apache/inlong/sort/protocol/node/extract/KafkaExtractNodeTest.java
index cda33fae9..fae44c3d4 100644
--- a/inlong-sort/sort-common/src/test/java/org/apache/inlong/sort/protocol/node/extract/KafkaExtractNodeTest.java
+++ b/inlong-sort/sort-common/src/test/java/org/apache/inlong/sort/protocol/node/extract/KafkaExtractNodeTest.java
@@ -31,10 +31,12 @@ import org.apache.inlong.sort.protocol.node.format.RawFormat;
import org.junit.Assert;
import org.junit.Test;
+import java.util.ArrayList;
import java.util.Arrays;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
+
import static org.junit.Assert.assertEquals;
/**
@@ -42,6 +44,8 @@ import static org.junit.Assert.assertEquals;
*/
public class KafkaExtractNodeTest extends SerializeBaseTest<KafkaExtractNode> {
+ public static final String AUDIT_ID_SORT_INPUT = "7";
+ public static final String AUDIT_ID_SORT_OUTPUT = "8";
private final ObjectMapper objectMapper = new ObjectMapper();
@Override
@@ -49,7 +53,12 @@ public class KafkaExtractNodeTest extends SerializeBaseTest<KafkaExtractNode> {
List<FieldInfo> fields = Arrays.asList(
new FieldInfo("name", new StringFormatInfo()),
new FieldInfo("age", new IntFormatInfo()));
- return new KafkaExtractNode("1", "kafka_input", fields, null, null,
"workerCsv",
+ Map<String, String> properties = new HashMap<>();
+ List<String> auditIds = new ArrayList<>();
+ auditIds.add(AUDIT_ID_SORT_INPUT);
+ auditIds.add(AUDIT_ID_SORT_OUTPUT);
+ properties.putIfAbsent("metrics.audit.key", String.join("&",
auditIds));
+ return new KafkaExtractNode("1", "kafka_input", fields, null,
properties, "workerCsv",
"localhost:9092", new CsvFormat(),
KafkaScanStartupMode.EARLIEST_OFFSET, null, "groupId", null, null);
}
@@ -128,6 +137,7 @@ public class KafkaExtractNodeTest extends SerializeBaseTest<KafkaExtractNode> {
assertEquals("inlong-msg", options.get("format"));
assertEquals("csv", options.get("inlong-msg.inner.format"));
assertEquals("true",
options.get("inlong-msg.csv.ignore-parse-errors"));
+ assertEquals("7&8", options.get("metrics.audit.key"));
kafkaNode.setFormat(new CsvFormat());
Map<String, String> csvOptions = kafkaNode.tableOptions();