This is an automated email from the ASF dual-hosted git repository.

dockerzhang pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/inlong.git


The following commit(s) were added to refs/heads/master by this push:
     new 3d1b611dc5 [INLONG-9626][Sort] Change the variable name from tid to 
streamId (#9629)
3d1b611dc5 is described below

commit 3d1b611dc510676323b02bfe7c28247759ed2569
Author: baomingyu <[email protected]>
AuthorDate: Wed Jan 31 17:22:07 2024 +0800

    [INLONG-9626][Sort] Change the variable name from tid to streamId (#9629)
---
 .../manager/pojo/source/tubemq/TubeMQSource.java     |  2 +-
 .../manager/pojo/source/tubemq/TubeMQSourceDTO.java  |  4 ++--
 .../pojo/source/tubemq/TubeMQSourceRequest.java      |  4 ++--
 .../sdk/dataproxy/example/UdpClientExample.java      |  2 +-
 .../deserialization/CsvDeserializationInfo.java      | 20 ++++++++++----------
 .../deserialization/DeserializationInfo.java         |  2 +-
 .../InLongMsgCsv2DeserializationInfo.java            |  8 ++++----
 .../InLongMsgCsvDeserializationInfo.java             | 12 ++++++------
 .../InLongMsgDeserializationInfo.java                | 12 ++++++------
 .../InLongMsgKvDeserializationInfo.java              |  8 ++++----
 .../InLongMsgTlogCsvDeserializationInfo.java         |  8 ++++----
 .../InLongMsgTlogKvDeserializationInfo.java          |  8 ++++----
 .../deserialization/KvDeserializationInfo.java       | 20 ++++++++++----------
 .../inlong/sort/tubemq/table/TubeMQOptions.java      |  2 +-
 .../inlong/sort/formats/inlongmsg/InLongMsgBody.java | 12 ++++++------
 .../inlong/sort/formats/inlongmsg/InLongMsgHead.java | 16 ++++++++--------
 .../sort/formats/inlongmsg/InLongMsgMetadata.java    |  2 +-
 .../AbstractInLongMsgMixedFormatConverter.java       |  6 +++---
 .../formats/inlongmsg/InLongMsgDecodingFormat.java   |  2 +-
 .../sort/formats/inlongmsg/InLongMsgUtils.java       | 10 ++++++----
 .../InLongMsgBinlogMixedFormatConverter.java         |  2 +-
 .../InLongMsgBinlogMixedFormatDeserializer.java      |  2 +-
 .../inlongmsgbinlog/InLongMsgBinlogUtils.java        |  8 ++++----
 .../InLongMsgCsvMixedFormatConverter.java            |  2 +-
 .../InLongMsgCsvMixedFormatDeserializer.java         |  2 +-
 .../sort/formats/inlongmsgcsv/InLongMsgCsvUtils.java |  8 ++++----
 .../inlongmsgkv/InLongMsgKvMixedFormatConverter.java |  2 +-
 .../InLongMsgKvMixedFormatDeserializer.java          |  2 +-
 .../sort/formats/inlongmsgkv/InLongMsgKvUtils.java   |  8 ++++----
 .../InLongMsgTlogCsvMixedFormatConverter.java        |  2 +-
 .../InLongMsgTlogCsvMixedFormatDeserializer.java     |  2 +-
 .../inlongmsgtlogcsv/InLongMsgTlogCsvUtils.java      |  4 ++--
 .../InLongMsgTlogKvMixedFormatConverter.java         |  2 +-
 .../InLongMsgTlogKvMixedFormatDeserializer.java      |  2 +-
 .../inlongmsgtlogkv/InLongMsgTlogKvUtils.java        |  4 ++--
 .../AbstractInLongMsgMixedFormatConverter.java       |  6 +++---
 .../sort/formats/inlongmsg/InLongMsgUtils.java       | 11 +++++++----
 .../inlongmsgbinlog/InLongMsgBinlogUtils.java        | 10 +++++-----
 .../org/apache/flink/connectors/tubemq/Tubemq.java   | 16 ++++++++--------
 .../flink/connectors/tubemq/TubemqOptions.java       |  6 +++---
 .../flink/connectors/tubemq/TubemqSinkFunction.java  | 10 +++++-----
 .../connectors/tubemq/TubemqSourceFunction.java      | 16 ++++++++--------
 .../flink/connectors/tubemq/TubemqTableSource.java   | 16 ++++++++--------
 .../tubemq/TubemqTableSourceSinkFactory.java         | 14 +++++++-------
 .../flink/connectors/tubemq/TubemqValidator.java     |  8 ++++----
 .../apache/flink/connectors/tubemq/TubemqTest.java   |  4 ++--
 46 files changed, 167 insertions(+), 162 deletions(-)

diff --git 
a/inlong-manager/manager-pojo/src/main/java/org/apache/inlong/manager/pojo/source/tubemq/TubeMQSource.java
 
b/inlong-manager/manager-pojo/src/main/java/org/apache/inlong/manager/pojo/source/tubemq/TubeMQSource.java
index 7714ba6229..a2539b4c98 100644
--- 
a/inlong-manager/manager-pojo/src/main/java/org/apache/inlong/manager/pojo/source/tubemq/TubeMQSource.java
+++ 
b/inlong-manager/manager-pojo/src/main/java/org/apache/inlong/manager/pojo/source/tubemq/TubeMQSource.java
@@ -65,7 +65,7 @@ public class TubeMQSource extends StreamSource {
     /**
      * The TubeMQ consumers use this streamId set to filter records reading 
from server.
      */
-    @ApiModelProperty("Tid of the TubeMQ")
+    @ApiModelProperty("StreamId of the TubeMQ")
     private TreeSet<String> streamId;
 
     @ApiModelProperty(value = "The message body wrap  wrap type, including: 
RAW, INLONG_MSG_V0, INLONG_MSG_V1, etc")
diff --git 
a/inlong-manager/manager-pojo/src/main/java/org/apache/inlong/manager/pojo/source/tubemq/TubeMQSourceDTO.java
 
b/inlong-manager/manager-pojo/src/main/java/org/apache/inlong/manager/pojo/source/tubemq/TubeMQSourceDTO.java
index 717e06c631..bae3b951bd 100644
--- 
a/inlong-manager/manager-pojo/src/main/java/org/apache/inlong/manager/pojo/source/tubemq/TubeMQSourceDTO.java
+++ 
b/inlong-manager/manager-pojo/src/main/java/org/apache/inlong/manager/pojo/source/tubemq/TubeMQSourceDTO.java
@@ -62,9 +62,9 @@ public class TubeMQSourceDTO {
     private String wrapType;
 
     /**
-     * The tubemq consumers use this tid set to filter records reading from 
server.
+     * The tubemq consumers use this streamId set to filter records reading 
from server.
      */
-    @ApiModelProperty("Tid of the TubeMQ")
+    @ApiModelProperty("streamId of the TubeMQ")
     private TreeSet<String> streamId;
 
     @ApiModelProperty("Properties for TubeMQ")
diff --git 
a/inlong-manager/manager-pojo/src/main/java/org/apache/inlong/manager/pojo/source/tubemq/TubeMQSourceRequest.java
 
b/inlong-manager/manager-pojo/src/main/java/org/apache/inlong/manager/pojo/source/tubemq/TubeMQSourceRequest.java
index a584a9d16e..7ef7ca4c34 100644
--- 
a/inlong-manager/manager-pojo/src/main/java/org/apache/inlong/manager/pojo/source/tubemq/TubeMQSourceRequest.java
+++ 
b/inlong-manager/manager-pojo/src/main/java/org/apache/inlong/manager/pojo/source/tubemq/TubeMQSourceRequest.java
@@ -58,9 +58,9 @@ public class TubeMQSourceRequest extends SourceRequest {
     private String wrapType;
 
     /**
-     * The TubeMQ consumers use this tid set to filter records reading from 
server.
+     * The TubeMQ consumers use this streamId set to filter records reading 
from server.
      */
-    @ApiModelProperty("Tid of the TubeMQ")
+    @ApiModelProperty("streamId of the TubeMQ")
     private TreeSet<String> streamId;
 
     public TubeMQSourceRequest() {
diff --git 
a/inlong-sdk/dataproxy-sdk/src/main/java/org/apache/inlong/sdk/dataproxy/example/UdpClientExample.java
 
b/inlong-sdk/dataproxy-sdk/src/main/java/org/apache/inlong/sdk/dataproxy/example/UdpClientExample.java
index 500b7f133f..26e490fc37 100644
--- 
a/inlong-sdk/dataproxy-sdk/src/main/java/org/apache/inlong/sdk/dataproxy/example/UdpClientExample.java
+++ 
b/inlong-sdk/dataproxy-sdk/src/main/java/org/apache/inlong/sdk/dataproxy/example/UdpClientExample.java
@@ -206,7 +206,7 @@ public class UdpClientExample {
                     if (Utils.isNotBlank(endAttr)) {
                         endAttr = endAttr + "&";
                     }
-                    endAttr = (endAttr + "bid=" + object.getGroupId() + "&tid="
+                    endAttr = (endAttr + "groupId=" + object.getGroupId() + 
"&streamId="
                             + object.getStreamId());
                 }
                 if (Utils.isNotBlank(object.getMsgUUID())) {
diff --git 
a/inlong-sort/sort-common/src/main/java/org/apache/inlong/sort/protocol/deserialization/CsvDeserializationInfo.java
 
b/inlong-sort/sort-common/src/main/java/org/apache/inlong/sort/protocol/deserialization/CsvDeserializationInfo.java
index abbd5c149b..e43a2f129f 100644
--- 
a/inlong-sort/sort-common/src/main/java/org/apache/inlong/sort/protocol/deserialization/CsvDeserializationInfo.java
+++ 
b/inlong-sort/sort-common/src/main/java/org/apache/inlong/sort/protocol/deserialization/CsvDeserializationInfo.java
@@ -39,27 +39,27 @@ public class CsvDeserializationInfo extends 
InLongMsgDeserializationInfo {
     @Nullable
     private final Character escapeChar;
 
-    private final String tid;
+    private final String streamId;
 
     // TODO: support mapping index to field
     public CsvDeserializationInfo(
             @JsonProperty("splitter") char splitter) {
-        this(TID_DEFAULT_VALUE, splitter, null);
+        this(STREAM_ID_DEFAULT_VALUE, splitter, null);
     }
 
     public CsvDeserializationInfo(
             @JsonProperty("splitter") char splitter,
             @JsonProperty("escape_char") @Nullable Character escapeChar) {
-        this(TID_DEFAULT_VALUE, splitter, escapeChar);
+        this(STREAM_ID_DEFAULT_VALUE, splitter, escapeChar);
     }
 
     @JsonCreator
     public CsvDeserializationInfo(
-            @JsonProperty("tid") String tid,
+            @JsonProperty("streamId") String streamId,
             @JsonProperty("splitter") char splitter,
             @JsonProperty("escape_char") @Nullable Character escapeChar) {
-        super(tid);
-        this.tid = (StringUtils.isEmpty(tid) ? TID_DEFAULT_VALUE : tid);
+        super(streamId);
+        this.streamId = (StringUtils.isEmpty(streamId) ? 
STREAM_ID_DEFAULT_VALUE : streamId);
         this.splitter = splitter;
         this.escapeChar = escapeChar;
     }
@@ -75,9 +75,9 @@ public class CsvDeserializationInfo extends 
InLongMsgDeserializationInfo {
         return escapeChar;
     }
 
-    @JsonProperty("tid")
-    public String getTid() {
-        return tid;
+    @JsonProperty("streamId")
+    public String getStreamId() {
+        return streamId;
     }
 
     @Override
@@ -91,7 +91,7 @@ public class CsvDeserializationInfo extends 
InLongMsgDeserializationInfo {
         }
 
         CsvDeserializationInfo other = (CsvDeserializationInfo) o;
-        return Objects.equals(tid, other.getTid()) && splitter == 
other.splitter
+        return Objects.equals(streamId, other.getStreamId()) && splitter == 
other.splitter
                 && Objects.equals(escapeChar, other.escapeChar);
     }
 
diff --git 
a/inlong-sort/sort-common/src/main/java/org/apache/inlong/sort/protocol/deserialization/DeserializationInfo.java
 
b/inlong-sort/sort-common/src/main/java/org/apache/inlong/sort/protocol/deserialization/DeserializationInfo.java
index dfe68472fd..56fa49312c 100644
--- 
a/inlong-sort/sort-common/src/main/java/org/apache/inlong/sort/protocol/deserialization/DeserializationInfo.java
+++ 
b/inlong-sort/sort-common/src/main/java/org/apache/inlong/sort/protocol/deserialization/DeserializationInfo.java
@@ -41,5 +41,5 @@ import java.io.Serializable;
 })
 public interface DeserializationInfo extends Serializable {
 
-    String TID_DEFAULT_VALUE = "-";
+    String STREAM_ID_DEFAULT_VALUE = "-";
 }
diff --git 
a/inlong-sort/sort-common/src/main/java/org/apache/inlong/sort/protocol/deserialization/InLongMsgCsv2DeserializationInfo.java
 
b/inlong-sort/sort-common/src/main/java/org/apache/inlong/sort/protocol/deserialization/InLongMsgCsv2DeserializationInfo.java
index 7b45ed0026..0b8e45d241 100644
--- 
a/inlong-sort/sort-common/src/main/java/org/apache/inlong/sort/protocol/deserialization/InLongMsgCsv2DeserializationInfo.java
+++ 
b/inlong-sort/sort-common/src/main/java/org/apache/inlong/sort/protocol/deserialization/InLongMsgCsv2DeserializationInfo.java
@@ -39,17 +39,17 @@ public class InLongMsgCsv2DeserializationInfo extends 
InLongMsgDeserializationIn
     private final Character escapeChar;
 
     public InLongMsgCsv2DeserializationInfo(
-            @JsonProperty("tid") String tid,
+            @JsonProperty("streamId") String streamId,
             @JsonProperty("delimiter") char delimiter) {
-        this(tid, delimiter, null);
+        this(streamId, delimiter, null);
     }
 
     @JsonCreator
     public InLongMsgCsv2DeserializationInfo(
-            @JsonProperty("tid") String tid,
+            @JsonProperty("streamId") String streamId,
             @JsonProperty("delimiter") char delimiter,
             @JsonProperty("escape_char") @Nullable Character escapeChar) {
-        super(tid);
+        super(streamId);
         this.delimiter = delimiter;
         this.escapeChar = escapeChar;
     }
diff --git 
a/inlong-sort/sort-common/src/main/java/org/apache/inlong/sort/protocol/deserialization/InLongMsgCsvDeserializationInfo.java
 
b/inlong-sort/sort-common/src/main/java/org/apache/inlong/sort/protocol/deserialization/InLongMsgCsvDeserializationInfo.java
index 79ec62f9f7..d5817b4503 100644
--- 
a/inlong-sort/sort-common/src/main/java/org/apache/inlong/sort/protocol/deserialization/InLongMsgCsvDeserializationInfo.java
+++ 
b/inlong-sort/sort-common/src/main/java/org/apache/inlong/sort/protocol/deserialization/InLongMsgCsvDeserializationInfo.java
@@ -43,25 +43,25 @@ public class InLongMsgCsvDeserializationInfo extends 
InLongMsgDeserializationInf
     private final boolean deleteHeadDelimiter;
 
     public InLongMsgCsvDeserializationInfo(
-            @JsonProperty("tid") String tid,
+            @JsonProperty("streamId") String streamId,
             @JsonProperty("delimiter") char delimiter) {
-        this(tid, delimiter, null, false);
+        this(streamId, delimiter, null, false);
     }
 
     public InLongMsgCsvDeserializationInfo(
-            @JsonProperty("tid") String tid,
+            @JsonProperty("streamId") String streamId,
             @JsonProperty("delimiter") char delimiter,
             @JsonProperty("delete_head_delimiter") boolean 
deleteHeadDelimiter) {
-        this(tid, delimiter, null, deleteHeadDelimiter);
+        this(streamId, delimiter, null, deleteHeadDelimiter);
     }
 
     @JsonCreator
     public InLongMsgCsvDeserializationInfo(
-            @JsonProperty("tid") String tid,
+            @JsonProperty("streamId") String streamId,
             @JsonProperty("delimiter") char delimiter,
             @JsonProperty("escape_char") @Nullable Character escapeChar,
             @JsonProperty("delete_head_delimiter") boolean 
deleteHeadDelimiter) {
-        super(tid);
+        super(streamId);
         this.delimiter = delimiter;
         this.escapeChar = escapeChar;
         this.deleteHeadDelimiter = deleteHeadDelimiter;
diff --git 
a/inlong-sort/sort-common/src/main/java/org/apache/inlong/sort/protocol/deserialization/InLongMsgDeserializationInfo.java
 
b/inlong-sort/sort-common/src/main/java/org/apache/inlong/sort/protocol/deserialization/InLongMsgDeserializationInfo.java
index 6b86577e81..357a4fb886 100644
--- 
a/inlong-sort/sort-common/src/main/java/org/apache/inlong/sort/protocol/deserialization/InLongMsgDeserializationInfo.java
+++ 
b/inlong-sort/sort-common/src/main/java/org/apache/inlong/sort/protocol/deserialization/InLongMsgDeserializationInfo.java
@@ -28,14 +28,14 @@ public abstract class InLongMsgDeserializationInfo 
implements DeserializationInf
 
     private static final long serialVersionUID = 3707412713264864315L;
 
-    private final String tid;
+    private final String streamId;
 
-    public InLongMsgDeserializationInfo(@JsonProperty("tid") String tid) {
-        this.tid = checkNotNull(tid);
+    public InLongMsgDeserializationInfo(@JsonProperty("streamId") String 
streamId) {
+        this.streamId = checkNotNull(streamId);
     }
 
-    @JsonProperty("tid")
-    public String getTid() {
-        return tid;
+    @JsonProperty("streamId")
+    public String getStreamId() {
+        return streamId;
     }
 }
diff --git 
a/inlong-sort/sort-common/src/main/java/org/apache/inlong/sort/protocol/deserialization/InLongMsgKvDeserializationInfo.java
 
b/inlong-sort/sort-common/src/main/java/org/apache/inlong/sort/protocol/deserialization/InLongMsgKvDeserializationInfo.java
index 1a5c8ff67e..99ff27f8ba 100644
--- 
a/inlong-sort/sort-common/src/main/java/org/apache/inlong/sort/protocol/deserialization/InLongMsgKvDeserializationInfo.java
+++ 
b/inlong-sort/sort-common/src/main/java/org/apache/inlong/sort/protocol/deserialization/InLongMsgKvDeserializationInfo.java
@@ -46,20 +46,20 @@ public class InLongMsgKvDeserializationInfo extends 
InLongMsgDeserializationInfo
     private final Character lineDelimiter;
 
     public InLongMsgKvDeserializationInfo(
-            @JsonProperty("tid") String tid,
+            @JsonProperty("streamId") String streamId,
             @JsonProperty("entry_delimiter") char entryDelimiter,
             @JsonProperty("kv_delimiter") char kvDelimiter) {
-        this(tid, entryDelimiter, kvDelimiter, null, null);
+        this(streamId, entryDelimiter, kvDelimiter, null, null);
     }
 
     @JsonCreator
     public InLongMsgKvDeserializationInfo(
-            @JsonProperty("tid") String tid,
+            @JsonProperty("streamId") String streamId,
             @JsonProperty("entry_delimiter") char entryDelimiter,
             @JsonProperty("kv_delimiter") char kvDelimiter,
             @JsonProperty("escape_char") @Nullable Character escapeChar,
             @JsonProperty("line_delimiter") @Nullable Character lineDelimiter) 
{
-        super(tid);
+        super(streamId);
         this.entryDelimiter = entryDelimiter;
         this.kvDelimiter = kvDelimiter;
         this.escapeChar = escapeChar;
diff --git 
a/inlong-sort/sort-common/src/main/java/org/apache/inlong/sort/protocol/deserialization/InLongMsgTlogCsvDeserializationInfo.java
 
b/inlong-sort/sort-common/src/main/java/org/apache/inlong/sort/protocol/deserialization/InLongMsgTlogCsvDeserializationInfo.java
index fbfb95c1fd..223a12d2b5 100644
--- 
a/inlong-sort/sort-common/src/main/java/org/apache/inlong/sort/protocol/deserialization/InLongMsgTlogCsvDeserializationInfo.java
+++ 
b/inlong-sort/sort-common/src/main/java/org/apache/inlong/sort/protocol/deserialization/InLongMsgTlogCsvDeserializationInfo.java
@@ -40,17 +40,17 @@ public class InLongMsgTlogCsvDeserializationInfo extends 
InLongMsgDeserializatio
     private final Character escapeChar;
 
     public InLongMsgTlogCsvDeserializationInfo(
-            @JsonProperty("tid") String tid,
+            @JsonProperty("streamId") String streamId,
             @JsonProperty("delimiter") char delimiter) {
-        this(tid, delimiter, null);
+        this(streamId, delimiter, null);
     }
 
     @JsonCreator
     public InLongMsgTlogCsvDeserializationInfo(
-            @JsonProperty("tid") String tid,
+            @JsonProperty("streamId") String streamId,
             @JsonProperty("delimiter") char delimiter,
             @JsonProperty("escape_char") @Nullable Character escapeChar) {
-        super(tid);
+        super(streamId);
         this.delimiter = delimiter;
         this.escapeChar = escapeChar;
     }
diff --git 
a/inlong-sort/sort-common/src/main/java/org/apache/inlong/sort/protocol/deserialization/InLongMsgTlogKvDeserializationInfo.java
 
b/inlong-sort/sort-common/src/main/java/org/apache/inlong/sort/protocol/deserialization/InLongMsgTlogKvDeserializationInfo.java
index f3b2985128..77ad77ac82 100644
--- 
a/inlong-sort/sort-common/src/main/java/org/apache/inlong/sort/protocol/deserialization/InLongMsgTlogKvDeserializationInfo.java
+++ 
b/inlong-sort/sort-common/src/main/java/org/apache/inlong/sort/protocol/deserialization/InLongMsgTlogKvDeserializationInfo.java
@@ -44,21 +44,21 @@ public class InLongMsgTlogKvDeserializationInfo extends 
InLongMsgDeserialization
     private final Character escapeChar;
 
     public InLongMsgTlogKvDeserializationInfo(
-            @JsonProperty("tid") String tid,
+            @JsonProperty("streamId") String streamId,
             @JsonProperty("delimiter") char delimiter,
             @JsonProperty("entry_delimiter") char entryDelimiter,
             @JsonProperty("kv_delimiter") char kvDelimiter) {
-        this(tid, delimiter, entryDelimiter, kvDelimiter, null);
+        this(streamId, delimiter, entryDelimiter, kvDelimiter, null);
     }
 
     @JsonCreator
     public InLongMsgTlogKvDeserializationInfo(
-            @JsonProperty("tid") String tid,
+            @JsonProperty("streamId") String streamId,
             @JsonProperty("delimiter") char delimiter,
             @JsonProperty("entry_delimiter") char entryDelimiter,
             @JsonProperty("kv_delimiter") char kvDelimiter,
             @JsonProperty("escape_char") @Nullable Character escapeChar) {
-        super(tid);
+        super(streamId);
         this.delimiter = delimiter;
         this.entryDelimiter = entryDelimiter;
         this.kvDelimiter = kvDelimiter;
diff --git 
a/inlong-sort/sort-common/src/main/java/org/apache/inlong/sort/protocol/deserialization/KvDeserializationInfo.java
 
b/inlong-sort/sort-common/src/main/java/org/apache/inlong/sort/protocol/deserialization/KvDeserializationInfo.java
index c15839f50b..583316c783 100644
--- 
a/inlong-sort/sort-common/src/main/java/org/apache/inlong/sort/protocol/deserialization/KvDeserializationInfo.java
+++ 
b/inlong-sort/sort-common/src/main/java/org/apache/inlong/sort/protocol/deserialization/KvDeserializationInfo.java
@@ -37,7 +37,7 @@ public class KvDeserializationInfo extends 
InLongMsgDeserializationInfo {
 
     private final char kvSplitter;
 
-    private final String tid;
+    private final String streamId;
 
     @JsonInclude(JsonInclude.Include.NON_NULL)
     @Nullable
@@ -46,24 +46,24 @@ public class KvDeserializationInfo extends 
InLongMsgDeserializationInfo {
     public KvDeserializationInfo(
             @JsonProperty("entry_splitter") char entrySplitter,
             @JsonProperty("kv_splitter") char kvSplitter) {
-        this(TID_DEFAULT_VALUE, entrySplitter, kvSplitter, null);
+        this(STREAM_ID_DEFAULT_VALUE, entrySplitter, kvSplitter, null);
     }
 
     public KvDeserializationInfo(
             @JsonProperty("entry_splitter") char entrySplitter,
             @JsonProperty("kv_splitter") char kvSplitter,
             @JsonProperty("escape_char") @Nullable Character escapeChar) {
-        this(TID_DEFAULT_VALUE, entrySplitter, kvSplitter, escapeChar);
+        this(STREAM_ID_DEFAULT_VALUE, entrySplitter, kvSplitter, escapeChar);
     }
 
     @JsonCreator
     public KvDeserializationInfo(
-            @JsonProperty("tid") String tid,
+            @JsonProperty("streamId") String streamId,
             @JsonProperty("entry_splitter") char entrySplitter,
             @JsonProperty("kv_splitter") char kvSplitter,
             @JsonProperty("escape_char") @Nullable Character escapeChar) {
-        super(tid);
-        this.tid = (StringUtils.isEmpty(tid) ? TID_DEFAULT_VALUE : tid);
+        super(streamId);
+        this.streamId = (StringUtils.isEmpty(streamId) ? 
STREAM_ID_DEFAULT_VALUE : streamId);
         this.entrySplitter = entrySplitter;
         this.kvSplitter = kvSplitter;
         this.escapeChar = escapeChar;
@@ -85,9 +85,9 @@ public class KvDeserializationInfo extends 
InLongMsgDeserializationInfo {
         return escapeChar;
     }
 
-    @JsonProperty("tid")
-    public String getTid() {
-        return tid;
+    @JsonProperty("streamId")
+    public String getStreamId() {
+        return streamId;
     }
 
     @Override
@@ -101,7 +101,7 @@ public class KvDeserializationInfo extends 
InLongMsgDeserializationInfo {
         }
 
         KvDeserializationInfo other = (KvDeserializationInfo) o;
-        return Objects.equals(tid, other.getTid()) && entrySplitter == 
other.entrySplitter
+        return Objects.equals(streamId, other.getStreamId()) && entrySplitter 
== other.entrySplitter
                 && kvSplitter == other.kvSplitter
                 && Objects.equals(escapeChar, other.escapeChar);
     }
diff --git 
a/inlong-sort/sort-flink/sort-flink-v1.13/sort-connectors/tubemq/src/main/java/org/apache/inlong/sort/tubemq/table/TubeMQOptions.java
 
b/inlong-sort/sort-flink/sort-flink-v1.13/sort-connectors/tubemq/src/main/java/org/apache/inlong/sort/tubemq/table/TubeMQOptions.java
index 357da2dd09..dd4ef5b6f2 100644
--- 
a/inlong-sort/sort-flink/sort-flink-v1.13/sort-connectors/tubemq/src/main/java/org/apache/inlong/sort/tubemq/table/TubeMQOptions.java
+++ 
b/inlong-sort/sort-flink/sort-flink-v1.13/sort-connectors/tubemq/src/main/java/org/apache/inlong/sort/tubemq/table/TubeMQOptions.java
@@ -152,7 +152,7 @@ public class TubeMQOptions {
                     .stringType()
                     .asList()
                     .noDefaultValue()
-                    .withDescription("The tid owned this topic.");
+                    .withDescription("The streamId owned this topic.");
 
     public static final ConfigOption<Integer> MAX_RETRIES =
             ConfigOptions.key("max.retries")
diff --git 
a/inlong-sort/sort-formats/format-common/src/main/java/org/apache/inlong/sort/formats/inlongmsg/InLongMsgBody.java
 
b/inlong-sort/sort-formats/format-common/src/main/java/org/apache/inlong/sort/formats/inlongmsg/InLongMsgBody.java
index f0860b1b9c..77a7812b22 100644
--- 
a/inlong-sort/sort-formats/format-common/src/main/java/org/apache/inlong/sort/formats/inlongmsg/InLongMsgBody.java
+++ 
b/inlong-sort/sort-formats/format-common/src/main/java/org/apache/inlong/sort/formats/inlongmsg/InLongMsgBody.java
@@ -37,7 +37,7 @@ public class InLongMsgBody implements Serializable {
     /**
      * The interface of the record.
      */
-    private final String tid;
+    private final String streamId;
 
     /**
      * The fields extracted from the body.
@@ -51,11 +51,11 @@ public class InLongMsgBody implements Serializable {
 
     public InLongMsgBody(
             byte[] data,
-            String tid,
+            String streamId,
             List<String> fields,
             Map<String, String> entries) {
         this.data = data;
-        this.tid = tid;
+        this.streamId = streamId;
         this.fields = fields;
         this.entries = entries;
     }
@@ -64,8 +64,8 @@ public class InLongMsgBody implements Serializable {
         return data;
     }
 
-    public String getTid() {
-        return tid;
+    public String getStreamId() {
+        return streamId;
     }
 
     public List<String> getFields() {
@@ -97,7 +97,7 @@ public class InLongMsgBody implements Serializable {
 
     @Override
     public String toString() {
-        return "InLongMsgBody{" + "data=" + Arrays.toString(data) + ", tid='" 
+ tid + '\''
+        return "InLongMsgBody{" + "data=" + Arrays.toString(data) + ", 
streamId='" + streamId + '\''
                 + ", fields=" + fields + ", entries=" + entries + '}';
     }
 }
diff --git 
a/inlong-sort/sort-formats/format-common/src/main/java/org/apache/inlong/sort/formats/inlongmsg/InLongMsgHead.java
 
b/inlong-sort/sort-formats/format-common/src/main/java/org/apache/inlong/sort/formats/inlongmsg/InLongMsgHead.java
index 61e6f6234f..e651c023dd 100644
--- 
a/inlong-sort/sort-formats/format-common/src/main/java/org/apache/inlong/sort/formats/inlongmsg/InLongMsgHead.java
+++ 
b/inlong-sort/sort-formats/format-common/src/main/java/org/apache/inlong/sort/formats/inlongmsg/InLongMsgHead.java
@@ -40,7 +40,7 @@ public class InLongMsgHead implements Serializable {
     /**
      * The interface of the record.
      */
-    private final String tid;
+    private final String streamId;
 
     /**
      * The time of the record.
@@ -54,11 +54,11 @@ public class InLongMsgHead implements Serializable {
 
     public InLongMsgHead(
             Map<String, String> attributes,
-            String tid,
+            String streamId,
             Timestamp time,
             List<String> predefinedFields) {
         this.attributes = attributes;
-        this.tid = tid;
+        this.streamId = streamId;
         this.time = time;
         this.predefinedFields = predefinedFields;
     }
@@ -67,8 +67,8 @@ public class InLongMsgHead implements Serializable {
         return attributes;
     }
 
-    public String getTid() {
-        return tid;
+    public String getStreamId() {
+        return streamId;
     }
 
     public Timestamp getTime() {
@@ -91,21 +91,21 @@ public class InLongMsgHead implements Serializable {
 
         InLongMsgHead that = (InLongMsgHead) o;
         return Objects.equals(attributes, that.attributes)
-                && Objects.equals(tid, that.tid)
+                && Objects.equals(streamId, that.streamId)
                 && Objects.equals(time, that.time)
                 && Objects.equals(predefinedFields, that.predefinedFields);
     }
 
     @Override
     public int hashCode() {
-        return Objects.hash(attributes, tid, time, predefinedFields);
+        return Objects.hash(attributes, streamId, time, predefinedFields);
     }
 
     @Override
     public String toString() {
         return "InLongMsgHead{"
                 + "attributes=" + attributes
-                + ", tid='" + tid + '\''
+                + ", streamId='" + streamId + '\''
                 + ", time=" + time
                 + ", predefinedFields=" + predefinedFields
                 + '}';
diff --git 
a/inlong-sort/sort-formats/format-common/src/main/java/org/apache/inlong/sort/formats/inlongmsg/InLongMsgMetadata.java
 
b/inlong-sort/sort-formats/format-common/src/main/java/org/apache/inlong/sort/formats/inlongmsg/InLongMsgMetadata.java
index f5a1eca427..6e31220e12 100644
--- 
a/inlong-sort/sort-formats/format-common/src/main/java/org/apache/inlong/sort/formats/inlongmsg/InLongMsgMetadata.java
+++ 
b/inlong-sort/sort-formats/format-common/src/main/java/org/apache/inlong/sort/formats/inlongmsg/InLongMsgMetadata.java
@@ -30,7 +30,7 @@ public class InLongMsgMetadata {
      */
     public enum ReadableMetadata {
 
-        TID("metadata-tid", DataTypes.STRING());
+        STREAMID("metadata-streamId", DataTypes.STRING());
 
         final String key;
 
diff --git 
a/inlong-sort/sort-formats/format-row/format-inlongmsg-base/src/main/java/org/apache/inlong/sort/formats/inlongmsg/AbstractInLongMsgMixedFormatConverter.java
 
b/inlong-sort/sort-formats/format-row/format-inlongmsg-base/src/main/java/org/apache/inlong/sort/formats/inlongmsg/AbstractInLongMsgMixedFormatConverter.java
index fa15b20127..165f6532fb 100644
--- 
a/inlong-sort/sort-formats/format-row/format-inlongmsg-base/src/main/java/org/apache/inlong/sort/formats/inlongmsg/AbstractInLongMsgMixedFormatConverter.java
+++ 
b/inlong-sort/sort-formats/format-row/format-inlongmsg-base/src/main/java/org/apache/inlong/sort/formats/inlongmsg/AbstractInLongMsgMixedFormatConverter.java
@@ -52,7 +52,7 @@ public abstract class AbstractInLongMsgMixedFormatConverter
     public abstract List<Row> convertRows(
             Map<String, String> attributes,
             byte[] data,
-            String tid,
+            String streamId,
             Timestamp time,
             List<String> predefinedFields,
             List<String> fields,
@@ -66,14 +66,14 @@ public abstract class AbstractInLongMsgMixedFormatConverter
         try {
             Map<String, String> attributes = 
InLongMsgUtils.getAttributesFromMixedRow(row);
             byte[] data = InLongMsgUtils.getDataFromMixedRow(row);
-            String tid = InLongMsgUtils.getTidFromMixedRow(row);
+            String streamId = InLongMsgUtils.getStreamIdFromMixedRow(row);
             Timestamp time = InLongMsgUtils.getTimeFromMixedRow(row);
             List<String> predefinedFields = 
InLongMsgUtils.getPredefinedFieldsFromMixedRow(row);
             List<String> fields = InLongMsgUtils.getFieldsFromMixedRow(row);
             Map<String, String> entries = 
InLongMsgUtils.getEntriesFromMixedRow(row);
 
             convertedRows =
-                    convertRows(attributes, data, tid, time, predefinedFields, 
fields, entries);
+                    convertRows(attributes, data, streamId, time, 
predefinedFields, fields, entries);
         } catch (Throwable t) {
             String errorMessage =
                     String.format("Could not properly convert the mixed row. 
Row=[%s].", row);
diff --git 
a/inlong-sort/sort-formats/format-row/format-inlongmsg-base/src/main/java/org/apache/inlong/sort/formats/inlongmsg/InLongMsgDecodingFormat.java
 
b/inlong-sort/sort-formats/format-row/format-inlongmsg-base/src/main/java/org/apache/inlong/sort/formats/inlongmsg/InLongMsgDecodingFormat.java
index 93e970aa85..779ff6b8e8 100644
--- 
a/inlong-sort/sort-formats/format-row/format-inlongmsg-base/src/main/java/org/apache/inlong/sort/formats/inlongmsg/InLongMsgDecodingFormat.java
+++ 
b/inlong-sort/sort-formats/format-row/format-inlongmsg-base/src/main/java/org/apache/inlong/sort/formats/inlongmsg/InLongMsgDecodingFormat.java
@@ -206,7 +206,7 @@ public class InLongMsgDecodingFormat implements 
DecodingFormat<DeserializationSc
 
                     @Override
                     public Object read(InLongMsgHead head) {
-                        return StringData.fromString(head.getTid());
+                        return StringData.fromString(head.getStreamId());
                     }
                 });
 
diff --git 
a/inlong-sort/sort-formats/format-row/format-inlongmsg-base/src/main/java/org/apache/inlong/sort/formats/inlongmsg/InLongMsgUtils.java
 
b/inlong-sort/sort-formats/format-row/format-inlongmsg-base/src/main/java/org/apache/inlong/sort/formats/inlongmsg/InLongMsgUtils.java
index 969b7fc19e..131c8099ec 100644
--- 
a/inlong-sort/sort-formats/format-row/format-inlongmsg-base/src/main/java/org/apache/inlong/sort/formats/inlongmsg/InLongMsgUtils.java
+++ 
b/inlong-sort/sort-formats/format-row/format-inlongmsg-base/src/main/java/org/apache/inlong/sort/formats/inlongmsg/InLongMsgUtils.java
@@ -62,6 +62,8 @@ public class InLongMsgUtils {
 
     // keys in attributes
     public static final String INLONGMSG_ATTR_STREAM_ID = "streamId";
+
+    @Deprecated
     public static final String INLONGMSG_ATTR_TID = "tid";
     public static final String INLONGMSG_ATTR_TIME_T = "t";
     public static final String INLONGMSG_ATTR_TIME_DT = "dt";
@@ -80,7 +82,7 @@ public class InLongMsgUtils {
                     new String[]{
                             "attributes",
                             "data",
-                            "tid",
+                            "streamId",
                             "time",
                             "predefinedFields",
                             "fields",
@@ -254,11 +256,11 @@ public class InLongMsgUtils {
     public static Row buildMixedRow(
             InLongMsgHead head,
             InLongMsgBody body,
-            String tid) {
+            String streamId) {
         Row row = new Row(7);
         row.setField(0, head.getAttributes());
         row.setField(1, body.getData());
-        row.setField(2, tid);
+        row.setField(2, streamId);
         row.setField(3, head.getTime());
         row.setField(4, head.getPredefinedFields());
         row.setField(5, body.getFields());
@@ -276,7 +278,7 @@ public class InLongMsgUtils {
         return (byte[]) row.getField(1);
     }
 
-    public static String getTidFromMixedRow(Row row) {
+    public static String getStreamIdFromMixedRow(Row row) {
         return (String) row.getField(2);
     }
 
diff --git 
a/inlong-sort/sort-formats/format-row/format-inlongmsg-binlog/src/main/java/org/apache/inlong/sort/formats/inlongmsgbinlog/InLongMsgBinlogMixedFormatConverter.java
 
b/inlong-sort/sort-formats/format-row/format-inlongmsg-binlog/src/main/java/org/apache/inlong/sort/formats/inlongmsgbinlog/InLongMsgBinlogMixedFormatConverter.java
index a5d5f8c5d1..e92b2b1217 100644
--- 
a/inlong-sort/sort-formats/format-row/format-inlongmsg-binlog/src/main/java/org/apache/inlong/sort/formats/inlongmsgbinlog/InLongMsgBinlogMixedFormatConverter.java
+++ 
b/inlong-sort/sort-formats/format-row/format-inlongmsg-binlog/src/main/java/org/apache/inlong/sort/formats/inlongmsgbinlog/InLongMsgBinlogMixedFormatConverter.java
@@ -111,7 +111,7 @@ public class InLongMsgBinlogMixedFormatConverter extends 
AbstractInLongMsgMixedF
     public List<Row> convertRows(
             Map<String, String> attributes,
             byte[] data,
-            String tid,
+            String streamId,
             Timestamp time,
             List<String> predefinedFields,
             List<String> fields,
diff --git 
a/inlong-sort/sort-formats/format-row/format-inlongmsg-binlog/src/main/java/org/apache/inlong/sort/formats/inlongmsgbinlog/InLongMsgBinlogMixedFormatDeserializer.java
 
b/inlong-sort/sort-formats/format-row/format-inlongmsg-binlog/src/main/java/org/apache/inlong/sort/formats/inlongmsgbinlog/InLongMsgBinlogMixedFormatDeserializer.java
index 9e09b6c037..59d4959991 100644
--- 
a/inlong-sort/sort-formats/format-row/format-inlongmsg-binlog/src/main/java/org/apache/inlong/sort/formats/inlongmsgbinlog/InLongMsgBinlogMixedFormatDeserializer.java
+++ 
b/inlong-sort/sort-formats/format-row/format-inlongmsg-binlog/src/main/java/org/apache/inlong/sort/formats/inlongmsgbinlog/InLongMsgBinlogMixedFormatDeserializer.java
@@ -68,7 +68,7 @@ public class InLongMsgBinlogMixedFormatDeserializer extends 
AbstractInLongMsgMix
 
     @Override
     protected List<Row> convertRows(InLongMsgHead head, InLongMsgBody body) 
throws IOException {
-        Row row = InLongMsgUtils.buildMixedRow(head, body, head.getTid());
+        Row row = InLongMsgUtils.buildMixedRow(head, body, head.getStreamId());
         return Collections.singletonList(row);
     }
 
diff --git 
a/inlong-sort/sort-formats/format-row/format-inlongmsg-binlog/src/main/java/org/apache/inlong/sort/formats/inlongmsgbinlog/InLongMsgBinlogUtils.java
 
b/inlong-sort/sort-formats/format-row/format-inlongmsg-binlog/src/main/java/org/apache/inlong/sort/formats/inlongmsgbinlog/InLongMsgBinlogUtils.java
index 76ad558ea8..e9bb0a5a5d 100644
--- 
a/inlong-sort/sort-formats/format-row/format-inlongmsg-binlog/src/main/java/org/apache/inlong/sort/formats/inlongmsgbinlog/InLongMsgBinlogUtils.java
+++ 
b/inlong-sort/sort-formats/format-row/format-inlongmsg-binlog/src/main/java/org/apache/inlong/sort/formats/inlongmsgbinlog/InLongMsgBinlogUtils.java
@@ -112,11 +112,11 @@ public class InLongMsgBinlogUtils {
         Map<String, String> attributes = parseAttr(attr);
 
         // Extracts interface from the attributes.
-        String tid;
+        String streamId;
         if (attributes.containsKey(INLONGMSG_ATTR_STREAM_ID)) {
-            tid = attributes.get(INLONGMSG_ATTR_STREAM_ID);
+            streamId = attributes.get(INLONGMSG_ATTR_STREAM_ID);
         } else if (attributes.containsKey(INLONGMSG_ATTR_TID)) {
-            tid = attributes.get(INLONGMSG_ATTR_TID);
+            streamId = attributes.get(INLONGMSG_ATTR_TID);
         } else {
             throw new IllegalArgumentException(
                     "Could not find " + INLONGMSG_ATTR_STREAM_ID
@@ -127,7 +127,7 @@ public class InLongMsgBinlogUtils {
         Timestamp time = 
parseEpochTime(Long.toString(System.currentTimeMillis()));
         List<String> predefinedFields = getPredefinedFields(attributes);
 
-        return new InLongMsgHead(attributes, tid, time, predefinedFields);
+        return new InLongMsgHead(attributes, streamId, time, predefinedFields);
     }
 
     public static InLongMsgBody parseBody(byte[] bytes) {
diff --git 
a/inlong-sort/sort-formats/format-row/format-inlongmsg-csv/src/main/java/org/apache/inlong/sort/formats/inlongmsgcsv/InLongMsgCsvMixedFormatConverter.java
 
b/inlong-sort/sort-formats/format-row/format-inlongmsg-csv/src/main/java/org/apache/inlong/sort/formats/inlongmsgcsv/InLongMsgCsvMixedFormatConverter.java
index f7add9121a..c0d4d1de62 100644
--- 
a/inlong-sort/sort-formats/format-row/format-inlongmsg-csv/src/main/java/org/apache/inlong/sort/formats/inlongmsgcsv/InLongMsgCsvMixedFormatConverter.java
+++ 
b/inlong-sort/sort-formats/format-row/format-inlongmsg-csv/src/main/java/org/apache/inlong/sort/formats/inlongmsgcsv/InLongMsgCsvMixedFormatConverter.java
@@ -92,7 +92,7 @@ public class InLongMsgCsvMixedFormatConverter extends 
AbstractInLongMsgMixedForm
     public List<Row> convertRows(
             Map<String, String> attributes,
             byte[] data,
-            String tid,
+            String streamId,
             Timestamp time,
             List<String> predefinedFields,
             List<String> fields,
diff --git 
a/inlong-sort/sort-formats/format-row/format-inlongmsg-csv/src/main/java/org/apache/inlong/sort/formats/inlongmsgcsv/InLongMsgCsvMixedFormatDeserializer.java
 
b/inlong-sort/sort-formats/format-row/format-inlongmsg-csv/src/main/java/org/apache/inlong/sort/formats/inlongmsgcsv/InLongMsgCsvMixedFormatDeserializer.java
index a0f1522ab9..194ca2fa85 100644
--- 
a/inlong-sort/sort-formats/format-row/format-inlongmsg-csv/src/main/java/org/apache/inlong/sort/formats/inlongmsgcsv/InLongMsgCsvMixedFormatDeserializer.java
+++ 
b/inlong-sort/sort-formats/format-row/format-inlongmsg-csv/src/main/java/org/apache/inlong/sort/formats/inlongmsgcsv/InLongMsgCsvMixedFormatDeserializer.java
@@ -138,7 +138,7 @@ public final class InLongMsgCsvMixedFormatDeserializer 
extends AbstractInLongMsg
 
     @Override
     protected List<Row> convertRows(InLongMsgHead head, InLongMsgBody body) {
-        Row row = InLongMsgUtils.buildMixedRow(head, body, head.getTid());
+        Row row = InLongMsgUtils.buildMixedRow(head, body, head.getStreamId());
         return Collections.singletonList(row);
     }
 
diff --git 
a/inlong-sort/sort-formats/format-row/format-inlongmsg-csv/src/main/java/org/apache/inlong/sort/formats/inlongmsgcsv/InLongMsgCsvUtils.java
 
b/inlong-sort/sort-formats/format-row/format-inlongmsg-csv/src/main/java/org/apache/inlong/sort/formats/inlongmsgcsv/InLongMsgCsvUtils.java
index ac17f3a444..cad6ae9dc0 100644
--- 
a/inlong-sort/sort-formats/format-row/format-inlongmsg-csv/src/main/java/org/apache/inlong/sort/formats/inlongmsgcsv/InLongMsgCsvUtils.java
+++ 
b/inlong-sort/sort-formats/format-row/format-inlongmsg-csv/src/main/java/org/apache/inlong/sort/formats/inlongmsgcsv/InLongMsgCsvUtils.java
@@ -59,12 +59,12 @@ public class InLongMsgCsvUtils {
         Map<String, String> attributes = parseAttr(attr);
 
         // Extracts interface from the attributes.
-        String tid;
+        String streamId;
 
         if (attributes.containsKey(INLONGMSG_ATTR_STREAM_ID)) {
-            tid = attributes.get(INLONGMSG_ATTR_STREAM_ID);
+            streamId = attributes.get(INLONGMSG_ATTR_STREAM_ID);
         } else if (attributes.containsKey(INLONGMSG_ATTR_TID)) {
-            tid = attributes.get(INLONGMSG_ATTR_TID);
+            streamId = attributes.get(INLONGMSG_ATTR_TID);
         } else {
             throw new IllegalArgumentException(
                     "Could not find " + INLONGMSG_ATTR_STREAM_ID
@@ -90,7 +90,7 @@ public class InLongMsgCsvUtils {
         // Extracts predefined fields from the attributes
         List<String> predefinedFields = getPredefinedFields(attributes);
 
-        return new InLongMsgHead(attributes, tid, time, predefinedFields);
+        return new InLongMsgHead(attributes, streamId, time, predefinedFields);
     }
 
     public static List<InLongMsgBody> parseBodyList(
diff --git 
a/inlong-sort/sort-formats/format-row/format-inlongmsg-kv/src/main/java/org/apache/inlong/sort/formats/inlongmsgkv/InLongMsgKvMixedFormatConverter.java
 
b/inlong-sort/sort-formats/format-row/format-inlongmsg-kv/src/main/java/org/apache/inlong/sort/formats/inlongmsgkv/InLongMsgKvMixedFormatConverter.java
index a46fa78459..efc1c7642c 100644
--- 
a/inlong-sort/sort-formats/format-row/format-inlongmsg-kv/src/main/java/org/apache/inlong/sort/formats/inlongmsgkv/InLongMsgKvMixedFormatConverter.java
+++ 
b/inlong-sort/sort-formats/format-row/format-inlongmsg-kv/src/main/java/org/apache/inlong/sort/formats/inlongmsgkv/InLongMsgKvMixedFormatConverter.java
@@ -91,7 +91,7 @@ public class InLongMsgKvMixedFormatConverter extends 
AbstractInLongMsgMixedForma
     public List<Row> convertRows(
             Map<String, String> attributes,
             byte[] data,
-            String tid,
+            String streamId,
             Timestamp time,
             List<String> predefinedFields,
             List<String> fields,
diff --git 
a/inlong-sort/sort-formats/format-row/format-inlongmsg-kv/src/main/java/org/apache/inlong/sort/formats/inlongmsgkv/InLongMsgKvMixedFormatDeserializer.java
 
b/inlong-sort/sort-formats/format-row/format-inlongmsg-kv/src/main/java/org/apache/inlong/sort/formats/inlongmsgkv/InLongMsgKvMixedFormatDeserializer.java
index 34e67780a3..cceefd517e 100644
--- 
a/inlong-sort/sort-formats/format-row/format-inlongmsg-kv/src/main/java/org/apache/inlong/sort/formats/inlongmsgkv/InLongMsgKvMixedFormatDeserializer.java
+++ 
b/inlong-sort/sort-formats/format-row/format-inlongmsg-kv/src/main/java/org/apache/inlong/sort/formats/inlongmsgkv/InLongMsgKvMixedFormatDeserializer.java
@@ -148,7 +148,7 @@ public final class InLongMsgKvMixedFormatDeserializer
 
     @Override
     protected List<Row> convertRows(InLongMsgHead head, InLongMsgBody body) {
-        Row row = InLongMsgUtils.buildMixedRow(head, body, head.getTid());
+        Row row = InLongMsgUtils.buildMixedRow(head, body, head.getStreamId());
         return Collections.singletonList(row);
     }
 
diff --git 
a/inlong-sort/sort-formats/format-row/format-inlongmsg-kv/src/main/java/org/apache/inlong/sort/formats/inlongmsgkv/InLongMsgKvUtils.java
 
b/inlong-sort/sort-formats/format-row/format-inlongmsg-kv/src/main/java/org/apache/inlong/sort/formats/inlongmsgkv/InLongMsgKvUtils.java
index 9964429c2e..b25115fdfe 100644
--- 
a/inlong-sort/sort-formats/format-row/format-inlongmsg-kv/src/main/java/org/apache/inlong/sort/formats/inlongmsgkv/InLongMsgKvUtils.java
+++ 
b/inlong-sort/sort-formats/format-row/format-inlongmsg-kv/src/main/java/org/apache/inlong/sort/formats/inlongmsgkv/InLongMsgKvUtils.java
@@ -52,11 +52,11 @@ public class InLongMsgKvUtils {
     public static InLongMsgHead parseHead(String attr) {
         Map<String, String> attributes = parseAttr(attr);
 
-        String tid;
+        String streamId;
         if (attributes.containsKey(INLONGMSG_ATTR_STREAM_ID)) {
-            tid = attributes.get(INLONGMSG_ATTR_STREAM_ID);
+            streamId = attributes.get(INLONGMSG_ATTR_STREAM_ID);
         } else if (attributes.containsKey(INLONGMSG_ATTR_TID)) {
-            tid = attributes.get(INLONGMSG_ATTR_TID);
+            streamId = attributes.get(INLONGMSG_ATTR_TID);
         } else {
             throw new IllegalArgumentException(
                     "Could not find " + INLONGMSG_ATTR_STREAM_ID +
@@ -78,7 +78,7 @@ public class InLongMsgKvUtils {
 
         List<String> predefinedFields = getPredefinedFields(attributes);
 
-        return new InLongMsgHead(attributes, tid, time, predefinedFields);
+        return new InLongMsgHead(attributes, streamId, time, predefinedFields);
     }
 
     public static List<InLongMsgBody> parseBodyList(
diff --git 
a/inlong-sort/sort-formats/format-row/format-inlongmsg-tlogcsv/src/main/java/org/apache/inlong/sort/formats/inlongmsgtlogcsv/InLongMsgTlogCsvMixedFormatConverter.java
 
b/inlong-sort/sort-formats/format-row/format-inlongmsg-tlogcsv/src/main/java/org/apache/inlong/sort/formats/inlongmsgtlogcsv/InLongMsgTlogCsvMixedFormatConverter.java
index da210ed2ce..d97bb341da 100644
--- 
a/inlong-sort/sort-formats/format-row/format-inlongmsg-tlogcsv/src/main/java/org/apache/inlong/sort/formats/inlongmsgtlogcsv/InLongMsgTlogCsvMixedFormatConverter.java
+++ 
b/inlong-sort/sort-formats/format-row/format-inlongmsg-tlogcsv/src/main/java/org/apache/inlong/sort/formats/inlongmsgtlogcsv/InLongMsgTlogCsvMixedFormatConverter.java
@@ -93,7 +93,7 @@ public class InLongMsgTlogCsvMixedFormatConverter extends 
AbstractInLongMsgMixed
     public List<Row> convertRows(
             Map<String, String> attributes,
             byte[] data,
-            String tid,
+            String streamId,
             Timestamp time,
             List<String> predefinedFields,
             List<String> fields,
diff --git 
a/inlong-sort/sort-formats/format-row/format-inlongmsg-tlogcsv/src/main/java/org/apache/inlong/sort/formats/inlongmsgtlogcsv/InLongMsgTlogCsvMixedFormatDeserializer.java
 
b/inlong-sort/sort-formats/format-row/format-inlongmsg-tlogcsv/src/main/java/org/apache/inlong/sort/formats/inlongmsgtlogcsv/InLongMsgTlogCsvMixedFormatDeserializer.java
index ca0f0b9f86..46723f058c 100644
--- 
a/inlong-sort/sort-formats/format-row/format-inlongmsg-tlogcsv/src/main/java/org/apache/inlong/sort/formats/inlongmsgtlogcsv/InLongMsgTlogCsvMixedFormatDeserializer.java
+++ 
b/inlong-sort/sort-formats/format-row/format-inlongmsg-tlogcsv/src/main/java/org/apache/inlong/sort/formats/inlongmsgtlogcsv/InLongMsgTlogCsvMixedFormatDeserializer.java
@@ -110,7 +110,7 @@ public final class InLongMsgTlogCsvMixedFormatDeserializer
 
     @Override
     protected List<Row> convertRows(InLongMsgHead head, InLongMsgBody body) 
throws Exception {
-        Row row = InLongMsgUtils.buildMixedRow(head, body, body.getTid());
+        Row row = InLongMsgUtils.buildMixedRow(head, body, body.getStreamId());
         return Collections.singletonList(row);
     }
 
diff --git 
a/inlong-sort/sort-formats/format-row/format-inlongmsg-tlogcsv/src/main/java/org/apache/inlong/sort/formats/inlongmsgtlogcsv/InLongMsgTlogCsvUtils.java
 
b/inlong-sort/sort-formats/format-row/format-inlongmsg-tlogcsv/src/main/java/org/apache/inlong/sort/formats/inlongmsgtlogcsv/InLongMsgTlogCsvUtils.java
index 8a5ef167a2..6683e08f5e 100644
--- 
a/inlong-sort/sort-formats/format-row/format-inlongmsg-tlogcsv/src/main/java/org/apache/inlong/sort/formats/inlongmsgtlogcsv/InLongMsgTlogCsvUtils.java
+++ 
b/inlong-sort/sort-formats/format-row/format-inlongmsg-tlogcsv/src/main/java/org/apache/inlong/sort/formats/inlongmsgtlogcsv/InLongMsgTlogCsvUtils.java
@@ -89,11 +89,11 @@ public class InLongMsgTlogCsvUtils {
 
         String[] segments = splitCsv(text, delimiter, escapeChar, quoteChar);
 
-        String tid = segments[0];
+        String streamId = segments[0];
         List<String> fields =
                 Arrays.stream(segments, 1, 
segments.length).collect(Collectors.toList());
 
-        return new InLongMsgBody(bytes, tid, fields, Collections.emptyMap());
+        return new InLongMsgBody(bytes, streamId, fields, 
Collections.emptyMap());
     }
 
     /**
diff --git 
a/inlong-sort/sort-formats/format-row/format-inlongmsg-tlogkv/src/main/java/org/apache/inlong/sort/formats/inlongmsgtlogkv/InLongMsgTlogKvMixedFormatConverter.java
 
b/inlong-sort/sort-formats/format-row/format-inlongmsg-tlogkv/src/main/java/org/apache/inlong/sort/formats/inlongmsgtlogkv/InLongMsgTlogKvMixedFormatConverter.java
index 2892dfc755..560d571685 100644
--- 
a/inlong-sort/sort-formats/format-row/format-inlongmsg-tlogkv/src/main/java/org/apache/inlong/sort/formats/inlongmsgtlogkv/InLongMsgTlogKvMixedFormatConverter.java
+++ 
b/inlong-sort/sort-formats/format-row/format-inlongmsg-tlogkv/src/main/java/org/apache/inlong/sort/formats/inlongmsgtlogkv/InLongMsgTlogKvMixedFormatConverter.java
@@ -93,7 +93,7 @@ public class InLongMsgTlogKvMixedFormatConverter extends 
AbstractInLongMsgMixedF
     public List<Row> convertRows(
             Map<String, String> attributes,
             byte[] data,
-            String tid,
+            String streamId,
             Timestamp time,
             List<String> predefinedFields,
             List<String> fields,
diff --git 
a/inlong-sort/sort-formats/format-row/format-inlongmsg-tlogkv/src/main/java/org/apache/inlong/sort/formats/inlongmsgtlogkv/InLongMsgTlogKvMixedFormatDeserializer.java
 
b/inlong-sort/sort-formats/format-row/format-inlongmsg-tlogkv/src/main/java/org/apache/inlong/sort/formats/inlongmsgtlogkv/InLongMsgTlogKvMixedFormatDeserializer.java
index bccba0276f..84a6823e35 100644
--- 
a/inlong-sort/sort-formats/format-row/format-inlongmsg-tlogkv/src/main/java/org/apache/inlong/sort/formats/inlongmsgtlogkv/InLongMsgTlogKvMixedFormatDeserializer.java
+++ 
b/inlong-sort/sort-formats/format-row/format-inlongmsg-tlogkv/src/main/java/org/apache/inlong/sort/formats/inlongmsgtlogkv/InLongMsgTlogKvMixedFormatDeserializer.java
@@ -143,7 +143,7 @@ public final class InLongMsgTlogKvMixedFormatDeserializer
 
     @Override
     protected List<Row> convertRows(InLongMsgHead head, InLongMsgBody body) 
throws Exception {
-        Row row = InLongMsgUtils.buildMixedRow(head, body, body.getTid());
+        Row row = InLongMsgUtils.buildMixedRow(head, body, body.getStreamId());
         return Collections.singletonList(row);
     }
 
diff --git 
a/inlong-sort/sort-formats/format-row/format-inlongmsg-tlogkv/src/main/java/org/apache/inlong/sort/formats/inlongmsgtlogkv/InLongMsgTlogKvUtils.java
 
b/inlong-sort/sort-formats/format-row/format-inlongmsg-tlogkv/src/main/java/org/apache/inlong/sort/formats/inlongmsgtlogkv/InLongMsgTlogKvUtils.java
index a039079304..8628d6cb79 100644
--- 
a/inlong-sort/sort-formats/format-row/format-inlongmsg-tlogkv/src/main/java/org/apache/inlong/sort/formats/inlongmsgtlogkv/InLongMsgTlogKvUtils.java
+++ 
b/inlong-sort/sort-formats/format-row/format-inlongmsg-tlogkv/src/main/java/org/apache/inlong/sort/formats/inlongmsgtlogkv/InLongMsgTlogKvUtils.java
@@ -85,7 +85,7 @@ public class InLongMsgTlogKvUtils {
 
         String[] segments = splitCsv(text, delimiter, escapeChar, quoteChar);
 
-        String tid = segments[0];
+        String streamId = segments[0];
 
         Map<String, String> entries;
         if (segments.length > 1) {
@@ -94,7 +94,7 @@ public class InLongMsgTlogKvUtils {
             entries = Collections.emptyMap();
         }
 
-        return new InLongMsgBody(bytes, tid, Collections.emptyList(), entries);
+        return new InLongMsgBody(bytes, streamId, Collections.emptyList(), 
entries);
     }
 
     /**
diff --git 
a/inlong-sort/sort-formats/format-rowdata/format-inlongmsg-rowdata-base/src/main/java/org/apache/inlong/sort/formats/inlongmsg/AbstractInLongMsgMixedFormatConverter.java
 
b/inlong-sort/sort-formats/format-rowdata/format-inlongmsg-rowdata-base/src/main/java/org/apache/inlong/sort/formats/inlongmsg/AbstractInLongMsgMixedFormatConverter.java
index fb56e6956a..87168cb15b 100644
--- 
a/inlong-sort/sort-formats/format-rowdata/format-inlongmsg-rowdata-base/src/main/java/org/apache/inlong/sort/formats/inlongmsg/AbstractInLongMsgMixedFormatConverter.java
+++ 
b/inlong-sort/sort-formats/format-rowdata/format-inlongmsg-rowdata-base/src/main/java/org/apache/inlong/sort/formats/inlongmsg/AbstractInLongMsgMixedFormatConverter.java
@@ -52,7 +52,7 @@ public abstract class AbstractInLongMsgMixedFormatConverter
     public abstract List<RowData> convertRowDatas(
             Map<String, String> attributes,
             byte[] data,
-            String tid,
+            String streamId,
             Timestamp time,
             List<String> predefinedFields,
             List<String> fields,
@@ -66,14 +66,14 @@ public abstract class AbstractInLongMsgMixedFormatConverter
         try {
             Map<String, String> attributes = 
InLongMsgUtils.getAttributesFromMixedRowData(rowData);
             byte[] data = InLongMsgUtils.getDataFromMixedRowData(rowData);
-            String tid = InLongMsgUtils.getTidFromMixedRowData(rowData);
+            String streamId = InLongMsgUtils.getTidFromMixedRowData(rowData);
             Timestamp time = InLongMsgUtils.getTimeFromMixedRowData(rowData);
             List<String> predefinedFields = 
InLongMsgUtils.getPredefinedFieldsFromMixedRowData(rowData);
             List<String> fields = 
InLongMsgUtils.getFieldsFromMixedRowData(rowData);
             Map<String, String> entries = 
InLongMsgUtils.getEntriesFromMixedRowData(rowData);
 
             convertedRowDatas =
-                    convertRowDatas(attributes, data, tid, time, 
predefinedFields, fields, entries);
+                    convertRowDatas(attributes, data, streamId, time, 
predefinedFields, fields, entries);
         } catch (Throwable t) {
             String errorMessage =
                     String.format("Could not properly convert the mixed row. 
Row=[%s].", rowData);
diff --git 
a/inlong-sort/sort-formats/format-rowdata/format-inlongmsg-rowdata-base/src/main/java/org/apache/inlong/sort/formats/inlongmsg/InLongMsgUtils.java
 
b/inlong-sort/sort-formats/format-rowdata/format-inlongmsg-rowdata-base/src/main/java/org/apache/inlong/sort/formats/inlongmsg/InLongMsgUtils.java
index 50b0c0e0b0..ce0b5c9dbe 100644
--- 
a/inlong-sort/sort-formats/format-rowdata/format-inlongmsg-rowdata-base/src/main/java/org/apache/inlong/sort/formats/inlongmsg/InLongMsgUtils.java
+++ 
b/inlong-sort/sort-formats/format-rowdata/format-inlongmsg-rowdata-base/src/main/java/org/apache/inlong/sort/formats/inlongmsg/InLongMsgUtils.java
@@ -51,7 +51,7 @@ import java.util.stream.Stream;
 import static 
org.apache.inlong.sort.formats.base.TableFormatConstants.FORMAT_ATTRIBUTE_FIELD_NAME;
 import static 
org.apache.inlong.sort.formats.base.TableFormatConstants.FORMAT_TIME_FIELD_NAME;
 import static 
org.apache.inlong.sort.formats.base.TableFormatUtils.validateSchema;
-import static 
org.apache.inlong.sort.formats.inlongmsg.InLongMsgMetadata.ReadableMetadata.TID;
+import static 
org.apache.inlong.sort.formats.inlongmsg.InLongMsgMetadata.ReadableMetadata.STREAMID;
 import static org.apache.inlong.sort.formats.util.StringUtils.splitKv;
 
 /**
@@ -63,8 +63,11 @@ public class InLongMsgUtils {
     public static final char INLONGMSG_ATTR_KV_DELIMITER = '=';
 
     // keys in attributes
+    @Deprecated
     public static final String INLONGMSG_ATTR_INTERFACE_NAME = "iname";
+    @Deprecated
     public static final String INLONGMSG_ATTR_INTERFACE_ID = "id";
+    @Deprecated
     public static final String INLONGMSG_ATTR_INTERFACE_TID = "tid";
     public static final String INLONGMSG_ATTR_STREAMID = "streamId";
     public static final String INLONGMSG_ATTR_TIME_T = "t";
@@ -85,7 +88,7 @@ public class InLongMsgUtils {
         String[] fieldNames = new String[]{
                 "attributes",
                 "data",
-                "tid",
+                "streamId",
                 "time",
                 "predefinedFields",
                 "fields",
@@ -357,8 +360,8 @@ public class InLongMsgUtils {
         }
 
         for (int j = 0; j < metadataKeys.size(); j++) {
-            if (metadataKeys.get(j).equals(TID.getKey())) {
-                producedRow.setField(physicalArity + j, 
StringData.fromString(head.getTid()));
+            if (metadataKeys.get(j).equals(STREAMID.getKey())) {
+                producedRow.setField(physicalArity + j, 
StringData.fromString(head.getStreamId()));
             }
         }
 
diff --git 
a/inlong-sort/sort-formats/format-rowdata/format-inlongmsg-rowdata-binlog/src/main/java/org/apache/inlong/sort/formats/inlongmsgbinlog/InLongMsgBinlogUtils.java
 
b/inlong-sort/sort-formats/format-rowdata/format-inlongmsg-rowdata-binlog/src/main/java/org/apache/inlong/sort/formats/inlongmsgbinlog/InLongMsgBinlogUtils.java
index 50b82493b8..6357482946 100644
--- 
a/inlong-sort/sort-formats/format-rowdata/format-inlongmsg-rowdata-binlog/src/main/java/org/apache/inlong/sort/formats/inlongmsgbinlog/InLongMsgBinlogUtils.java
+++ 
b/inlong-sort/sort-formats/format-rowdata/format-inlongmsg-rowdata-binlog/src/main/java/org/apache/inlong/sort/formats/inlongmsgbinlog/InLongMsgBinlogUtils.java
@@ -72,13 +72,13 @@ public class InLongMsgBinlogUtils {
         Map<String, String> attributes = parseAttr(attr);
 
         // Extracts interface from the attributes.
-        String tid;
+        String streamId;
         if (attributes.containsKey(INLONGMSG_ATTR_INTERFACE_NAME)) {
-            tid = attributes.get(INLONGMSG_ATTR_INTERFACE_NAME);
+            streamId = attributes.get(INLONGMSG_ATTR_INTERFACE_NAME);
         } else if (attributes.containsKey(INLONGMSG_ATTR_INTERFACE_ID)) {
-            tid = attributes.get(INLONGMSG_ATTR_INTERFACE_ID);
+            streamId = attributes.get(INLONGMSG_ATTR_INTERFACE_ID);
         } else if (attributes.containsKey(INLONGMSG_ATTR_INTERFACE_TID)) {
-            tid = attributes.get(INLONGMSG_ATTR_INTERFACE_TID);
+            streamId = attributes.get(INLONGMSG_ATTR_INTERFACE_TID);
         } else {
             throw new IllegalArgumentException(
                     "Could not find " + INLONGMSG_ATTR_INTERFACE_NAME +
@@ -90,7 +90,7 @@ public class InLongMsgBinlogUtils {
         Timestamp time = 
parseEpochTime(Long.toString(System.currentTimeMillis()));
         List<String> predefinedFields = getPredefinedFields(attributes);
 
-        return new InLongMsgHead(attributes, tid, time, predefinedFields);
+        return new InLongMsgHead(attributes, streamId, time, predefinedFields);
     }
 
     public static InLongMsgBody parseBody(byte[] bytes) {
diff --git 
a/inlong-tubemq/tubemq-connectors/tubemq-connector-flink/src/main/java/org/apache/flink/connectors/tubemq/Tubemq.java
 
b/inlong-tubemq/tubemq-connectors/tubemq-connector-flink/src/main/java/org/apache/flink/connectors/tubemq/Tubemq.java
index 5a5fc39ad8..2e34c13330 100644
--- 
a/inlong-tubemq/tubemq-connectors/tubemq-connector-flink/src/main/java/org/apache/flink/connectors/tubemq/Tubemq.java
+++ 
b/inlong-tubemq/tubemq-connectors/tubemq-connector-flink/src/main/java/org/apache/flink/connectors/tubemq/Tubemq.java
@@ -29,7 +29,7 @@ import java.util.Map;
 import static 
org.apache.flink.connectors.tubemq.TubemqValidator.CONNECTOR_GROUP;
 import static 
org.apache.flink.connectors.tubemq.TubemqValidator.CONNECTOR_MASTER;
 import static 
org.apache.flink.connectors.tubemq.TubemqValidator.CONNECTOR_PROPERTIES;
-import static 
org.apache.flink.connectors.tubemq.TubemqValidator.CONNECTOR_TIDS;
+import static 
org.apache.flink.connectors.tubemq.TubemqValidator.CONNECTOR_STREAMIDS;
 import static 
org.apache.flink.connectors.tubemq.TubemqValidator.CONNECTOR_TOPIC;
 import static 
org.apache.flink.connectors.tubemq.TubemqValidator.CONNECTOR_TYPE_VALUE_TUBEMQ;
 import static org.apache.flink.util.Preconditions.checkNotNull;
@@ -52,7 +52,7 @@ public class Tubemq extends ConnectorDescriptor {
     private String group;
 
     @Nullable
-    private String tids;
+    private String streamIds;
 
     @Nonnull
     private Map<String, String> properties;
@@ -110,13 +110,13 @@ public class Tubemq extends ConnectorDescriptor {
     }
 
     /**
-     * The tubemq consumers use these tids to filter records reading from 
server.
+     * The tubemq consumers use these streamIds to filter records reading from 
server.
      *
-     * @param tids The filter for consume record from server.
+     * @param streamIds The filter for consume record from server.
      */
-    public Tubemq tids(String tids) {
+    public Tubemq streamIds(String streamIds) {
 
-        this.tids = tids;
+        this.streamIds = streamIds;
         return this;
     }
 
@@ -150,8 +150,8 @@ public class Tubemq extends ConnectorDescriptor {
                 descriptorProperties.putString(CONNECTOR_GROUP, group);
             }
 
-            if (tids != null) {
-                descriptorProperties.putString(CONNECTOR_TIDS, tids);
+            if (streamIds != null) {
+                descriptorProperties.putString(CONNECTOR_STREAMIDS, streamIds);
             }
         }
 
diff --git 
a/inlong-tubemq/tubemq-connectors/tubemq-connector-flink/src/main/java/org/apache/flink/connectors/tubemq/TubemqOptions.java
 
b/inlong-tubemq/tubemq-connectors/tubemq-connector-flink/src/main/java/org/apache/flink/connectors/tubemq/TubemqOptions.java
index a827ae9444..e94fbf86c5 100644
--- 
a/inlong-tubemq/tubemq-connectors/tubemq-connector-flink/src/main/java/org/apache/flink/connectors/tubemq/TubemqOptions.java
+++ 
b/inlong-tubemq/tubemq-connectors/tubemq-connector-flink/src/main/java/org/apache/flink/connectors/tubemq/TubemqOptions.java
@@ -30,10 +30,10 @@ public class TubemqOptions {
                     .noDefaultValue()
                     .withDescription("The session key for this consumer group 
at startup.");
 
-    public static final ConfigOption<String> TID =
-            ConfigOptions.key("topic.tid")
+    public static final ConfigOption<String> STREAM_ID =
+            ConfigOptions.key("topic.streamId")
                     .noDefaultValue()
-                    .withDescription("The tid owned this topic.");
+                    .withDescription("The streamId owned this topic.");
 
     public static final ConfigOption<Integer> MAX_RETRIES =
             ConfigOptions.key("max.retries")
diff --git 
a/inlong-tubemq/tubemq-connectors/tubemq-connector-flink/src/main/java/org/apache/flink/connectors/tubemq/TubemqSinkFunction.java
 
b/inlong-tubemq/tubemq-connectors/tubemq-connector-flink/src/main/java/org/apache/flink/connectors/tubemq/TubemqSinkFunction.java
index cf8c042649..fabd43af6e 100644
--- 
a/inlong-tubemq/tubemq-connectors/tubemq-connector-flink/src/main/java/org/apache/flink/connectors/tubemq/TubemqSinkFunction.java
+++ 
b/inlong-tubemq/tubemq-connectors/tubemq-connector-flink/src/main/java/org/apache/flink/connectors/tubemq/TubemqSinkFunction.java
@@ -60,9 +60,9 @@ public class TubemqSinkFunction<T> extends 
RichSinkFunction<T> implements Checkp
     private final String topic;
 
     /**
-     * The tid of this topic
+     * The streamId of this topic
      */
-    private final String tid;
+    private final String streamId;
     /**
      * The serializer for the records sent to pulsar.
      */
@@ -99,7 +99,7 @@ public class TubemqSinkFunction<T> extends 
RichSinkFunction<T> implements Checkp
         this.topic = topic;
         this.masterAddress = masterAddress;
         this.serializationSchema = serializationSchema;
-        this.tid = configuration.getString(TubemqOptions.TID);
+        this.streamId = configuration.getString(TubemqOptions.STREAM_ID);
         this.maxRetries = configuration.getInteger(MAX_RETRIES);
     }
 
@@ -136,10 +136,10 @@ public class TubemqSinkFunction<T> extends 
RichSinkFunction<T> implements Checkp
             try {
                 byte[] body = serializationSchema.serialize(in);
                 Message message = new Message(topic, body);
-                if (StringUtils.isNotBlank(tid)) {
+                if (StringUtils.isNotBlank(streamId)) {
                     SimpleDateFormat sdf = new 
SimpleDateFormat(SYSTEM_HEADER_TIME_FORMAT);
                     long currTimeMillis = System.currentTimeMillis();
-                    message.putSystemHeader(tid, sdf.format(new 
Date(currTimeMillis)));
+                    message.putSystemHeader(streamId, sdf.format(new 
Date(currTimeMillis)));
                 }
 
                 MessageSentResult sendResult = producer.sendMessage(message);
diff --git 
a/inlong-tubemq/tubemq-connectors/tubemq-connector-flink/src/main/java/org/apache/flink/connectors/tubemq/TubemqSourceFunction.java
 
b/inlong-tubemq/tubemq-connectors/tubemq-connector-flink/src/main/java/org/apache/flink/connectors/tubemq/TubemqSourceFunction.java
index ab15a835c5..4f9fc050a9 100644
--- 
a/inlong-tubemq/tubemq-connectors/tubemq-connector-flink/src/main/java/org/apache/flink/connectors/tubemq/TubemqSourceFunction.java
+++ 
b/inlong-tubemq/tubemq-connectors/tubemq-connector-flink/src/main/java/org/apache/flink/connectors/tubemq/TubemqSourceFunction.java
@@ -82,9 +82,9 @@ public class TubemqSourceFunction<T>
     private final String topic;
 
     /**
-     * The tubemq consumers use this tid set to filter records reading from 
server.
+     * The tubemq consumers use this streamId set to filter records reading 
from server.
      */
-    private final TreeSet<String> tidSet;
+    private final TreeSet<String> streamIdSet;
 
     /**
      * The consumer group name.
@@ -150,7 +150,7 @@ public class TubemqSourceFunction<T>
      *
      * @param masterAddress            the master address of TubeMQ
      * @param topic                    the topic name
-     * @param tidSet                   the  topic's filter condition items
+     * @param streamIdSet                   the  topic's filter condition items
      * @param consumerGroup            the consumer group name
      * @param deserializationSchema    the deserialize schema
      * @param configuration            the configure
@@ -158,7 +158,7 @@ public class TubemqSourceFunction<T>
     public TubemqSourceFunction(
             String masterAddress,
             String topic,
-            TreeSet<String> tidSet,
+            TreeSet<String> streamIdSet,
             String consumerGroup,
             DeserializationSchema<T> deserializationSchema,
             Configuration configuration) {
@@ -166,8 +166,8 @@ public class TubemqSourceFunction<T>
                 "The master address must not be null.");
         checkNotNull(topic,
                 "The topic must not be null.");
-        checkNotNull(tidSet,
-                "The tid set must not be null.");
+        checkNotNull(streamIdSet,
+                "The streamId set must not be null.");
         checkNotNull(consumerGroup,
                 "The consumer group must not be null.");
         checkNotNull(deserializationSchema,
@@ -177,7 +177,7 @@ public class TubemqSourceFunction<T>
 
         this.masterAddress = masterAddress;
         this.topic = topic;
-        this.tidSet = tidSet;
+        this.streamIdSet = streamIdSet;
         this.consumerGroup = consumerGroup;
         this.deserializationSchema = deserializationSchema;
 
@@ -235,7 +235,7 @@ public class TubemqSourceFunction<T>
         messagePullConsumer =
                 messageSessionFactory.createPullConsumer(consumerConfig);
         messagePullConsumer
-                .subscribe(topic, tidSet);
+                .subscribe(topic, streamIdSet);
         messagePullConsumer
                 .completeSubscribe(sessionKey, numTasks, true, currentOffsets);
 
diff --git 
a/inlong-tubemq/tubemq-connectors/tubemq-connector-flink/src/main/java/org/apache/flink/connectors/tubemq/TubemqTableSource.java
 
b/inlong-tubemq/tubemq-connectors/tubemq-connector-flink/src/main/java/org/apache/flink/connectors/tubemq/TubemqTableSource.java
index 79520df593..934a441687 100644
--- 
a/inlong-tubemq/tubemq-connectors/tubemq-connector-flink/src/main/java/org/apache/flink/connectors/tubemq/TubemqTableSource.java
+++ 
b/inlong-tubemq/tubemq-connectors/tubemq-connector-flink/src/main/java/org/apache/flink/connectors/tubemq/TubemqTableSource.java
@@ -90,9 +90,9 @@ public class TubemqTableSource
     private final String topic;
 
     /**
-     * The TubeMQ tid filter collection.
+     * The TubeMQ streamId filter collection.
      */
-    private final TreeSet<String> tidSet;
+    private final TreeSet<String> streamIdSet;
 
     /**
      * The TubeMQ consumer group name.
@@ -114,7 +114,7 @@ public class TubemqTableSource
      * @param fieldMapping        the field map information
      * @param masterAddress       the master address
      * @param topic               the topic name
-     * @param tidSet              the topic's filter condition items
+     * @param streamIdSet              the topic's filter condition items
      * @param consumerGroup       the consumer group
      * @param configuration       the configure
      */
@@ -126,7 +126,7 @@ public class TubemqTableSource
             Map<String, String> fieldMapping,
             String masterAddress,
             String topic,
-            TreeSet<String> tidSet,
+            TreeSet<String> streamIdSet,
             String consumerGroup,
             Configuration configuration) {
         checkNotNull(deserializationSchema,
@@ -139,8 +139,8 @@ public class TubemqTableSource
                 "The master address must not be null.");
         checkNotNull(topic,
                 "The topic must not be null.");
-        checkNotNull(tidSet,
-                "The tid set must not be null.");
+        checkNotNull(streamIdSet,
+                "The streamId set must not be null.");
         checkNotNull(consumerGroup,
                 "The consumer group must not be null.");
         checkNotNull(configuration,
@@ -151,7 +151,7 @@ public class TubemqTableSource
         this.fieldMapping = fieldMapping;
         this.masterAddress = masterAddress;
         this.topic = topic;
-        this.tidSet = tidSet;
+        this.streamIdSet = streamIdSet;
         this.consumerGroup = consumerGroup;
         this.configuration = configuration;
 
@@ -189,7 +189,7 @@ public class TubemqTableSource
                 new TubemqSourceFunction<>(
                         masterAddress,
                         topic,
-                        tidSet,
+                        streamIdSet,
                         consumerGroup,
                         deserializationSchema,
                         configuration);
diff --git 
a/inlong-tubemq/tubemq-connectors/tubemq-connector-flink/src/main/java/org/apache/flink/connectors/tubemq/TubemqTableSourceSinkFactory.java
 
b/inlong-tubemq/tubemq-connectors/tubemq-connector-flink/src/main/java/org/apache/flink/connectors/tubemq/TubemqTableSourceSinkFactory.java
index 60efb5a117..e28e85c1b7 100644
--- 
a/inlong-tubemq/tubemq-connectors/tubemq-connector-flink/src/main/java/org/apache/flink/connectors/tubemq/TubemqTableSourceSinkFactory.java
+++ 
b/inlong-tubemq/tubemq-connectors/tubemq-connector-flink/src/main/java/org/apache/flink/connectors/tubemq/TubemqTableSourceSinkFactory.java
@@ -92,7 +92,7 @@ public class TubemqTableSourceSinkFactory
         properties.add(TubemqValidator.CONNECTOR_TOPIC);
         properties.add(TubemqValidator.CONNECTOR_MASTER);
         properties.add(TubemqValidator.CONNECTOR_GROUP);
-        properties.add(TubemqValidator.CONNECTOR_TIDS);
+        properties.add(TubemqValidator.CONNECTOR_STREAMIDS);
         properties.add(TubemqValidator.CONNECTOR_PROPERTIES + ".*");
 
         // schema
@@ -145,16 +145,16 @@ public class TubemqTableSourceSinkFactory
                 
descriptorProperties.getString(TubemqValidator.CONNECTOR_MASTER);
         final String consumerGroup =
                 
descriptorProperties.getString(TubemqValidator.CONNECTOR_GROUP);
-        final String tids =
+        final String streamIds =
                 descriptorProperties
-                        .getOptionalString(TubemqValidator.CONNECTOR_TIDS)
+                        .getOptionalString(TubemqValidator.CONNECTOR_STREAMIDS)
                         .orElse(null);
         final Configuration configuration =
                 getConfiguration(descriptorProperties);
 
-        TreeSet<String> tidSet = new TreeSet<>();
-        if (tids != null) {
-            tidSet.addAll(Arrays.asList(tids.split(SPLIT_COMMA)));
+        TreeSet<String> streamIdSet = new TreeSet<>();
+        if (streamIds != null) {
+            streamIdSet.addAll(Arrays.asList(streamIds.split(SPLIT_COMMA)));
         }
 
         return new TubemqTableSource(
@@ -165,7 +165,7 @@ public class TubemqTableSourceSinkFactory
                 fieldMapping,
                 masterAddress,
                 topic,
-                tidSet,
+                streamIdSet,
                 consumerGroup,
                 configuration);
     }
diff --git 
a/inlong-tubemq/tubemq-connectors/tubemq-connector-flink/src/main/java/org/apache/flink/connectors/tubemq/TubemqValidator.java
 
b/inlong-tubemq/tubemq-connectors/tubemq-connector-flink/src/main/java/org/apache/flink/connectors/tubemq/TubemqValidator.java
index 1373c99d37..bddaec4eea 100644
--- 
a/inlong-tubemq/tubemq-connectors/tubemq-connector-flink/src/main/java/org/apache/flink/connectors/tubemq/TubemqValidator.java
+++ 
b/inlong-tubemq/tubemq-connectors/tubemq-connector-flink/src/main/java/org/apache/flink/connectors/tubemq/TubemqValidator.java
@@ -46,9 +46,9 @@ public class TubemqValidator extends 
ConnectorDescriptorValidator {
     public static final String CONNECTOR_GROUP = "connector.group";
 
     /**
-     * The tubemq consumers use these tids to filter records reading from 
server.
+     * The tubemq consumers use these streamIds to filter records reading from 
server.
      */
-    public static final String CONNECTOR_TIDS = "connector.tids";
+    public static final String CONNECTOR_STREAMIDS = "connector.stream-ids";
 
     /**
      * The prefix of tubemq properties (optional).
@@ -71,7 +71,7 @@ public class TubemqValidator extends 
ConnectorDescriptorValidator {
         // Validate that the group name is set.
         properties.validateString(CONNECTOR_GROUP, false, 1, 
Integer.MAX_VALUE);
 
-        // Validate that the tids is set.
-        properties.validateString(CONNECTOR_TIDS, true, 1, Integer.MAX_VALUE);
+        // Validate that the streamIds is set.
+        properties.validateString(CONNECTOR_STREAMIDS, true, 1, 
Integer.MAX_VALUE);
     }
 }
diff --git 
a/inlong-tubemq/tubemq-connectors/tubemq-connector-flink/src/test/java/org/apache/flink/connectors/tubemq/TubemqTest.java
 
b/inlong-tubemq/tubemq-connectors/tubemq-connector-flink/src/test/java/org/apache/flink/connectors/tubemq/TubemqTest.java
index dc046e171a..7f0e68def3 100644
--- 
a/inlong-tubemq/tubemq-connectors/tubemq-connector-flink/src/test/java/org/apache/flink/connectors/tubemq/TubemqTest.java
+++ 
b/inlong-tubemq/tubemq-connectors/tubemq-connector-flink/src/test/java/org/apache/flink/connectors/tubemq/TubemqTest.java
@@ -53,7 +53,7 @@ public class TubemqTest extends DescriptorTestBase {
                         .topic("test-topic-3")
                         .master("localhost:9001")
                         .group("test-group-3")
-                        .tids("test-tid-1,test-tid-2");
+                        .streamIds("test-streamId-1,test-streamId-2");
 
         return Arrays.asList(descriptor1, descriptor2, descriptor3);
     }
@@ -80,7 +80,7 @@ public class TubemqTest extends DescriptorTestBase {
         props3.put("connector.type", "tubemq");
         props3.put("connector.master", "localhost:9001");
         props3.put("connector.topic", "test-topic-3");
-        props3.put("connector.tids", "test-tid-1,test-tid-2");
+        props3.put("connector.stream-ids", "test-streamId-1,test-streamId-2");
         props3.put("connector.group", "test-group-3");
 
         return Arrays.asList(props1, props2, props3);

Reply via email to