This is an automated email from the ASF dual-hosted git repository.
gosonzhang pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/incubator-inlong.git
The following commit(s) were added to refs/heads/master by this push:
new ca69154 [INLONG-2043][Bug] Sort module renames tid to streamId (#2044)
ca69154 is described below
commit ca69154f0c82f57d5b9d52296385d4dbff18a592
Author: Biao Liu <[email protected]>
AuthorDate: Tue Dec 21 10:02:10 2021 +0800
[INLONG-2043][Bug] Sort module renames tid to streamId (#2044)
---
.../MultiTenancyTDMsgMixedDeserializerTest.java | 5 ++--
.../inlong/sort/formats/tdmsg/TDMsgUtils.java | 4 +--
.../sort/formats/tdmsgcsv/TDMsgCsvUtils.java | 23 +++++----------
.../tdmsgcsv/TDMsgCsvFormatDeserializerTest.java | 33 +++++++++++-----------
4 files changed, 28 insertions(+), 37 deletions(-)
diff --git
a/inlong-sort/sort-core/src/test/java/org/apache/inlong/sort/flink/deserialization/MultiTenancyTDMsgMixedDeserializerTest.java
b/inlong-sort/sort-core/src/test/java/org/apache/inlong/sort/flink/deserialization/MultiTenancyTDMsgMixedDeserializerTest.java
index 397544e..37dddfc 100644
---
a/inlong-sort/sort-core/src/test/java/org/apache/inlong/sort/flink/deserialization/MultiTenancyTDMsgMixedDeserializerTest.java
+++
b/inlong-sort/sort-core/src/test/java/org/apache/inlong/sort/flink/deserialization/MultiTenancyTDMsgMixedDeserializerTest.java
@@ -30,6 +30,7 @@ import org.apache.inlong.sort.flink.Record;
import org.apache.inlong.sort.flink.TDMsgSerializedRecord;
import org.apache.inlong.sort.formats.common.LongFormatInfo;
import org.apache.inlong.sort.formats.common.StringFormatInfo;
+import org.apache.inlong.sort.formats.tdmsg.TDMsgUtils;
import org.apache.inlong.sort.protocol.DataFlowInfo;
import org.apache.inlong.sort.protocol.FieldInfo;
import
org.apache.inlong.sort.protocol.deserialization.TDMsgCsvDeserializationInfo;
@@ -79,7 +80,7 @@ public class MultiTenancyTDMsgMixedDeserializerTest extends
TestLogger {
deserializer.addDataFlow(dataFlowInfo);
final TDMsg1 tdMsg1 = TDMsg1.newTDMsg();
- final String attrs = "m=0&iname=tid&t=20210513";
+ final String attrs = "m=0&" + TDMsgUtils.TDMSG_ATTR_STREAM_ID +
"=tid&t=20210513";
final String body1 = "tianqiwan|29";
tdMsg1.addMsg(attrs, body1.getBytes());
@@ -93,7 +94,7 @@ public class MultiTenancyTDMsgMixedDeserializerTest extends
TestLogger {
assertEquals(new Timestamp(time),
collector.results.get(0).getRow().getField(0));
final Map<String, String> attributes = new HashMap<>();
attributes.put("m", "0");
- attributes.put("iname", "tid");
+ attributes.put(TDMsgUtils.TDMSG_ATTR_STREAM_ID, "tid");
attributes.put("t", "20210513");
assertEquals(attributes,
collector.results.get(0).getRow().getField(1));
assertEquals("tianqiwan",
collector.results.get(0).getRow().getField(2));
diff --git
a/inlong-sort/sort-formats/format-tdmsg-base/src/main/java/org/apache/inlong/sort/formats/tdmsg/TDMsgUtils.java
b/inlong-sort/sort-formats/format-tdmsg-base/src/main/java/org/apache/inlong/sort/formats/tdmsg/TDMsgUtils.java
index 7746da8..33d715f 100644
---
a/inlong-sort/sort-formats/format-tdmsg-base/src/main/java/org/apache/inlong/sort/formats/tdmsg/TDMsgUtils.java
+++
b/inlong-sort/sort-formats/format-tdmsg-base/src/main/java/org/apache/inlong/sort/formats/tdmsg/TDMsgUtils.java
@@ -52,9 +52,7 @@ public class TDMsgUtils {
public static final char TDMSG_ATTR_KV_DELIMITER = '=';
// keys in attributes
- public static final String TDMSG_ATTR_INTERFACE_NAME = "iname";
- public static final String TDMSG_ATTR_INTERFACE_ID = "id";
- public static final String TDMSG_ATTR_INTERFACE_TID = "tid";
+ public static final String TDMSG_ATTR_STREAM_ID = "streamId";
public static final String TDMSG_ATTR_TIME_T = "t";
public static final String TDMSG_ATTR_TIME_DT = "dt";
public static final String TDMSG_ATTR_ADD_COLUMN_PREFIX = "__addcol";
diff --git
a/inlong-sort/sort-formats/format-tdmsg-csv/src/main/java/org/apache/inlong/sort/formats/tdmsgcsv/TDMsgCsvUtils.java
b/inlong-sort/sort-formats/format-tdmsg-csv/src/main/java/org/apache/inlong/sort/formats/tdmsgcsv/TDMsgCsvUtils.java
index 88f32b2..f935e6d 100644
---
a/inlong-sort/sort-formats/format-tdmsg-csv/src/main/java/org/apache/inlong/sort/formats/tdmsgcsv/TDMsgCsvUtils.java
+++
b/inlong-sort/sort-formats/format-tdmsg-csv/src/main/java/org/apache/inlong/sort/formats/tdmsgcsv/TDMsgCsvUtils.java
@@ -18,9 +18,7 @@
package org.apache.inlong.sort.formats.tdmsgcsv;
-import static
org.apache.inlong.sort.formats.tdmsg.TDMsgUtils.TDMSG_ATTR_INTERFACE_ID;
-import static
org.apache.inlong.sort.formats.tdmsg.TDMsgUtils.TDMSG_ATTR_INTERFACE_NAME;
-import static
org.apache.inlong.sort.formats.tdmsg.TDMsgUtils.TDMSG_ATTR_INTERFACE_TID;
+import static
org.apache.inlong.sort.formats.tdmsg.TDMsgUtils.TDMSG_ATTR_STREAM_ID;
import static
org.apache.inlong.sort.formats.tdmsg.TDMsgUtils.TDMSG_ATTR_TIME_DT;
import static
org.apache.inlong.sort.formats.tdmsg.TDMsgUtils.TDMSG_ATTR_TIME_T;
import static
org.apache.inlong.sort.formats.tdmsg.TDMsgUtils.getPredefinedFields;
@@ -59,19 +57,12 @@ public class TDMsgCsvUtils {
Map<String, String> attributes = parseAttr(attr);
// Extracts interface from the attributes.
- String tid;
-
- if (attributes.containsKey(TDMSG_ATTR_INTERFACE_NAME)) {
- tid = attributes.get(TDMSG_ATTR_INTERFACE_NAME);
- } else if (attributes.containsKey(TDMSG_ATTR_INTERFACE_ID)) {
- tid = attributes.get(TDMSG_ATTR_INTERFACE_ID);
- } else if (attributes.containsKey(TDMSG_ATTR_INTERFACE_TID)) {
- tid = attributes.get(TDMSG_ATTR_INTERFACE_TID);
+ String streamId;
+
+ if (attributes.containsKey(TDMSG_ATTR_STREAM_ID)) {
+ streamId = attributes.get(TDMSG_ATTR_STREAM_ID);
} else {
- throw new IllegalArgumentException(
- "Could not find " + TDMSG_ATTR_INTERFACE_NAME
- + " or " + TDMSG_ATTR_INTERFACE_ID
- + " or " + TDMSG_ATTR_INTERFACE_TID + " in
attributes!");
+ throw new IllegalArgumentException("Could not find " +
TDMSG_ATTR_STREAM_ID + " in attributes!");
}
// Extracts time from the attributes
@@ -92,7 +83,7 @@ public class TDMsgCsvUtils {
// Extracts predefined fields from the attributes
List<String> predefinedFields = getPredefinedFields(attributes);
- return new TDMsgHead(attributes, tid, time, predefinedFields);
+ return new TDMsgHead(attributes, streamId, time, predefinedFields);
}
public static TDMsgBody parseBody(
diff --git
a/inlong-sort/sort-formats/format-tdmsg-csv/src/test/java/org/apache/inlong/sort/formats/tdmsgcsv/TDMsgCsvFormatDeserializerTest.java
b/inlong-sort/sort-formats/format-tdmsg-csv/src/test/java/org/apache/inlong/sort/formats/tdmsgcsv/TDMsgCsvFormatDeserializerTest.java
index 72d0575..91026f7 100644
---
a/inlong-sort/sort-formats/format-tdmsg-csv/src/test/java/org/apache/inlong/sort/formats/tdmsgcsv/TDMsgCsvFormatDeserializerTest.java
+++
b/inlong-sort/sort-formats/format-tdmsg-csv/src/test/java/org/apache/inlong/sort/formats/tdmsgcsv/TDMsgCsvFormatDeserializerTest.java
@@ -20,6 +20,7 @@ package org.apache.inlong.sort.formats.tdmsgcsv;
import static
org.apache.inlong.sort.formats.tdmsg.TDMsgUtils.DEFAULT_ATTRIBUTES_FIELD_NAME;
import static
org.apache.inlong.sort.formats.tdmsg.TDMsgUtils.DEFAULT_TIME_FIELD_NAME;
+import static
org.apache.inlong.sort.formats.tdmsg.TDMsgUtils.TDMSG_ATTR_STREAM_ID;
import static org.junit.Assert.assertEquals;
import java.nio.charset.Charset;
@@ -98,7 +99,7 @@ public class TDMsgCsvFormatDeserializerTest {
new TDMsgCsvFormatDeserializer(TEST_ROW_INFO);
TDMsg1 tdMsg1 = TDMsg1.newTDMsg();
- String attrs =
"m=0&iname=testInterfaceId&t=20200322&__addcol1__=1&__addcol2__=2";
+ String attrs = "m=0&" + TDMSG_ATTR_STREAM_ID +
"=testInterfaceId&t=20200322&__addcol1__=1&__addcol2__=2";
String body1 = "123,field11,field12,field13";
String body2 = "123,field21,field22,field23";
tdMsg1.addMsg(attrs, body1.getBytes());
@@ -106,7 +107,7 @@ public class TDMsgCsvFormatDeserializerTest {
Map<String, String> expectedAttributes = new HashMap<>();
expectedAttributes.put("m", "0");
- expectedAttributes.put("iname", "testInterfaceId");
+ expectedAttributes.put(TDMSG_ATTR_STREAM_ID, "testInterfaceId");
expectedAttributes.put("t", "20200322");
expectedAttributes.put("__addcol1__", "1");
expectedAttributes.put("__addcol2__", "2");
@@ -147,7 +148,7 @@ public class TDMsgCsvFormatDeserializerTest {
new TDMsgCsvFormatDeserializer(TEST_ROW_INFO);
TDMsg1 tdMsg1 = TDMsg1.newTDMsg();
- String attrs =
"m=0&iname=testInterfaceId&t=20200322&__addcol1__=1&__addcol2__=2";
+ String attrs = "m=0&" + TDMSG_ATTR_STREAM_ID +
"=testInterfaceId&t=20200322&__addcol1__=1&__addcol2__=2";
String body1 = "123,field11,field12,";
String body2 = "123,field21,,field23";
tdMsg1.addMsg(attrs, body1.getBytes());
@@ -155,7 +156,7 @@ public class TDMsgCsvFormatDeserializerTest {
Map<String, String> expectedAttributes = new HashMap<>();
expectedAttributes.put("m", "0");
- expectedAttributes.put("iname", "testInterfaceId");
+ expectedAttributes.put(TDMSG_ATTR_STREAM_ID, "testInterfaceId");
expectedAttributes.put("t", "20200322");
expectedAttributes.put("__addcol1__", "1");
expectedAttributes.put("__addcol2__", "2");
@@ -196,7 +197,7 @@ public class TDMsgCsvFormatDeserializerTest {
new TDMsgCsvFormatDeserializer(TEST_ROW_INFO);
TDMsg1 tdMsg1 = TDMsg1.newTDMsg();
- String attrs = "m=0&iname=testInterfaceId&t=20200322";
+ String attrs = "m=0&" + TDMSG_ATTR_STREAM_ID +
"=testInterfaceId&t=20200322";
String body1 = "1,2,123,field11,field12,";
String body2 = "1,2,123,field21,,field23";
tdMsg1.addMsg(attrs, body1.getBytes());
@@ -204,7 +205,7 @@ public class TDMsgCsvFormatDeserializerTest {
Map<String, String> expectedAttributes = new HashMap<>();
expectedAttributes.put("m", "0");
- expectedAttributes.put("iname", "testInterfaceId");
+ expectedAttributes.put(TDMSG_ATTR_STREAM_ID, "testInterfaceId");
expectedAttributes.put("t", "20200322");
Row expectedRow1 = Row.of(
@@ -281,7 +282,7 @@ public class TDMsgCsvFormatDeserializerTest {
);
TDMsg1 tdMsg1 = TDMsg1.newTDMsg();
- String attrs =
"m=0&iname=testInterfaceId&t=20200322&__addcol1__=1&__addcol2__=2";
+ String attrs = "m=0&" + TDMSG_ATTR_STREAM_ID +
"=testInterfaceId&t=20200322&__addcol1__=1&__addcol2__=2";
String body1 = "aaa,field11,field12,field13";
String body2 = "123,field21,field22,field23";
tdMsg1.addMsg(attrs, body1.getBytes());
@@ -289,7 +290,7 @@ public class TDMsgCsvFormatDeserializerTest {
Map<String, String> expectedAttributes = new HashMap<>();
expectedAttributes.put("m", "0");
- expectedAttributes.put("iname", "testInterfaceId");
+ expectedAttributes.put(TDMSG_ATTR_STREAM_ID, "testInterfaceId");
expectedAttributes.put("t", "20200322");
expectedAttributes.put("__addcol1__", "1");
expectedAttributes.put("__addcol2__", "2");
@@ -329,14 +330,14 @@ public class TDMsgCsvFormatDeserializerTest {
);
TDMsg1 tdMsg1 = TDMsg1.newTDMsg();
- String attrs = "m=0&iname=testInterfaceId&t=20200322";
+ String attrs = "m=0&" + TDMSG_ATTR_STREAM_ID +
"=testInterfaceId&t=20200322";
String body = ",1,2,3,field1,field2,field3";
tdMsg1.addMsg(attrs, body.getBytes());
Map<String, String> expectedAttributes = new HashMap<>();
expectedAttributes.put("m", "0");
- expectedAttributes.put("iname", "testInterfaceId");
+ expectedAttributes.put(TDMSG_ATTR_STREAM_ID, "testInterfaceId");
expectedAttributes.put("t", "20200322");
Row expectedRow = Row.of(
@@ -374,14 +375,14 @@ public class TDMsgCsvFormatDeserializerTest {
);
TDMsg1 tdMsg1 = TDMsg1.newTDMsg();
- String attrs = "m=0&iname=testInterfaceId&t=20200322";
+ String attrs = "m=0&" + TDMSG_ATTR_STREAM_ID +
"=testInterfaceId&t=20200322";
String body = ",1,2,field1,field2,field3";
tdMsg1.addMsg(attrs, body.getBytes());
Map<String, String> expectedAttributes = new HashMap<>();
expectedAttributes.put("m", "0");
- expectedAttributes.put("iname", "testInterfaceId");
+ expectedAttributes.put(TDMSG_ATTR_STREAM_ID, "testInterfaceId");
expectedAttributes.put("t", "20200322");
Row expectedRow = Row.of(
@@ -408,7 +409,7 @@ public class TDMsgCsvFormatDeserializerTest {
new TDMsgCsvFormatDeserializer(TEST_ROW_INFO);
TDMsg1 tdMsg1 = TDMsg1.newTDMsg();
- String attrs =
"m=0&iname=testInterfaceId&t=20200322&__addcol1__=1&__addcol2__=2";
+ String attrs = "m=0&" + TDMSG_ATTR_STREAM_ID +
"=testInterfaceId&t=20200322&__addcol1__=1&__addcol2__=2";
String body1 = "123,field11,field12";
String body2 = "123,field21,field22,field23,field24";
tdMsg1.addMsg(attrs, body1.getBytes());
@@ -416,7 +417,7 @@ public class TDMsgCsvFormatDeserializerTest {
Map<String, String> expectedAttributes = new HashMap<>();
expectedAttributes.put("m", "0");
- expectedAttributes.put("iname", "testInterfaceId");
+ expectedAttributes.put(TDMSG_ATTR_STREAM_ID, "testInterfaceId");
expectedAttributes.put("t", "20200322");
expectedAttributes.put("__addcol1__", "1");
expectedAttributes.put("__addcol2__", "2");
@@ -456,14 +457,14 @@ public class TDMsgCsvFormatDeserializerTest {
new TDMsgCsvFormatDeserializer(TEST_ROW_INFO);
TDMsg1 tdMsg1 = TDMsg1.newTDMsg();
- String attrs = "m=0&iname=testInterfaceId&t=20200322&__addcol1__=1&"
+ String attrs = "m=0&" + TDMSG_ATTR_STREAM_ID +
"=testInterfaceId&t=20200322&__addcol1__=1&"
+
"__addcol2__=2&__addcol3__=3&__addcol4__=4&__addcol5__=5&__addcol6__=6&__addcol7__=7";
String body = "field11,field12";
tdMsg1.addMsg(attrs, body.getBytes());
Map<String, String> expectedAttributes = new HashMap<>();
expectedAttributes.put("m", "0");
- expectedAttributes.put("iname", "testInterfaceId");
+ expectedAttributes.put(TDMSG_ATTR_STREAM_ID, "testInterfaceId");
expectedAttributes.put("t", "20200322");
expectedAttributes.put("__addcol1__", "1");
expectedAttributes.put("__addcol2__", "2");