This is an automated email from the ASF dual-hosted git repository.

gosonzhang pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/incubator-inlong.git


The following commit(s) were added to refs/heads/master by this push:
     new ca69154  [INLONG-2043][Bug] Sort module renames tid to streamId (#2044)
ca69154 is described below

commit ca69154f0c82f57d5b9d52296385d4dbff18a592
Author: Biao Liu <[email protected]>
AuthorDate: Tue Dec 21 10:02:10 2021 +0800

    [INLONG-2043][Bug] Sort module renames tid to streamId (#2044)
---
 .../MultiTenancyTDMsgMixedDeserializerTest.java    |  5 ++--
 .../inlong/sort/formats/tdmsg/TDMsgUtils.java      |  4 +--
 .../sort/formats/tdmsgcsv/TDMsgCsvUtils.java       | 23 +++++----------
 .../tdmsgcsv/TDMsgCsvFormatDeserializerTest.java   | 33 +++++++++++-----------
 4 files changed, 28 insertions(+), 37 deletions(-)

diff --git 
a/inlong-sort/sort-core/src/test/java/org/apache/inlong/sort/flink/deserialization/MultiTenancyTDMsgMixedDeserializerTest.java
 
b/inlong-sort/sort-core/src/test/java/org/apache/inlong/sort/flink/deserialization/MultiTenancyTDMsgMixedDeserializerTest.java
index 397544e..37dddfc 100644
--- 
a/inlong-sort/sort-core/src/test/java/org/apache/inlong/sort/flink/deserialization/MultiTenancyTDMsgMixedDeserializerTest.java
+++ 
b/inlong-sort/sort-core/src/test/java/org/apache/inlong/sort/flink/deserialization/MultiTenancyTDMsgMixedDeserializerTest.java
@@ -30,6 +30,7 @@ import org.apache.inlong.sort.flink.Record;
 import org.apache.inlong.sort.flink.TDMsgSerializedRecord;
 import org.apache.inlong.sort.formats.common.LongFormatInfo;
 import org.apache.inlong.sort.formats.common.StringFormatInfo;
+import org.apache.inlong.sort.formats.tdmsg.TDMsgUtils;
 import org.apache.inlong.sort.protocol.DataFlowInfo;
 import org.apache.inlong.sort.protocol.FieldInfo;
 import 
org.apache.inlong.sort.protocol.deserialization.TDMsgCsvDeserializationInfo;
@@ -79,7 +80,7 @@ public class MultiTenancyTDMsgMixedDeserializerTest extends 
TestLogger {
         deserializer.addDataFlow(dataFlowInfo);
 
         final TDMsg1 tdMsg1 = TDMsg1.newTDMsg();
-        final String attrs = "m=0&iname=tid&t=20210513";
+        final String attrs = "m=0&" + TDMsgUtils.TDMSG_ATTR_STREAM_ID + 
"=tid&t=20210513";
         final String body1 = "tianqiwan|29";
         tdMsg1.addMsg(attrs, body1.getBytes());
 
@@ -93,7 +94,7 @@ public class MultiTenancyTDMsgMixedDeserializerTest extends 
TestLogger {
         assertEquals(new Timestamp(time), 
collector.results.get(0).getRow().getField(0));
         final Map<String, String> attributes = new HashMap<>();
         attributes.put("m", "0");
-        attributes.put("iname", "tid");
+        attributes.put(TDMsgUtils.TDMSG_ATTR_STREAM_ID, "tid");
         attributes.put("t", "20210513");
         assertEquals(attributes, 
collector.results.get(0).getRow().getField(1));
         assertEquals("tianqiwan", 
collector.results.get(0).getRow().getField(2));
diff --git 
a/inlong-sort/sort-formats/format-tdmsg-base/src/main/java/org/apache/inlong/sort/formats/tdmsg/TDMsgUtils.java
 
b/inlong-sort/sort-formats/format-tdmsg-base/src/main/java/org/apache/inlong/sort/formats/tdmsg/TDMsgUtils.java
index 7746da8..33d715f 100644
--- 
a/inlong-sort/sort-formats/format-tdmsg-base/src/main/java/org/apache/inlong/sort/formats/tdmsg/TDMsgUtils.java
+++ 
b/inlong-sort/sort-formats/format-tdmsg-base/src/main/java/org/apache/inlong/sort/formats/tdmsg/TDMsgUtils.java
@@ -52,9 +52,7 @@ public class TDMsgUtils {
     public static final char TDMSG_ATTR_KV_DELIMITER = '=';
 
     // keys in attributes
-    public static final String TDMSG_ATTR_INTERFACE_NAME = "iname";
-    public static final String TDMSG_ATTR_INTERFACE_ID = "id";
-    public static final String TDMSG_ATTR_INTERFACE_TID = "tid";
+    public static final String TDMSG_ATTR_STREAM_ID = "streamId";
     public static final String TDMSG_ATTR_TIME_T = "t";
     public static final String TDMSG_ATTR_TIME_DT = "dt";
     public static final String TDMSG_ATTR_ADD_COLUMN_PREFIX = "__addcol";
diff --git 
a/inlong-sort/sort-formats/format-tdmsg-csv/src/main/java/org/apache/inlong/sort/formats/tdmsgcsv/TDMsgCsvUtils.java
 
b/inlong-sort/sort-formats/format-tdmsg-csv/src/main/java/org/apache/inlong/sort/formats/tdmsgcsv/TDMsgCsvUtils.java
index 88f32b2..f935e6d 100644
--- 
a/inlong-sort/sort-formats/format-tdmsg-csv/src/main/java/org/apache/inlong/sort/formats/tdmsgcsv/TDMsgCsvUtils.java
+++ 
b/inlong-sort/sort-formats/format-tdmsg-csv/src/main/java/org/apache/inlong/sort/formats/tdmsgcsv/TDMsgCsvUtils.java
@@ -18,9 +18,7 @@
 
 package org.apache.inlong.sort.formats.tdmsgcsv;
 
-import static 
org.apache.inlong.sort.formats.tdmsg.TDMsgUtils.TDMSG_ATTR_INTERFACE_ID;
-import static 
org.apache.inlong.sort.formats.tdmsg.TDMsgUtils.TDMSG_ATTR_INTERFACE_NAME;
-import static 
org.apache.inlong.sort.formats.tdmsg.TDMsgUtils.TDMSG_ATTR_INTERFACE_TID;
+import static 
org.apache.inlong.sort.formats.tdmsg.TDMsgUtils.TDMSG_ATTR_STREAM_ID;
 import static 
org.apache.inlong.sort.formats.tdmsg.TDMsgUtils.TDMSG_ATTR_TIME_DT;
 import static 
org.apache.inlong.sort.formats.tdmsg.TDMsgUtils.TDMSG_ATTR_TIME_T;
 import static 
org.apache.inlong.sort.formats.tdmsg.TDMsgUtils.getPredefinedFields;
@@ -59,19 +57,12 @@ public class TDMsgCsvUtils {
         Map<String, String> attributes = parseAttr(attr);
 
         // Extracts interface from the attributes.
-        String tid;
-
-        if (attributes.containsKey(TDMSG_ATTR_INTERFACE_NAME)) {
-            tid = attributes.get(TDMSG_ATTR_INTERFACE_NAME);
-        } else if (attributes.containsKey(TDMSG_ATTR_INTERFACE_ID)) {
-            tid = attributes.get(TDMSG_ATTR_INTERFACE_ID);
-        } else if (attributes.containsKey(TDMSG_ATTR_INTERFACE_TID)) {
-            tid = attributes.get(TDMSG_ATTR_INTERFACE_TID);
+        String streamId;
+
+        if (attributes.containsKey(TDMSG_ATTR_STREAM_ID)) {
+            streamId = attributes.get(TDMSG_ATTR_STREAM_ID);
         } else {
-            throw new IllegalArgumentException(
-                    "Could not find " + TDMSG_ATTR_INTERFACE_NAME
-                            + " or " + TDMSG_ATTR_INTERFACE_ID
-                            + " or " + TDMSG_ATTR_INTERFACE_TID + " in 
attributes!");
+            throw new IllegalArgumentException("Could not find " + 
TDMSG_ATTR_STREAM_ID + " in attributes!");
         }
 
         // Extracts time from the attributes
@@ -92,7 +83,7 @@ public class TDMsgCsvUtils {
         // Extracts predefined fields from the attributes
         List<String> predefinedFields = getPredefinedFields(attributes);
 
-        return new TDMsgHead(attributes, tid, time, predefinedFields);
+        return new TDMsgHead(attributes, streamId, time, predefinedFields);
     }
 
     public static TDMsgBody parseBody(
diff --git 
a/inlong-sort/sort-formats/format-tdmsg-csv/src/test/java/org/apache/inlong/sort/formats/tdmsgcsv/TDMsgCsvFormatDeserializerTest.java
 
b/inlong-sort/sort-formats/format-tdmsg-csv/src/test/java/org/apache/inlong/sort/formats/tdmsgcsv/TDMsgCsvFormatDeserializerTest.java
index 72d0575..91026f7 100644
--- 
a/inlong-sort/sort-formats/format-tdmsg-csv/src/test/java/org/apache/inlong/sort/formats/tdmsgcsv/TDMsgCsvFormatDeserializerTest.java
+++ 
b/inlong-sort/sort-formats/format-tdmsg-csv/src/test/java/org/apache/inlong/sort/formats/tdmsgcsv/TDMsgCsvFormatDeserializerTest.java
@@ -20,6 +20,7 @@ package org.apache.inlong.sort.formats.tdmsgcsv;
 
 import static 
org.apache.inlong.sort.formats.tdmsg.TDMsgUtils.DEFAULT_ATTRIBUTES_FIELD_NAME;
 import static 
org.apache.inlong.sort.formats.tdmsg.TDMsgUtils.DEFAULT_TIME_FIELD_NAME;
+import static 
org.apache.inlong.sort.formats.tdmsg.TDMsgUtils.TDMSG_ATTR_STREAM_ID;
 import static org.junit.Assert.assertEquals;
 
 import java.nio.charset.Charset;
@@ -98,7 +99,7 @@ public class TDMsgCsvFormatDeserializerTest {
                 new TDMsgCsvFormatDeserializer(TEST_ROW_INFO);
 
         TDMsg1 tdMsg1 = TDMsg1.newTDMsg();
-        String attrs = 
"m=0&iname=testInterfaceId&t=20200322&__addcol1__=1&__addcol2__=2";
+        String attrs = "m=0&" + TDMSG_ATTR_STREAM_ID + 
"=testInterfaceId&t=20200322&__addcol1__=1&__addcol2__=2";
         String body1 = "123,field11,field12,field13";
         String body2 = "123,field21,field22,field23";
         tdMsg1.addMsg(attrs, body1.getBytes());
@@ -106,7 +107,7 @@ public class TDMsgCsvFormatDeserializerTest {
 
         Map<String, String> expectedAttributes = new HashMap<>();
         expectedAttributes.put("m", "0");
-        expectedAttributes.put("iname", "testInterfaceId");
+        expectedAttributes.put(TDMSG_ATTR_STREAM_ID, "testInterfaceId");
         expectedAttributes.put("t", "20200322");
         expectedAttributes.put("__addcol1__", "1");
         expectedAttributes.put("__addcol2__", "2");
@@ -147,7 +148,7 @@ public class TDMsgCsvFormatDeserializerTest {
                 new TDMsgCsvFormatDeserializer(TEST_ROW_INFO);
 
         TDMsg1 tdMsg1 = TDMsg1.newTDMsg();
-        String attrs = 
"m=0&iname=testInterfaceId&t=20200322&__addcol1__=1&__addcol2__=2";
+        String attrs = "m=0&" + TDMSG_ATTR_STREAM_ID + 
"=testInterfaceId&t=20200322&__addcol1__=1&__addcol2__=2";
         String body1 = "123,field11,field12,";
         String body2 = "123,field21,,field23";
         tdMsg1.addMsg(attrs, body1.getBytes());
@@ -155,7 +156,7 @@ public class TDMsgCsvFormatDeserializerTest {
 
         Map<String, String> expectedAttributes = new HashMap<>();
         expectedAttributes.put("m", "0");
-        expectedAttributes.put("iname", "testInterfaceId");
+        expectedAttributes.put(TDMSG_ATTR_STREAM_ID, "testInterfaceId");
         expectedAttributes.put("t", "20200322");
         expectedAttributes.put("__addcol1__", "1");
         expectedAttributes.put("__addcol2__", "2");
@@ -196,7 +197,7 @@ public class TDMsgCsvFormatDeserializerTest {
                 new TDMsgCsvFormatDeserializer(TEST_ROW_INFO);
 
         TDMsg1 tdMsg1 = TDMsg1.newTDMsg();
-        String attrs = "m=0&iname=testInterfaceId&t=20200322";
+        String attrs = "m=0&" + TDMSG_ATTR_STREAM_ID + 
"=testInterfaceId&t=20200322";
         String body1 = "1,2,123,field11,field12,";
         String body2 = "1,2,123,field21,,field23";
         tdMsg1.addMsg(attrs, body1.getBytes());
@@ -204,7 +205,7 @@ public class TDMsgCsvFormatDeserializerTest {
 
         Map<String, String> expectedAttributes = new HashMap<>();
         expectedAttributes.put("m", "0");
-        expectedAttributes.put("iname", "testInterfaceId");
+        expectedAttributes.put(TDMSG_ATTR_STREAM_ID, "testInterfaceId");
         expectedAttributes.put("t", "20200322");
 
         Row expectedRow1 = Row.of(
@@ -281,7 +282,7 @@ public class TDMsgCsvFormatDeserializerTest {
                 );
 
         TDMsg1 tdMsg1 = TDMsg1.newTDMsg();
-        String attrs = 
"m=0&iname=testInterfaceId&t=20200322&__addcol1__=1&__addcol2__=2";
+        String attrs = "m=0&" + TDMSG_ATTR_STREAM_ID + 
"=testInterfaceId&t=20200322&__addcol1__=1&__addcol2__=2";
         String body1 = "aaa,field11,field12,field13";
         String body2 = "123,field21,field22,field23";
         tdMsg1.addMsg(attrs, body1.getBytes());
@@ -289,7 +290,7 @@ public class TDMsgCsvFormatDeserializerTest {
 
         Map<String, String> expectedAttributes = new HashMap<>();
         expectedAttributes.put("m", "0");
-        expectedAttributes.put("iname", "testInterfaceId");
+        expectedAttributes.put(TDMSG_ATTR_STREAM_ID, "testInterfaceId");
         expectedAttributes.put("t", "20200322");
         expectedAttributes.put("__addcol1__", "1");
         expectedAttributes.put("__addcol2__", "2");
@@ -329,14 +330,14 @@ public class TDMsgCsvFormatDeserializerTest {
                 );
 
         TDMsg1 tdMsg1 = TDMsg1.newTDMsg();
-        String attrs = "m=0&iname=testInterfaceId&t=20200322";
+        String attrs = "m=0&" + TDMSG_ATTR_STREAM_ID + 
"=testInterfaceId&t=20200322";
         String body = ",1,2,3,field1,field2,field3";
 
         tdMsg1.addMsg(attrs, body.getBytes());
 
         Map<String, String> expectedAttributes = new HashMap<>();
         expectedAttributes.put("m", "0");
-        expectedAttributes.put("iname", "testInterfaceId");
+        expectedAttributes.put(TDMSG_ATTR_STREAM_ID, "testInterfaceId");
         expectedAttributes.put("t", "20200322");
 
         Row expectedRow = Row.of(
@@ -374,14 +375,14 @@ public class TDMsgCsvFormatDeserializerTest {
                 );
 
         TDMsg1 tdMsg1 = TDMsg1.newTDMsg();
-        String attrs = "m=0&iname=testInterfaceId&t=20200322";
+        String attrs = "m=0&" + TDMSG_ATTR_STREAM_ID + 
"=testInterfaceId&t=20200322";
         String body = ",1,2,field1,field2,field3";
 
         tdMsg1.addMsg(attrs, body.getBytes());
 
         Map<String, String> expectedAttributes = new HashMap<>();
         expectedAttributes.put("m", "0");
-        expectedAttributes.put("iname", "testInterfaceId");
+        expectedAttributes.put(TDMSG_ATTR_STREAM_ID, "testInterfaceId");
         expectedAttributes.put("t", "20200322");
 
         Row expectedRow = Row.of(
@@ -408,7 +409,7 @@ public class TDMsgCsvFormatDeserializerTest {
                 new TDMsgCsvFormatDeserializer(TEST_ROW_INFO);
 
         TDMsg1 tdMsg1 = TDMsg1.newTDMsg();
-        String attrs = 
"m=0&iname=testInterfaceId&t=20200322&__addcol1__=1&__addcol2__=2";
+        String attrs = "m=0&" + TDMSG_ATTR_STREAM_ID + 
"=testInterfaceId&t=20200322&__addcol1__=1&__addcol2__=2";
         String body1 = "123,field11,field12";
         String body2 = "123,field21,field22,field23,field24";
         tdMsg1.addMsg(attrs, body1.getBytes());
@@ -416,7 +417,7 @@ public class TDMsgCsvFormatDeserializerTest {
 
         Map<String, String> expectedAttributes = new HashMap<>();
         expectedAttributes.put("m", "0");
-        expectedAttributes.put("iname", "testInterfaceId");
+        expectedAttributes.put(TDMSG_ATTR_STREAM_ID, "testInterfaceId");
         expectedAttributes.put("t", "20200322");
         expectedAttributes.put("__addcol1__", "1");
         expectedAttributes.put("__addcol2__", "2");
@@ -456,14 +457,14 @@ public class TDMsgCsvFormatDeserializerTest {
                 new TDMsgCsvFormatDeserializer(TEST_ROW_INFO);
 
         TDMsg1 tdMsg1 = TDMsg1.newTDMsg();
-        String attrs = "m=0&iname=testInterfaceId&t=20200322&__addcol1__=1&"
+        String attrs = "m=0&" + TDMSG_ATTR_STREAM_ID + 
"=testInterfaceId&t=20200322&__addcol1__=1&"
                + 
"__addcol2__=2&__addcol3__=3&__addcol4__=4&__addcol5__=5&__addcol6__=6&__addcol7__=7";
         String body = "field11,field12";
         tdMsg1.addMsg(attrs, body.getBytes());
 
         Map<String, String> expectedAttributes = new HashMap<>();
         expectedAttributes.put("m", "0");
-        expectedAttributes.put("iname", "testInterfaceId");
+        expectedAttributes.put(TDMSG_ATTR_STREAM_ID, "testInterfaceId");
         expectedAttributes.put("t", "20200322");
         expectedAttributes.put("__addcol1__", "1");
         expectedAttributes.put("__addcol2__", "2");

Reply via email to