This is an automated email from the ASF dual-hosted git repository.

dockerzhang pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/incubator-inlong.git


The following commit(s) were added to refs/heads/master by this push:
     new 3e1696f  [Feature] [InLong-Agent] Modify bid and tid to inlongGroupId 
and inlongStreamId (#1733)
3e1696f is described below

commit 3e1696ffcfb0a46c5e8bb99249fde39b5363c86e
Author: ziruipeng <[email protected]>
AuthorDate: Tue Nov 2 17:59:49 2021 +0800

    [Feature] [InLong-Agent] Modify bid and tid to inlongGroupId and 
inlongStreamId (#1733)
---
 .../inlong/agent/constants/CommonConstants.java    | 16 +++---
 .../apache/inlong/agent/message/ProxyMessage.java  | 20 +++----
 .../apache/inlong/agent/plugin/MessageFilter.java  |  6 +-
 .../java/org/apache/inlong/agent/plugin/Sink.java  |  2 +-
 inlong-agent/agent-docker/README.md                |  8 +--
 .../plugin/fetcher/ManagerResultFormatter.java     |  4 +-
 .../agent/plugin/fetcher/dtos/DataConfig.java      |  4 +-
 .../agent/plugin/fetcher/dtos/JobProfileDto.java   |  4 +-
 .../agent/plugin/filter/DefaultMessageFilter.java  | 10 ++--
 .../agent/plugin/message/PackProxyMessage.java     | 22 ++++----
 .../inlong/agent/plugin/sinks/ProxySink.java       | 58 +++++++++----------
 .../inlong/agent/plugin/sinks/SenderManager.java   | 66 +++++++++++-----------
 .../apache/inlong/agent/plugin/TestFileAgent.java  |  2 +-
 ...{TestTidFilter.java => TestStreamIdFilter.java} | 12 ++--
 14 files changed, 117 insertions(+), 117 deletions(-)

diff --git 
a/inlong-agent/agent-common/src/main/java/org/apache/inlong/agent/constants/CommonConstants.java
 
b/inlong-agent/agent-common/src/main/java/org/apache/inlong/agent/constants/CommonConstants.java
index 33a406b..c9e73d0 100644
--- 
a/inlong-agent/agent-common/src/main/java/org/apache/inlong/agent/constants/CommonConstants.java
+++ 
b/inlong-agent/agent-common/src/main/java/org/apache/inlong/agent/constants/CommonConstants.java
@@ -24,10 +24,10 @@ public class CommonConstants {
     public static final String PROXY_NET_TAG = "proxy.net.tag";
     public static final String DEFAULT_PROXY_NET_TAG = "";
 
-    public static final String PROXY_BID = "proxy.bid";
+    public static final String PROXY_INLONG_GROUP_ID = "proxy.inlongGroupId";
     public static final String POSITION_SUFFIX = ".position";
 
-    public static final String PROXY_TID = "proxy.tid";
+    public static final String PROXY_INLONG_STREAM_ID = "proxy.inlongStreamId";
 
     public static final String PROXY_LOCAL_HOST = "proxy.localHost";
     public static final String DEFAULT_PROXY_LOCALHOST = 
AgentUtils.getLocalIp();
@@ -47,16 +47,16 @@ public class CommonConstants {
     public static final String PROXY_IS_COMPRESS = "proxy.is.compress";
     public static final boolean DEFAULT_PROXY_IS_COMPRESS = true;
 
-    public static final String PROXY_MAX_SENDER_PER_BID = 
"proxy.max.sender.per.pid";
-    public static final int DEFAULT_PROXY_MAX_SENDER_PER_PID = 10;
+    public static final String PROXY_MAX_SENDER_PER_GROUP = 
"proxy.max.sender.per.group";
+    public static final int DEFAULT_PROXY_MAX_SENDER_PER_GROUP = 10;
 
     // max size of message list
     public static final String PROXY_PACKAGE_MAX_SIZE = 
"proxy.package.maxSize";
     // max size of single batch in bytes, default is 200KB.
     public static final int DEFAULT_PROXY_PACKAGE_MAX_SIZE = 200000;
 
-    public static final String PROXY_TID_QUEUE_MAX_NUMBER = 
"proxy.tid.queue.maxNumber";
-    public static final int DEFAULT_PROXY_TID_QUEUE_MAX_NUMBER = 10000;
+    public static final String PROXY_INLONG_STREAM_ID_QUEUE_MAX_NUMBER = 
"proxy.group.queue.maxNumber";
+    public static final int DEFAULT_PROXY_INLONG_STREAM_ID_QUEUE_MAX_NUMBER = 
10000;
 
     public static final String PROXY_PACKAGE_MAX_TIMEOUT_MS = 
"proxy.package.maxTimeout.ms";
     public static final int DEFAULT_PROXY_PACKAGE_MAX_TIMEOUT_MS = 4 * 1000;
@@ -80,8 +80,8 @@ public class CommonConstants {
     public static final String FIELD_SPLITTER = "proxy.field.splitter";
     public static final String DEFAULT_FIELD_SPLITTER = "|";
 
-    public static final String PROXY_KEY_BID = "bid";
-    public static final String PROXY_KEY_TID = "tid";
+    public static final String PROXY_KEY_GROUP_ID = "groupId";
+    public static final String PROXY_KEY_STREAM_ID = "streamId";
     public static final String PROXY_KEY_ID = "id";
     public static final String PROXY_KEY_AGENT_IP = "agentip";
     public static final String PROXY_OCEANUS_F = "f";
diff --git 
a/inlong-agent/agent-common/src/main/java/org/apache/inlong/agent/message/ProxyMessage.java
 
b/inlong-agent/agent-common/src/main/java/org/apache/inlong/agent/message/ProxyMessage.java
index 938636f..2d33f1a 100644
--- 
a/inlong-agent/agent-common/src/main/java/org/apache/inlong/agent/message/ProxyMessage.java
+++ 
b/inlong-agent/agent-common/src/main/java/org/apache/inlong/agent/message/ProxyMessage.java
@@ -21,22 +21,22 @@ import java.util.Map;
 import org.apache.inlong.agent.plugin.Message;
 
 /**
- * Bus message with body, header, bid and tid.
+ * Bus message with body, header, inlongGroupId and inlongStreamId.
  */
 public class ProxyMessage implements Message {
 
-    private static final String DEFAULT_TID = "__";
+    private static final String DEFAULT_INLONG_STREAM_ID = "__";
 
     private final byte[] body;
     private final Map<String, String> header;
-    private final String bid;
-    private final String tid;
+    private final String inlongGroupId;
+    private final String inlongStreamId;
 
     public ProxyMessage(byte[] body, Map<String, String> header) {
         this.body = body;
         this.header = header;
-        this.bid = header.get("bid");
-        this.tid = header.getOrDefault("tid", DEFAULT_TID);
+        this.inlongGroupId = header.get("inlongGroupId");
+        this.inlongStreamId = header.getOrDefault("inlongStreamId", 
DEFAULT_INLONG_STREAM_ID);
     }
 
     /**
@@ -59,12 +59,12 @@ public class ProxyMessage implements Message {
         return header;
     }
 
-    public String getBid() {
-        return bid;
+    public String getInlongGroupId() {
+        return inlongGroupId;
     }
 
-    public String getTid() {
-        return tid;
+    public String getInlongStreamId() {
+        return inlongStreamId;
     }
 
     public static ProxyMessage parse(Message message) {
diff --git 
a/inlong-agent/agent-common/src/main/java/org/apache/inlong/agent/plugin/MessageFilter.java
 
b/inlong-agent/agent-common/src/main/java/org/apache/inlong/agent/plugin/MessageFilter.java
index b88c758..966dd77 100644
--- 
a/inlong-agent/agent-common/src/main/java/org/apache/inlong/agent/plugin/MessageFilter.java
+++ 
b/inlong-agent/agent-common/src/main/java/org/apache/inlong/agent/plugin/MessageFilter.java
@@ -20,11 +20,11 @@ package org.apache.inlong.agent.plugin;
 public interface MessageFilter {
 
     /**
-     * split a message to get tid string
-     * used when the file is separated with different tid
+     * split a message to get stream id string
+     * used when the file is separated with different steam id
      * @param message the input message
      * @param fieldSplitter fieldSplitter used when split a line
      * @return
      */
-    String filterTid(Message message, byte[] fieldSplitter);
+    String filterStreamId(Message message, byte[] fieldSplitter);
 }
diff --git 
a/inlong-agent/agent-common/src/main/java/org/apache/inlong/agent/plugin/Sink.java
 
b/inlong-agent/agent-common/src/main/java/org/apache/inlong/agent/plugin/Sink.java
index 50a2477..22ee95f 100755
--- 
a/inlong-agent/agent-common/src/main/java/org/apache/inlong/agent/plugin/Sink.java
+++ 
b/inlong-agent/agent-common/src/main/java/org/apache/inlong/agent/plugin/Sink.java
@@ -38,7 +38,7 @@ public interface Sink extends Stage {
     void setSourceFile(String sourceFileName);
 
     /**
-     * every sink should include a message filter to filter out tid
+     * every sink should include a message filter to filter out stream id
      */
     MessageFilter initMessageFilter(JobProfile jobConf);
 }
diff --git a/inlong-agent/agent-docker/README.md 
b/inlong-agent/agent-docker/README.md
index 1f53ec4..343beda 100644
--- a/inlong-agent/agent-docker/README.md
+++ b/inlong-agent/agent-docker/README.md
@@ -35,8 +35,8 @@ curl --location --request POST 
'http://localhost:8008/config/job' \
 "channel": "org.apache.inlong.agent.plugin.channel.MemoryChannel"
 },
 "proxy": {
-"bid": "bid10",
-"tid": "bid10"
+"inlongGroupId": "group10",
+"inlongStreamId": "group10"
 },
 "op": "add"
 }'
@@ -48,5 +48,5 @@ The meaning of each parameter is :
 - job.trigger: Trigger name, the default is DirectoryTrigger, the function is 
to monitor the files under the folder to generate events
 - job.source: The type of data source used, the default is TextFileSource, 
which reads text files
 - job.sink:The type of writer used, the default is ProxySink, which sends 
messages to the proxy
-- proxy.bid: The bid type used when writing proxy
-- proxy.tid: The tid type used when writing proxy
\ No newline at end of file
+- proxy.inlongGroupId: The inlongGroupId used when writing proxy
+- proxy.inlongStreamId: The inlongStreamId used when writing proxy
\ No newline at end of file
diff --git 
a/inlong-agent/agent-plugins/src/main/java/org/apache/inlong/agent/plugin/fetcher/ManagerResultFormatter.java
 
b/inlong-agent/agent-plugins/src/main/java/org/apache/inlong/agent/plugin/fetcher/ManagerResultFormatter.java
index 5f4d744..55af66a 100755
--- 
a/inlong-agent/agent-plugins/src/main/java/org/apache/inlong/agent/plugin/fetcher/ManagerResultFormatter.java
+++ 
b/inlong-agent/agent-plugins/src/main/java/org/apache/inlong/agent/plugin/fetcher/ManagerResultFormatter.java
@@ -128,8 +128,8 @@ public class ManagerResultFormatter {
         AgentConfiguration agentConf = AgentConfiguration.getAgentConf();
         manager.setHost(agentConf.get(AGENT_MANAGER_VIP_HTTP_HOST));
         manager.setPort(agentConf.get(AGENT_MANAGER_VIP_HTTP_PORT));
-        proxy.setBid(dataConfigs.getBusinessIdentifier());
-        proxy.setTid(dataConfigs.getDataStreamIdentifier());
+        proxy.setInlongGroupId(dataConfigs.getInlongGroupId());
+        proxy.setInlongStreamId(dataConfigs.getInlongStreamId());
         proxy.setManager(manager);
         return proxy;
     }
diff --git 
a/inlong-agent/agent-plugins/src/main/java/org/apache/inlong/agent/plugin/fetcher/dtos/DataConfig.java
 
b/inlong-agent/agent-plugins/src/main/java/org/apache/inlong/agent/plugin/fetcher/dtos/DataConfig.java
index 8c02e3f..8950199 100644
--- 
a/inlong-agent/agent-plugins/src/main/java/org/apache/inlong/agent/plugin/fetcher/dtos/DataConfig.java
+++ 
b/inlong-agent/agent-plugins/src/main/java/org/apache/inlong/agent/plugin/fetcher/dtos/DataConfig.java
@@ -22,9 +22,9 @@ import lombok.Data;
 @Data
 public class DataConfig {
     private String additionalAttr;
-    private String businessIdentifier;
+    private String inlongGroupId;
     private String dataName;
-    private String dataStreamIdentifier;
+    private String inlongStreamId;
     private String deliveryTime;
     private String fieldSplitter;
     private String cycleUnit;
diff --git 
a/inlong-agent/agent-plugins/src/main/java/org/apache/inlong/agent/plugin/fetcher/dtos/JobProfileDto.java
 
b/inlong-agent/agent-plugins/src/main/java/org/apache/inlong/agent/plugin/fetcher/dtos/JobProfileDto.java
index 056e8e3..82e8b09 100644
--- 
a/inlong-agent/agent-plugins/src/main/java/org/apache/inlong/agent/plugin/fetcher/dtos/JobProfileDto.java
+++ 
b/inlong-agent/agent-plugins/src/main/java/org/apache/inlong/agent/plugin/fetcher/dtos/JobProfileDto.java
@@ -66,8 +66,8 @@ public class JobProfileDto {
     
     @Data
     public static class Proxy {
-        private String bid;
-        private String tid;
+        private String inlongGroupId;
+        private String inlongStreamId;
         private Manager manager;
     }
 }
\ No newline at end of file
diff --git 
a/inlong-agent/agent-plugins/src/main/java/org/apache/inlong/agent/plugin/filter/DefaultMessageFilter.java
 
b/inlong-agent/agent-plugins/src/main/java/org/apache/inlong/agent/plugin/filter/DefaultMessageFilter.java
index 180bc8e..caa422f 100644
--- 
a/inlong-agent/agent-plugins/src/main/java/org/apache/inlong/agent/plugin/filter/DefaultMessageFilter.java
+++ 
b/inlong-agent/agent-plugins/src/main/java/org/apache/inlong/agent/plugin/filter/DefaultMessageFilter.java
@@ -22,17 +22,17 @@ import org.apache.inlong.agent.plugin.MessageFilter;
 import org.apache.inlong.agent.utils.ByteUtil;
 
 /**
- * filter message to get tid
- * use the first word to set tid string
+ * filter message to get stream id
+ * use the first word to set stream id string
  */
 public class DefaultMessageFilter implements MessageFilter {
 
-    public static final int TID_INDEX = 0;
+    public static final int STREAM_INDEX = 0;
     public static final int FIELDS_LIMIT = 2;
 
     @Override
-    public String filterTid(Message message, byte[] fieldSplitter) {
+    public String filterStreamId(Message message, byte[] fieldSplitter) {
         byte[] body = message.getBody();
-        return new String(ByteUtil.split(body, fieldSplitter, 
FIELDS_LIMIT)[TID_INDEX]);
+        return new String(ByteUtil.split(body, fieldSplitter, 
FIELDS_LIMIT)[STREAM_INDEX]);
     }
 }
diff --git 
a/inlong-agent/agent-plugins/src/main/java/org/apache/inlong/agent/plugin/message/PackProxyMessage.java
 
b/inlong-agent/agent-plugins/src/main/java/org/apache/inlong/agent/plugin/message/PackProxyMessage.java
index c686783..a6b9cac 100644
--- 
a/inlong-agent/agent-plugins/src/main/java/org/apache/inlong/agent/plugin/message/PackProxyMessage.java
+++ 
b/inlong-agent/agent-plugins/src/main/java/org/apache/inlong/agent/plugin/message/PackProxyMessage.java
@@ -27,13 +27,13 @@ import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
 /**
- * Handle List of BusMessage, which belong to the same tid.
+ * Handle List of BusMessage, which belong to the same stream id.
  */
 public class PackProxyMessage {
 
     private static final Logger LOGGER = 
LoggerFactory.getLogger(PackProxyMessage.class);
 
-    private final String tid;
+    private final String streamId;
 
     private int currentSize;
 
@@ -43,7 +43,7 @@ public class PackProxyMessage {
     // ms
     private final int cacheTimeout;
 
-    // tid -> list of proxyMessage
+    // streamId -> list of proxyMessage
     private final LinkedBlockingQueue<ProxyMessage> messageQueue;
 
     private volatile long currentCacheTime = System.currentTimeMillis();
@@ -52,17 +52,17 @@ public class PackProxyMessage {
     /**
      * Init PackBusMessage
      *
-     * @param maxPackSize - max pack size for one bid
+     * @param maxPackSize - max pack size for one inlongGroupId
      * @param cacheTimeout - cache timeout for one proxy message
-     * @param tid - tid
+     * @param streamId - streamId
      */
-    public PackProxyMessage(int maxPackSize, int maxQueueNumber, int 
cacheTimeout, String tid) {
+    public PackProxyMessage(int maxPackSize, int maxQueueNumber, int 
cacheTimeout, String streamId) {
         this.maxPackSize = maxPackSize;
         this.maxQueueNumber = maxPackSize * 10;
         this.cacheTimeout = cacheTimeout;
         // double size of package
         this.messageQueue = new LinkedBlockingQueue<>(maxQueueNumber);
-        this.tid = tid;
+        this.streamId = streamId;
     }
 
     /**
@@ -75,10 +75,10 @@ public class PackProxyMessage {
     }
 
     /**
-     * Add proxy message to cache, proxy message should belong to the same tid.
+     * Add proxy message to cache, proxy message should belong to the same 
stream id.
      */
     public void addProxyMessage(ProxyMessage message) {
-        assert tid.equals(message.getTid());
+        assert streamId.equals(message.getInlongStreamId());
         try {
             if (queueIsFull()) {
                 LOGGER.warn("message queue is greater than {}, stop adding 
message, "
@@ -103,7 +103,7 @@ public class PackProxyMessage {
     /**
      * Fetch batch of proxy message, timeout message or max number of list 
satisfied.
      *
-     * @return map of message list, key is tid for the batch.
+     * @return map of message list, key is stream id for the batch.
      * return null if there are no valid messages.
      */
     public Pair<String, List<byte[]>> fetchBatch() {
@@ -133,7 +133,7 @@ public class PackProxyMessage {
             }
             // make sure result is not empty.
             if (!result.isEmpty()) {
-                return Pair.of(tid, result);
+                return Pair.of(streamId, result);
             }
         }
         return null;
diff --git 
a/inlong-agent/agent-plugins/src/main/java/org/apache/inlong/agent/plugin/sinks/ProxySink.java
 
b/inlong-agent/agent-plugins/src/main/java/org/apache/inlong/agent/plugin/sinks/ProxySink.java
index 1ffe339..42d4739 100755
--- 
a/inlong-agent/agent-plugins/src/main/java/org/apache/inlong/agent/plugin/sinks/ProxySink.java
+++ 
b/inlong-agent/agent-plugins/src/main/java/org/apache/inlong/agent/plugin/sinks/ProxySink.java
@@ -18,20 +18,20 @@
 package org.apache.inlong.agent.plugin.sinks;
 
 import static 
org.apache.inlong.agent.constants.CommonConstants.DEFAULT_FIELD_SPLITTER;
-import static org.apache.inlong.agent.constants.CommonConstants.PROXY_BID;
+import static 
org.apache.inlong.agent.constants.CommonConstants.PROXY_INLONG_GROUP_ID;
 import static 
org.apache.inlong.agent.constants.CommonConstants.PROXY_KEY_AGENT_IP;
 import static org.apache.inlong.agent.constants.CommonConstants.PROXY_KEY_ID;
 import static 
org.apache.inlong.agent.constants.CommonConstants.PROXY_OCEANUS_BL;
 import static 
org.apache.inlong.agent.constants.CommonConstants.PROXY_OCEANUS_F;
-import static org.apache.inlong.agent.constants.CommonConstants.PROXY_TID;
+import static 
org.apache.inlong.agent.constants.CommonConstants.PROXY_INLONG_STREAM_ID;
 import static 
org.apache.inlong.agent.constants.JobConstants.PROXY_BATCH_FLUSH_INTERVAL;
 import static 
org.apache.inlong.agent.constants.JobConstants.PROXY_PACKAGE_MAX_SIZE;
 import static 
org.apache.inlong.agent.constants.JobConstants.PROXY_PACKAGE_MAX_TIMEOUT_MS;
-import static 
org.apache.inlong.agent.constants.JobConstants.PROXY_TID_QUEUE_MAX_NUMBER;
+import static 
org.apache.inlong.agent.constants.JobConstants.PROXY_INLONG_STREAM_ID_QUEUE_MAX_NUMBER;
 import static 
org.apache.inlong.agent.constants.JobConstants.DEFAULT_PROXY_BATCH_FLUSH_INTERVAL;
 import static 
org.apache.inlong.agent.constants.JobConstants.DEFAULT_PROXY_PACKAGE_MAX_SIZE;
 import static 
org.apache.inlong.agent.constants.JobConstants.DEFAULT_PROXY_PACKAGE_MAX_TIMEOUT_MS;
-import static 
org.apache.inlong.agent.constants.JobConstants.DEFAULT_PROXY_TID_QUEUE_MAX_NUMBER;
+import static 
org.apache.inlong.agent.constants.JobConstants.DEFAULT_PROXY_INLONG_STREAM_ID_QUEUE_MAX_NUMBER;
 import static org.apache.inlong.agent.constants.JobConstants.JOB_ADDITION_STR;
 import static org.apache.inlong.agent.constants.JobConstants.JOB_CYCLE_UNIT;
 import static org.apache.inlong.agent.constants.JobConstants.JOB_DATA_TIME;
@@ -68,8 +68,8 @@ public class ProxySink extends AbstractSink {
     private MessageFilter messageFilter;
     private SenderManager senderManager;
     private byte[] fieldSplitter;
-    private String bid;
-    private String tid;
+    private String inlongGroupId;
+    private String inlongStreamId;
     private String sourceFile;
     private String jobInstanceId;
     private int maxBatchSize;
@@ -81,24 +81,24 @@ public class ProxySink extends AbstractSink {
             new LinkedBlockingQueue<>(), new AgentThreadFactory("ProxySink"));
     private volatile boolean shutdown = false;
 
-    // key is tid, value is a batch of messages belong to the same tid
+    // key is stream id, value is a batch of messages belong to the same 
stream id
     private ConcurrentHashMap<String, PackProxyMessage> cache;
     private long dataTime;
 
     @Override
     public void write(Message message) {
         if (message != null) {
-            message.getHeader().put(CommonConstants.PROXY_KEY_BID, bid);
-            extractTidFromMessage(message, fieldSplitter);
+            message.getHeader().put(CommonConstants.PROXY_KEY_GROUP_ID, 
inlongStreamId);
+            extractStreamFromMessage(message, fieldSplitter);
             if (!(message instanceof EndMessage)) {
                 ProxyMessage proxyMessage = ProxyMessage.parse(message);
                     // add proxy message to cache.
-                cache.compute(proxyMessage.getTid(),
+                cache.compute(proxyMessage.getInlongStreamId(),
                     (s, packProxyMessage) -> {
                         if (packProxyMessage == null) {
                             packProxyMessage = new PackProxyMessage(
                             maxBatchSize, maxQueueNumber,
-                            maxBatchTimeoutMs, proxyMessage.getTid());
+                            maxBatchTimeoutMs, 
proxyMessage.getInlongStreamId());
                         }
                         // add message to package proxy
                         packProxyMessage.addProxyMessage(proxyMessage);
@@ -110,16 +110,16 @@ public class ProxySink extends AbstractSink {
     }
 
     /**
-     * extract tid from message if message filter is presented
-     * or use the default tid
+     * extract stream id from message if message filter is presented
+     * or use the default stream id
      * @param message
      */
-    private void extractTidFromMessage(Message message, byte[] fieldSplitter) {
+    private void extractStreamFromMessage(Message message, byte[] 
fieldSplitter) {
         if (messageFilter != null) {
-            message.getHeader().put(CommonConstants.PROXY_KEY_TID,
-                messageFilter.filterTid(message, fieldSplitter));
+            message.getHeader().put(CommonConstants.PROXY_KEY_STREAM_ID,
+                messageFilter.filterStreamId(message, fieldSplitter));
         } else {
-            message.getHeader().put(CommonConstants.PROXY_KEY_TID, tid);
+            message.getHeader().put(CommonConstants.PROXY_KEY_STREAM_ID, 
inlongStreamId);
         }
     }
 
@@ -135,16 +135,16 @@ public class ProxySink extends AbstractSink {
      */
     private Runnable flushCache() {
         return () -> {
-            LOGGER.info("start flush cache thread for {} TDBusSink", bid);
+            LOGGER.info("start flush cache thread for {} ProxySink", 
inlongStreamId);
             while (!shutdown) {
                 try {
                     cache.forEach((s, packProxyMessage) -> {
                         Pair<String, List<byte[]>> result = 
packProxyMessage.fetchBatch();
                         if (result != null) {
-                            senderManager.sendBatch(jobInstanceId, bid, 
result.getKey(),
+                            senderManager.sendBatch(jobInstanceId, 
inlongStreamId, result.getKey(),
                                     result.getValue(), 0, dataTime);
-                            LOGGER.info("send bid {} with message size {}, the 
job id is {}, read file is {}"
-                                    + "dataTime is {}", bid, 
result.getRight().size(),
+                            LOGGER.info("send group id {} with message size 
{}, the job id is {}, read file is {}"
+                                    + "dataTime is {}", inlongStreamId, 
result.getRight().size(),
                                 jobInstanceId, sourceFile, dataTime);
                         }
 
@@ -160,28 +160,28 @@ public class ProxySink extends AbstractSink {
     @Override
     public void init(JobProfile jobConf) {
         maxBatchSize = jobConf.getInt(PROXY_PACKAGE_MAX_SIZE, 
DEFAULT_PROXY_PACKAGE_MAX_SIZE);
-        maxQueueNumber = jobConf.getInt(PROXY_TID_QUEUE_MAX_NUMBER,
-            DEFAULT_PROXY_TID_QUEUE_MAX_NUMBER);
+        maxQueueNumber = 
jobConf.getInt(PROXY_INLONG_STREAM_ID_QUEUE_MAX_NUMBER,
+            DEFAULT_PROXY_INLONG_STREAM_ID_QUEUE_MAX_NUMBER);
         maxBatchTimeoutMs = jobConf.getInt(
             PROXY_PACKAGE_MAX_TIMEOUT_MS, 
DEFAULT_PROXY_PACKAGE_MAX_TIMEOUT_MS);
         jobInstanceId = jobConf.get(JOB_INSTANCE_ID);
         batchFlushInterval = jobConf.getInt(PROXY_BATCH_FLUSH_INTERVAL,
             DEFAULT_PROXY_BATCH_FLUSH_INTERVAL);
         cache = new ConcurrentHashMap<>(10);
-        bid = jobConf.get(PROXY_BID);
+        inlongStreamId = jobConf.get(PROXY_INLONG_GROUP_ID);
         dataTime = 
AgentUtils.timeStrConvertToMillSec(jobConf.get(JOB_DATA_TIME, ""),
             jobConf.get(JOB_CYCLE_UNIT, ""));
-        bid = jobConf.get(PROXY_BID);
-        tid = jobConf.get(PROXY_TID, "");
+        inlongGroupId = jobConf.get(PROXY_INLONG_GROUP_ID);
+        inlongStreamId = jobConf.get(PROXY_INLONG_STREAM_ID, "");
         messageFilter = initMessageFilter(jobConf);
         fieldSplitter = jobConf.get(CommonConstants.FIELD_SPLITTER, 
DEFAULT_FIELD_SPLITTER).getBytes(
             StandardCharsets.UTF_8);
         executorService.execute(flushCache());
-        senderManager = new SenderManager(jobConf, bid, sourceFile);
+        senderManager = new SenderManager(jobConf, inlongStreamId, sourceFile);
         try {
             senderManager.addMessageSender();
         } catch (Exception ex) {
-            LOGGER.error("error while init sender for bid {}", bid);
+            LOGGER.error("error while init sender for group id {}", 
inlongStreamId);
             throw new IllegalStateException(ex);
         }
     }
@@ -214,7 +214,7 @@ public class ProxySink extends AbstractSink {
     }
 
     /**
-     * check whether all tid messages finished
+     * check whether all stream id messages finished
      * @return
      */
     private boolean sinkFinish() {
diff --git 
a/inlong-agent/agent-plugins/src/main/java/org/apache/inlong/agent/plugin/sinks/SenderManager.java
 
b/inlong-agent/agent-plugins/src/main/java/org/apache/inlong/agent/plugin/sinks/SenderManager.java
index 8f2b1dd..ad652b1 100755
--- 
a/inlong-agent/agent-plugins/src/main/java/org/apache/inlong/agent/plugin/sinks/SenderManager.java
+++ 
b/inlong-agent/agent-plugins/src/main/java/org/apache/inlong/agent/plugin/sinks/SenderManager.java
@@ -51,7 +51,7 @@ public class SenderManager {
     private static final Logger LOGGER = 
LoggerFactory.getLogger(SenderManager.class);
     private static final SequentialID SEQUENTIAL_ID = 
SequentialID.getInstance();
     private static final AtomicInteger SENDER_INDEX = new AtomicInteger(0);
-    // cache for bid and sender list, share the map cross agent lifecycle.
+    // cache for group and sender list, share the map cross agent lifecycle.
     private static final ConcurrentHashMap<String, List<DefaultMessageSender>> 
SENDER_MAP =
             new ConcurrentHashMap<>();
 
@@ -79,13 +79,13 @@ public class SenderManager {
     private final long maxSenderTimeout;
     private final int maxSenderRetry;
     private final long retrySleepTime;
-    private final String bid;
+    private final String inlongGroupId;
     private TaskPositionManager taskPositionManager;
-    private final int maxSenderPerBid;
+    private final int maxSenderPerGroup;
     private final String sourceFilePath;
     private final PluginMetric metric = new PluginMetric();
 
-    public SenderManager(JobProfile jobConf, String bid, String 
sourceFilePath) {
+    public SenderManager(JobProfile jobConf, String inlongGroupId, String 
sourceFilePath) {
         AgentConfiguration conf = AgentConfiguration.getAgentConf();
         managerHost = conf.get(AGENT_MANAGER_VIP_HTTP_HOST);
         managerPort = conf.getInt(AGENT_MANAGER_VIP_HTTP_PORT);
@@ -101,8 +101,8 @@ public class SenderManager {
                     CommonConstants.PROXY_ALIVE_CONNECTION_NUM, 
CommonConstants.DEFAULT_PROXY_ALIVE_CONNECTION_NUM);
         isCompress = jobConf.getBoolean(
             CommonConstants.PROXY_IS_COMPRESS, 
CommonConstants.DEFAULT_PROXY_IS_COMPRESS);
-        maxSenderPerBid = jobConf.getInt(
-            CommonConstants.PROXY_MAX_SENDER_PER_BID, 
CommonConstants.DEFAULT_PROXY_MAX_SENDER_PER_PID);
+        maxSenderPerGroup = jobConf.getInt(
+            CommonConstants.PROXY_MAX_SENDER_PER_GROUP, 
CommonConstants.DEFAULT_PROXY_MAX_SENDER_PER_GROUP);
         msgType = jobConf.getInt(CommonConstants.PROXY_MSG_TYPE, 
CommonConstants.DEFAULT_PROXY_MSG_TYPE);
         maxSenderTimeout = jobConf.getInt(
             CommonConstants.PROXY_SENDER_MAX_TIMEOUT, 
CommonConstants.DEFAULT_PROXY_SENDER_MAX_TIMEOUT);
@@ -113,30 +113,30 @@ public class SenderManager {
         isFile = jobConf.getBoolean(CommonConstants.PROXY_IS_FILE, 
CommonConstants.DEFAULT_IS_FILE);
         taskPositionManager = TaskPositionManager.getTaskPositionManager();
         this.sourceFilePath = sourceFilePath;
-        this.bid = bid;
+        this.inlongGroupId = inlongGroupId;
     }
 
     /**
-     * Select by bid.
+     * Select by group.
      *
-     * @param bid - business id
+     * @param group - inlong group id
      * @return default message sender
      */
-    private DefaultMessageSender selectSender(String bid) {
-        List<DefaultMessageSender> senderList = SENDER_MAP.get(bid);
+    private DefaultMessageSender selectSender(String group) {
+        List<DefaultMessageSender> senderList = SENDER_MAP.get(group);
         return senderList.get((SENDER_INDEX.getAndIncrement() & 0x7FFFFFFF) % 
senderList.size());
     }
 
     /**
      * sender
      *
-     * @param bid - business id
+     * @param groupId - group id
      * @return DefaultMessageSender
      */
-    private DefaultMessageSender createMessageSender(String bid) throws 
Exception {
+    private DefaultMessageSender createMessageSender(String groupId) throws 
Exception {
 
         ProxyClientConfig proxyClientConfig = new ProxyClientConfig(
-                localhost, isLocalVisit, managerHost, managerPort, bid, 
netTag);
+                localhost, isLocalVisit, managerHost, managerPort, groupId, 
netTag);
         proxyClientConfig.setTotalAsyncCallbackSize(totalAsyncBufSize);
         proxyClientConfig.setFile(isFile);
         proxyClientConfig.setAliveConnections(aliveConnectionNum);
@@ -148,19 +148,19 @@ public class SenderManager {
     }
 
     /**
-     * Add new sender for bid if max size is not satisfied.
+     * Add new sender for group id if max size is not satisfied.
      *
      */
     public void addMessageSender() throws Exception {
         List<DefaultMessageSender> tmpList = new ArrayList<>();
-        List<DefaultMessageSender> senderList = SENDER_MAP.putIfAbsent(bid, 
tmpList);
+        List<DefaultMessageSender> senderList = 
SENDER_MAP.putIfAbsent(inlongGroupId, tmpList);
         if (senderList == null) {
             senderList = tmpList;
         }
-        if (senderList.size() > maxSenderPerBid) {
+        if (senderList.size() > maxSenderPerGroup) {
             return;
         }
-        DefaultMessageSender sender = createMessageSender(bid);
+        DefaultMessageSender sender = createMessageSender(inlongGroupId);
         senderList.add(sender);
     }
 
@@ -169,17 +169,17 @@ public class SenderManager {
      */
     private class AgentSenderCallback implements SendMessageCallback {
         private final int retry;
-        private final String bid;
+        private final String groupId;
         private final List<byte[]> bodyList;
-        private final String tid;
+        private final String streamId;
         private final long dataTime;
         private final String jobId;
 
-        AgentSenderCallback(String jobId, String bid, String tid, List<byte[]> 
bodyList, int retry,
+        AgentSenderCallback(String jobId, String groupId, String streamId, 
List<byte[]> bodyList, int retry,
             long dataTime) {
             this.retry = retry;
-            this.bid = bid;
-            this.tid = tid;
+            this.groupId = groupId;
+            this.streamId = streamId;
             this.bodyList = bodyList;
             this.jobId = jobId;
             this.dataTime = dataTime;
@@ -189,9 +189,9 @@ public class SenderManager {
         public void onMessageAck(SendResult result) {
             // if send result is not ok, retry again.
             if (result == null || !result.equals(SendResult.OK)) {
-                LOGGER.warn("send bid {}, tid {}, jobId {}, dataTime {} fail 
with times {}",
-                    bid, tid, jobId, dataTime, retry);
-                sendBatch(jobId, bid, tid, bodyList, retry + 1, dataTime);
+                LOGGER.warn("send groupId {}, streamId {}, jobId {}, dataTime 
{} fail with times {}",
+                    groupId, streamId, jobId, dataTime, retry);
+                sendBatch(jobId, groupId, streamId, bodyList, retry + 1, 
dataTime);
                 return;
             }
             metric.sendSuccessNum.incr(bodyList.size());
@@ -207,21 +207,21 @@ public class SenderManager {
     /**
      * Send message to proxy by batch, use message cache.
      *
-     * @param bid - bid
-     * @param tid - tid
+     * @param groupId - groupId
+     * @param streamId - streamId
      * @param bodyList - body list
      * @param retry - retry time
      */
-    public void sendBatch(String jobId, String bid, String tid,
+    public void sendBatch(String jobId, String groupId, String streamId,
         List<byte[]> bodyList, int retry, long dataTime) {
         if (retry > maxSenderRetry) {
             LOGGER.warn("max retry reached, retry count is {}, sleep and send 
again", retry);
             AgentUtils.silenceSleepInMs(retrySleepTime);
         }
         try {
-            selectSender(bid).asyncSendMessage(
-                    new AgentSenderCallback(jobId, bid, tid, bodyList, retry, 
dataTime),
-                    bodyList, bid, tid,
+            selectSender(groupId).asyncSendMessage(
+                    new AgentSenderCallback(jobId, groupId, streamId, 
bodyList, retry, dataTime),
+                    bodyList, groupId, streamId,
                     dataTime,
                     SEQUENTIAL_ID.getNextUuid(),
                     maxSenderTimeout,
@@ -232,7 +232,7 @@ public class SenderManager {
             // retry time
             try {
                 TimeUnit.SECONDS.sleep(1);
-                sendBatch(jobId, bid, tid, bodyList, retry + 1, dataTime);
+                sendBatch(jobId, groupId, streamId, bodyList, retry + 1, 
dataTime);
             } catch (Exception ignored) {
                 // ignore it.
             }
diff --git 
a/inlong-agent/agent-plugins/src/test/java/org/apache/inlong/agent/plugin/TestFileAgent.java
 
b/inlong-agent/agent-plugins/src/test/java/org/apache/inlong/agent/plugin/TestFileAgent.java
index a970a99..ef84909 100755
--- 
a/inlong-agent/agent-plugins/src/test/java/org/apache/inlong/agent/plugin/TestFileAgent.java
+++ 
b/inlong-agent/agent-plugins/src/test/java/org/apache/inlong/agent/plugin/TestFileAgent.java
@@ -162,7 +162,7 @@ public class TestFileAgent {
     }
 
     @Test
-    public void testTidFilter() throws Exception {
+    public void testGroupIdFilter() throws Exception {
 
         String nowDate = AgentUtils.formatCurrentTimeWithoutOffset("yyyyMMdd");
 
diff --git 
a/inlong-agent/agent-plugins/src/test/java/org/apache/inlong/agent/plugin/filter/TestTidFilter.java
 
b/inlong-agent/agent-plugins/src/test/java/org/apache/inlong/agent/plugin/filter/TestStreamIdFilter.java
similarity index 87%
rename from 
inlong-agent/agent-plugins/src/test/java/org/apache/inlong/agent/plugin/filter/TestTidFilter.java
rename to 
inlong-agent/agent-plugins/src/test/java/org/apache/inlong/agent/plugin/filter/TestStreamIdFilter.java
index e3ac7bd..cf81adf 100644
--- 
a/inlong-agent/agent-plugins/src/test/java/org/apache/inlong/agent/plugin/filter/TestTidFilter.java
+++ 
b/inlong-agent/agent-plugins/src/test/java/org/apache/inlong/agent/plugin/filter/TestStreamIdFilter.java
@@ -33,7 +33,7 @@ import org.junit.Assert;
 import org.junit.BeforeClass;
 import org.junit.Test;
 
-public class TestTidFilter {
+public class TestStreamIdFilter {
 
     private static AgentBaseTestsHelper helper;
     private static Path testPath;
@@ -50,12 +50,12 @@ public class TestTidFilter {
     }
 
     @Test
-    public void testFilterTid() {
+    public void testStreamId() {
         DefaultMessageFilter messageFilter = new DefaultMessageFilter();
-        ProxyMessage proxyMessage = new ProxyMessage("tid|this is a line of 
file".getBytes(
+        ProxyMessage proxyMessage = new ProxyMessage("streamId|this is a line 
of file".getBytes(
             StandardCharsets.UTF_8), new HashMap<>());
-        String s = messageFilter.filterTid(proxyMessage, 
"|".getBytes(StandardCharsets.UTF_8));
-        Assert.assertEquals(s, "tid");
+        String s = messageFilter.filterStreamId(proxyMessage, 
"|".getBytes(StandardCharsets.UTF_8));
+        Assert.assertEquals(s, "streamId");
     }
 
     @Test
@@ -66,7 +66,7 @@ public class TestTidFilter {
         MessageFilter messageFilter = sinkTest.initMessageFilter(jobProfile);
         ProxyMessage proxyMessage = new ProxyMessage("tid|this is a line of 
file".getBytes(
             StandardCharsets.UTF_8), new HashMap<>());
-        String s = messageFilter.filterTid(proxyMessage, 
"|".getBytes(StandardCharsets.UTF_8));
+        String s = messageFilter.filterStreamId(proxyMessage, 
"|".getBytes(StandardCharsets.UTF_8));
         Assert.assertEquals(s, "tid");
     }
 

Reply via email to