This is an automated email from the ASF dual-hosted git repository.
dockerzhang pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/incubator-inlong.git
The following commit(s) were added to refs/heads/master by this push:
new 3e1696f [Feature] [InLong-Agent] Modify bid and tid to inlongGroupId
and inlongStreamId (#1733)
3e1696f is described below
commit 3e1696ffcfb0a46c5e8bb99249fde39b5363c86e
Author: ziruipeng <[email protected]>
AuthorDate: Tue Nov 2 17:59:49 2021 +0800
[Feature] [InLong-Agent] Modify bid and tid to inlongGroupId and
inlongStreamId (#1733)
---
.../inlong/agent/constants/CommonConstants.java | 16 +++---
.../apache/inlong/agent/message/ProxyMessage.java | 20 +++----
.../apache/inlong/agent/plugin/MessageFilter.java | 6 +-
.../java/org/apache/inlong/agent/plugin/Sink.java | 2 +-
inlong-agent/agent-docker/README.md | 8 +--
.../plugin/fetcher/ManagerResultFormatter.java | 4 +-
.../agent/plugin/fetcher/dtos/DataConfig.java | 4 +-
.../agent/plugin/fetcher/dtos/JobProfileDto.java | 4 +-
.../agent/plugin/filter/DefaultMessageFilter.java | 10 ++--
.../agent/plugin/message/PackProxyMessage.java | 22 ++++----
.../inlong/agent/plugin/sinks/ProxySink.java | 58 +++++++++----------
.../inlong/agent/plugin/sinks/SenderManager.java | 66 +++++++++++-----------
.../apache/inlong/agent/plugin/TestFileAgent.java | 2 +-
...{TestTidFilter.java => TestStreamIdFilter.java} | 12 ++--
14 files changed, 117 insertions(+), 117 deletions(-)
diff --git
a/inlong-agent/agent-common/src/main/java/org/apache/inlong/agent/constants/CommonConstants.java
b/inlong-agent/agent-common/src/main/java/org/apache/inlong/agent/constants/CommonConstants.java
index 33a406b..c9e73d0 100644
---
a/inlong-agent/agent-common/src/main/java/org/apache/inlong/agent/constants/CommonConstants.java
+++
b/inlong-agent/agent-common/src/main/java/org/apache/inlong/agent/constants/CommonConstants.java
@@ -24,10 +24,10 @@ public class CommonConstants {
public static final String PROXY_NET_TAG = "proxy.net.tag";
public static final String DEFAULT_PROXY_NET_TAG = "";
- public static final String PROXY_BID = "proxy.bid";
+ public static final String PROXY_INLONG_GROUP_ID = "proxy.inlongGroupId";
public static final String POSITION_SUFFIX = ".position";
- public static final String PROXY_TID = "proxy.tid";
+ public static final String PROXY_INLONG_STREAM_ID = "proxy.inlongStreamId";
public static final String PROXY_LOCAL_HOST = "proxy.localHost";
public static final String DEFAULT_PROXY_LOCALHOST =
AgentUtils.getLocalIp();
@@ -47,16 +47,16 @@ public class CommonConstants {
public static final String PROXY_IS_COMPRESS = "proxy.is.compress";
public static final boolean DEFAULT_PROXY_IS_COMPRESS = true;
- public static final String PROXY_MAX_SENDER_PER_BID =
"proxy.max.sender.per.pid";
- public static final int DEFAULT_PROXY_MAX_SENDER_PER_PID = 10;
+ public static final String PROXY_MAX_SENDER_PER_GROUP =
"proxy.max.sender.per.group";
+ public static final int DEFAULT_PROXY_MAX_SENDER_PER_GROUP = 10;
// max size of message list
public static final String PROXY_PACKAGE_MAX_SIZE =
"proxy.package.maxSize";
// max size of single batch in bytes, default is 200KB.
public static final int DEFAULT_PROXY_PACKAGE_MAX_SIZE = 200000;
- public static final String PROXY_TID_QUEUE_MAX_NUMBER =
"proxy.tid.queue.maxNumber";
- public static final int DEFAULT_PROXY_TID_QUEUE_MAX_NUMBER = 10000;
+ public static final String PROXY_INLONG_STREAM_ID_QUEUE_MAX_NUMBER =
"proxy.group.queue.maxNumber";
+ public static final int DEFAULT_PROXY_INLONG_STREAM_ID_QUEUE_MAX_NUMBER =
10000;
public static final String PROXY_PACKAGE_MAX_TIMEOUT_MS =
"proxy.package.maxTimeout.ms";
public static final int DEFAULT_PROXY_PACKAGE_MAX_TIMEOUT_MS = 4 * 1000;
@@ -80,8 +80,8 @@ public class CommonConstants {
public static final String FIELD_SPLITTER = "proxy.field.splitter";
public static final String DEFAULT_FIELD_SPLITTER = "|";
- public static final String PROXY_KEY_BID = "bid";
- public static final String PROXY_KEY_TID = "tid";
+ public static final String PROXY_KEY_GROUP_ID = "groupId";
+ public static final String PROXY_KEY_STREAM_ID = "streamId";
public static final String PROXY_KEY_ID = "id";
public static final String PROXY_KEY_AGENT_IP = "agentip";
public static final String PROXY_OCEANUS_F = "f";
diff --git
a/inlong-agent/agent-common/src/main/java/org/apache/inlong/agent/message/ProxyMessage.java
b/inlong-agent/agent-common/src/main/java/org/apache/inlong/agent/message/ProxyMessage.java
index 938636f..2d33f1a 100644
---
a/inlong-agent/agent-common/src/main/java/org/apache/inlong/agent/message/ProxyMessage.java
+++
b/inlong-agent/agent-common/src/main/java/org/apache/inlong/agent/message/ProxyMessage.java
@@ -21,22 +21,22 @@ import java.util.Map;
import org.apache.inlong.agent.plugin.Message;
/**
- * Bus message with body, header, bid and tid.
+ * Bus message with body, header, inlongGroupId and inlongStreamId.
*/
public class ProxyMessage implements Message {
- private static final String DEFAULT_TID = "__";
+ private static final String DEFAULT_INLONG_STREAM_ID = "__";
private final byte[] body;
private final Map<String, String> header;
- private final String bid;
- private final String tid;
+ private final String inlongGroupId;
+ private final String inlongStreamId;
public ProxyMessage(byte[] body, Map<String, String> header) {
this.body = body;
this.header = header;
- this.bid = header.get("bid");
- this.tid = header.getOrDefault("tid", DEFAULT_TID);
+ this.inlongGroupId = header.get("inlongGroupId");
+ this.inlongStreamId = header.getOrDefault("inlongStreamId",
DEFAULT_INLONG_STREAM_ID);
}
/**
@@ -59,12 +59,12 @@ public class ProxyMessage implements Message {
return header;
}
- public String getBid() {
- return bid;
+ public String getInlongGroupId() {
+ return inlongGroupId;
}
- public String getTid() {
- return tid;
+ public String getInlongStreamId() {
+ return inlongStreamId;
}
public static ProxyMessage parse(Message message) {
diff --git
a/inlong-agent/agent-common/src/main/java/org/apache/inlong/agent/plugin/MessageFilter.java
b/inlong-agent/agent-common/src/main/java/org/apache/inlong/agent/plugin/MessageFilter.java
index b88c758..966dd77 100644
---
a/inlong-agent/agent-common/src/main/java/org/apache/inlong/agent/plugin/MessageFilter.java
+++
b/inlong-agent/agent-common/src/main/java/org/apache/inlong/agent/plugin/MessageFilter.java
@@ -20,11 +20,11 @@ package org.apache.inlong.agent.plugin;
public interface MessageFilter {
/**
- * split a message to get tid string
- * used when the file is separated with different tid
+ * split a message to get stream id string
+ * used when the file is separated with different stream id
* @param message the input message
* @param fieldSplitter fieldSplitter used when split a line
* @return
*/
- String filterTid(Message message, byte[] fieldSplitter);
+ String filterStreamId(Message message, byte[] fieldSplitter);
}
diff --git
a/inlong-agent/agent-common/src/main/java/org/apache/inlong/agent/plugin/Sink.java
b/inlong-agent/agent-common/src/main/java/org/apache/inlong/agent/plugin/Sink.java
index 50a2477..22ee95f 100755
---
a/inlong-agent/agent-common/src/main/java/org/apache/inlong/agent/plugin/Sink.java
+++
b/inlong-agent/agent-common/src/main/java/org/apache/inlong/agent/plugin/Sink.java
@@ -38,7 +38,7 @@ public interface Sink extends Stage {
void setSourceFile(String sourceFileName);
/**
- * every sink should include a message filter to filter out tid
+ * every sink should include a message filter to filter out stream id
*/
MessageFilter initMessageFilter(JobProfile jobConf);
}
diff --git a/inlong-agent/agent-docker/README.md
b/inlong-agent/agent-docker/README.md
index 1f53ec4..343beda 100644
--- a/inlong-agent/agent-docker/README.md
+++ b/inlong-agent/agent-docker/README.md
@@ -35,8 +35,8 @@ curl --location --request POST
'http://localhost:8008/config/job' \
"channel": "org.apache.inlong.agent.plugin.channel.MemoryChannel"
},
"proxy": {
-"bid": "bid10",
-"tid": "bid10"
+"inlongGroupId": "group10",
+"inlongStreamId": "group10"
},
"op": "add"
}'
@@ -48,5 +48,5 @@ The meaning of each parameter is :
- job.trigger: Trigger name, the default is DirectoryTrigger, the function is
to monitor the files under the folder to generate events
- job.source: The type of data source used, the default is TextFileSource,
which reads text files
- job.sink:The type of writer used, the default is ProxySink, which sends
messages to the proxy
-- proxy.bid: The bid type used when writing proxy
-- proxy.tid: The tid type used when writing proxy
\ No newline at end of file
+- proxy.inlongGroupId: The inlongGroupId used when writing proxy
+- proxy.inlongStreamId: The inlongStreamId used when writing proxy
\ No newline at end of file
diff --git
a/inlong-agent/agent-plugins/src/main/java/org/apache/inlong/agent/plugin/fetcher/ManagerResultFormatter.java
b/inlong-agent/agent-plugins/src/main/java/org/apache/inlong/agent/plugin/fetcher/ManagerResultFormatter.java
index 5f4d744..55af66a 100755
---
a/inlong-agent/agent-plugins/src/main/java/org/apache/inlong/agent/plugin/fetcher/ManagerResultFormatter.java
+++
b/inlong-agent/agent-plugins/src/main/java/org/apache/inlong/agent/plugin/fetcher/ManagerResultFormatter.java
@@ -128,8 +128,8 @@ public class ManagerResultFormatter {
AgentConfiguration agentConf = AgentConfiguration.getAgentConf();
manager.setHost(agentConf.get(AGENT_MANAGER_VIP_HTTP_HOST));
manager.setPort(agentConf.get(AGENT_MANAGER_VIP_HTTP_PORT));
- proxy.setBid(dataConfigs.getBusinessIdentifier());
- proxy.setTid(dataConfigs.getDataStreamIdentifier());
+ proxy.setInlongGroupId(dataConfigs.getInlongGroupId());
+ proxy.setInlongStreamId(dataConfigs.getInlongStreamId());
proxy.setManager(manager);
return proxy;
}
diff --git
a/inlong-agent/agent-plugins/src/main/java/org/apache/inlong/agent/plugin/fetcher/dtos/DataConfig.java
b/inlong-agent/agent-plugins/src/main/java/org/apache/inlong/agent/plugin/fetcher/dtos/DataConfig.java
index 8c02e3f..8950199 100644
---
a/inlong-agent/agent-plugins/src/main/java/org/apache/inlong/agent/plugin/fetcher/dtos/DataConfig.java
+++
b/inlong-agent/agent-plugins/src/main/java/org/apache/inlong/agent/plugin/fetcher/dtos/DataConfig.java
@@ -22,9 +22,9 @@ import lombok.Data;
@Data
public class DataConfig {
private String additionalAttr;
- private String businessIdentifier;
+ private String inlongGroupId;
private String dataName;
- private String dataStreamIdentifier;
+ private String inlongStreamId;
private String deliveryTime;
private String fieldSplitter;
private String cycleUnit;
diff --git
a/inlong-agent/agent-plugins/src/main/java/org/apache/inlong/agent/plugin/fetcher/dtos/JobProfileDto.java
b/inlong-agent/agent-plugins/src/main/java/org/apache/inlong/agent/plugin/fetcher/dtos/JobProfileDto.java
index 056e8e3..82e8b09 100644
---
a/inlong-agent/agent-plugins/src/main/java/org/apache/inlong/agent/plugin/fetcher/dtos/JobProfileDto.java
+++
b/inlong-agent/agent-plugins/src/main/java/org/apache/inlong/agent/plugin/fetcher/dtos/JobProfileDto.java
@@ -66,8 +66,8 @@ public class JobProfileDto {
@Data
public static class Proxy {
- private String bid;
- private String tid;
+ private String inlongGroupId;
+ private String inlongStreamId;
private Manager manager;
}
}
\ No newline at end of file
diff --git
a/inlong-agent/agent-plugins/src/main/java/org/apache/inlong/agent/plugin/filter/DefaultMessageFilter.java
b/inlong-agent/agent-plugins/src/main/java/org/apache/inlong/agent/plugin/filter/DefaultMessageFilter.java
index 180bc8e..caa422f 100644
---
a/inlong-agent/agent-plugins/src/main/java/org/apache/inlong/agent/plugin/filter/DefaultMessageFilter.java
+++
b/inlong-agent/agent-plugins/src/main/java/org/apache/inlong/agent/plugin/filter/DefaultMessageFilter.java
@@ -22,17 +22,17 @@ import org.apache.inlong.agent.plugin.MessageFilter;
import org.apache.inlong.agent.utils.ByteUtil;
/**
- * filter message to get tid
- * use the first word to set tid string
+ * filter message to get stream id
+ * use the first word to set stream id string
*/
public class DefaultMessageFilter implements MessageFilter {
- public static final int TID_INDEX = 0;
+ public static final int STREAM_INDEX = 0;
public static final int FIELDS_LIMIT = 2;
@Override
- public String filterTid(Message message, byte[] fieldSplitter) {
+ public String filterStreamId(Message message, byte[] fieldSplitter) {
byte[] body = message.getBody();
- return new String(ByteUtil.split(body, fieldSplitter,
FIELDS_LIMIT)[TID_INDEX]);
+ return new String(ByteUtil.split(body, fieldSplitter,
FIELDS_LIMIT)[STREAM_INDEX]);
}
}
diff --git
a/inlong-agent/agent-plugins/src/main/java/org/apache/inlong/agent/plugin/message/PackProxyMessage.java
b/inlong-agent/agent-plugins/src/main/java/org/apache/inlong/agent/plugin/message/PackProxyMessage.java
index c686783..a6b9cac 100644
---
a/inlong-agent/agent-plugins/src/main/java/org/apache/inlong/agent/plugin/message/PackProxyMessage.java
+++
b/inlong-agent/agent-plugins/src/main/java/org/apache/inlong/agent/plugin/message/PackProxyMessage.java
@@ -27,13 +27,13 @@ import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
/**
- * Handle List of BusMessage, which belong to the same tid.
+ * Handle List of BusMessage, which belong to the same stream id.
*/
public class PackProxyMessage {
private static final Logger LOGGER =
LoggerFactory.getLogger(PackProxyMessage.class);
- private final String tid;
+ private final String streamId;
private int currentSize;
@@ -43,7 +43,7 @@ public class PackProxyMessage {
// ms
private final int cacheTimeout;
- // tid -> list of proxyMessage
+ // streamId -> list of proxyMessage
private final LinkedBlockingQueue<ProxyMessage> messageQueue;
private volatile long currentCacheTime = System.currentTimeMillis();
@@ -52,17 +52,17 @@ public class PackProxyMessage {
/**
* Init PackBusMessage
*
- * @param maxPackSize - max pack size for one bid
+ * @param maxPackSize - max pack size for one inlongGroupId
* @param cacheTimeout - cache timeout for one proxy message
- * @param tid - tid
+ * @param streamId - streamId
*/
- public PackProxyMessage(int maxPackSize, int maxQueueNumber, int
cacheTimeout, String tid) {
+ public PackProxyMessage(int maxPackSize, int maxQueueNumber, int
cacheTimeout, String streamId) {
this.maxPackSize = maxPackSize;
this.maxQueueNumber = maxPackSize * 10;
this.cacheTimeout = cacheTimeout;
// double size of package
this.messageQueue = new LinkedBlockingQueue<>(maxQueueNumber);
- this.tid = tid;
+ this.streamId = streamId;
}
/**
@@ -75,10 +75,10 @@ public class PackProxyMessage {
}
/**
- * Add proxy message to cache, proxy message should belong to the same tid.
+ * Add proxy message to cache, proxy message should belong to the same
stream id.
*/
public void addProxyMessage(ProxyMessage message) {
- assert tid.equals(message.getTid());
+ assert streamId.equals(message.getInlongStreamId());
try {
if (queueIsFull()) {
LOGGER.warn("message queue is greater than {}, stop adding
message, "
@@ -103,7 +103,7 @@ public class PackProxyMessage {
/**
* Fetch batch of proxy message, timeout message or max number of list
satisfied.
*
- * @return map of message list, key is tid for the batch.
+ * @return map of message list, key is stream id for the batch.
* return null if there are no valid messages.
*/
public Pair<String, List<byte[]>> fetchBatch() {
@@ -133,7 +133,7 @@ public class PackProxyMessage {
}
// make sure result is not empty.
if (!result.isEmpty()) {
- return Pair.of(tid, result);
+ return Pair.of(streamId, result);
}
}
return null;
diff --git
a/inlong-agent/agent-plugins/src/main/java/org/apache/inlong/agent/plugin/sinks/ProxySink.java
b/inlong-agent/agent-plugins/src/main/java/org/apache/inlong/agent/plugin/sinks/ProxySink.java
index 1ffe339..42d4739 100755
---
a/inlong-agent/agent-plugins/src/main/java/org/apache/inlong/agent/plugin/sinks/ProxySink.java
+++
b/inlong-agent/agent-plugins/src/main/java/org/apache/inlong/agent/plugin/sinks/ProxySink.java
@@ -18,20 +18,20 @@
package org.apache.inlong.agent.plugin.sinks;
import static
org.apache.inlong.agent.constants.CommonConstants.DEFAULT_FIELD_SPLITTER;
-import static org.apache.inlong.agent.constants.CommonConstants.PROXY_BID;
+import static
org.apache.inlong.agent.constants.CommonConstants.PROXY_INLONG_GROUP_ID;
import static
org.apache.inlong.agent.constants.CommonConstants.PROXY_KEY_AGENT_IP;
import static org.apache.inlong.agent.constants.CommonConstants.PROXY_KEY_ID;
import static
org.apache.inlong.agent.constants.CommonConstants.PROXY_OCEANUS_BL;
import static
org.apache.inlong.agent.constants.CommonConstants.PROXY_OCEANUS_F;
-import static org.apache.inlong.agent.constants.CommonConstants.PROXY_TID;
+import static
org.apache.inlong.agent.constants.CommonConstants.PROXY_INLONG_STREAM_ID;
import static
org.apache.inlong.agent.constants.JobConstants.PROXY_BATCH_FLUSH_INTERVAL;
import static
org.apache.inlong.agent.constants.JobConstants.PROXY_PACKAGE_MAX_SIZE;
import static
org.apache.inlong.agent.constants.JobConstants.PROXY_PACKAGE_MAX_TIMEOUT_MS;
-import static
org.apache.inlong.agent.constants.JobConstants.PROXY_TID_QUEUE_MAX_NUMBER;
+import static
org.apache.inlong.agent.constants.JobConstants.PROXY_INLONG_STREAM_ID_QUEUE_MAX_NUMBER;
import static
org.apache.inlong.agent.constants.JobConstants.DEFAULT_PROXY_BATCH_FLUSH_INTERVAL;
import static
org.apache.inlong.agent.constants.JobConstants.DEFAULT_PROXY_PACKAGE_MAX_SIZE;
import static
org.apache.inlong.agent.constants.JobConstants.DEFAULT_PROXY_PACKAGE_MAX_TIMEOUT_MS;
-import static
org.apache.inlong.agent.constants.JobConstants.DEFAULT_PROXY_TID_QUEUE_MAX_NUMBER;
+import static
org.apache.inlong.agent.constants.JobConstants.DEFAULT_PROXY_INLONG_STREAM_ID_QUEUE_MAX_NUMBER;
import static org.apache.inlong.agent.constants.JobConstants.JOB_ADDITION_STR;
import static org.apache.inlong.agent.constants.JobConstants.JOB_CYCLE_UNIT;
import static org.apache.inlong.agent.constants.JobConstants.JOB_DATA_TIME;
@@ -68,8 +68,8 @@ public class ProxySink extends AbstractSink {
private MessageFilter messageFilter;
private SenderManager senderManager;
private byte[] fieldSplitter;
- private String bid;
- private String tid;
+ private String inlongGroupId;
+ private String inlongStreamId;
private String sourceFile;
private String jobInstanceId;
private int maxBatchSize;
@@ -81,24 +81,24 @@ public class ProxySink extends AbstractSink {
new LinkedBlockingQueue<>(), new AgentThreadFactory("ProxySink"));
private volatile boolean shutdown = false;
- // key is tid, value is a batch of messages belong to the same tid
+ // key is stream id, value is a batch of messages belonging to the same
stream id
private ConcurrentHashMap<String, PackProxyMessage> cache;
private long dataTime;
@Override
public void write(Message message) {
if (message != null) {
- message.getHeader().put(CommonConstants.PROXY_KEY_BID, bid);
- extractTidFromMessage(message, fieldSplitter);
+ message.getHeader().put(CommonConstants.PROXY_KEY_GROUP_ID,
inlongStreamId);
+ extractStreamFromMessage(message, fieldSplitter);
if (!(message instanceof EndMessage)) {
ProxyMessage proxyMessage = ProxyMessage.parse(message);
// add proxy message to cache.
- cache.compute(proxyMessage.getTid(),
+ cache.compute(proxyMessage.getInlongStreamId(),
(s, packProxyMessage) -> {
if (packProxyMessage == null) {
packProxyMessage = new PackProxyMessage(
maxBatchSize, maxQueueNumber,
- maxBatchTimeoutMs, proxyMessage.getTid());
+ maxBatchTimeoutMs,
proxyMessage.getInlongStreamId());
}
// add message to package proxy
packProxyMessage.addProxyMessage(proxyMessage);
@@ -110,16 +110,16 @@ public class ProxySink extends AbstractSink {
}
/**
- * extract tid from message if message filter is presented
- * or use the default tid
+ * extract stream id from message if message filter is presented
+ * or use the default stream id
* @param message
*/
- private void extractTidFromMessage(Message message, byte[] fieldSplitter) {
+ private void extractStreamFromMessage(Message message, byte[]
fieldSplitter) {
if (messageFilter != null) {
- message.getHeader().put(CommonConstants.PROXY_KEY_TID,
- messageFilter.filterTid(message, fieldSplitter));
+ message.getHeader().put(CommonConstants.PROXY_KEY_STREAM_ID,
+ messageFilter.filterStreamId(message, fieldSplitter));
} else {
- message.getHeader().put(CommonConstants.PROXY_KEY_TID, tid);
+ message.getHeader().put(CommonConstants.PROXY_KEY_STREAM_ID,
inlongStreamId);
}
}
@@ -135,16 +135,16 @@ public class ProxySink extends AbstractSink {
*/
private Runnable flushCache() {
return () -> {
- LOGGER.info("start flush cache thread for {} TDBusSink", bid);
+ LOGGER.info("start flush cache thread for {} ProxySink",
inlongStreamId);
while (!shutdown) {
try {
cache.forEach((s, packProxyMessage) -> {
Pair<String, List<byte[]>> result =
packProxyMessage.fetchBatch();
if (result != null) {
- senderManager.sendBatch(jobInstanceId, bid,
result.getKey(),
+ senderManager.sendBatch(jobInstanceId,
inlongStreamId, result.getKey(),
result.getValue(), 0, dataTime);
- LOGGER.info("send bid {} with message size {}, the
job id is {}, read file is {}"
- + "dataTime is {}", bid,
result.getRight().size(),
+ LOGGER.info("send group id {} with message size
{}, the job id is {}, read file is {}"
+ + "dataTime is {}", inlongStreamId,
result.getRight().size(),
jobInstanceId, sourceFile, dataTime);
}
@@ -160,28 +160,28 @@ public class ProxySink extends AbstractSink {
@Override
public void init(JobProfile jobConf) {
maxBatchSize = jobConf.getInt(PROXY_PACKAGE_MAX_SIZE,
DEFAULT_PROXY_PACKAGE_MAX_SIZE);
- maxQueueNumber = jobConf.getInt(PROXY_TID_QUEUE_MAX_NUMBER,
- DEFAULT_PROXY_TID_QUEUE_MAX_NUMBER);
+ maxQueueNumber =
jobConf.getInt(PROXY_INLONG_STREAM_ID_QUEUE_MAX_NUMBER,
+ DEFAULT_PROXY_INLONG_STREAM_ID_QUEUE_MAX_NUMBER);
maxBatchTimeoutMs = jobConf.getInt(
PROXY_PACKAGE_MAX_TIMEOUT_MS,
DEFAULT_PROXY_PACKAGE_MAX_TIMEOUT_MS);
jobInstanceId = jobConf.get(JOB_INSTANCE_ID);
batchFlushInterval = jobConf.getInt(PROXY_BATCH_FLUSH_INTERVAL,
DEFAULT_PROXY_BATCH_FLUSH_INTERVAL);
cache = new ConcurrentHashMap<>(10);
- bid = jobConf.get(PROXY_BID);
+ inlongStreamId = jobConf.get(PROXY_INLONG_GROUP_ID);
dataTime =
AgentUtils.timeStrConvertToMillSec(jobConf.get(JOB_DATA_TIME, ""),
jobConf.get(JOB_CYCLE_UNIT, ""));
- bid = jobConf.get(PROXY_BID);
- tid = jobConf.get(PROXY_TID, "");
+ inlongGroupId = jobConf.get(PROXY_INLONG_GROUP_ID);
+ inlongStreamId = jobConf.get(PROXY_INLONG_STREAM_ID, "");
messageFilter = initMessageFilter(jobConf);
fieldSplitter = jobConf.get(CommonConstants.FIELD_SPLITTER,
DEFAULT_FIELD_SPLITTER).getBytes(
StandardCharsets.UTF_8);
executorService.execute(flushCache());
- senderManager = new SenderManager(jobConf, bid, sourceFile);
+ senderManager = new SenderManager(jobConf, inlongStreamId, sourceFile);
try {
senderManager.addMessageSender();
} catch (Exception ex) {
- LOGGER.error("error while init sender for bid {}", bid);
+ LOGGER.error("error while init sender for group id {}",
inlongStreamId);
throw new IllegalStateException(ex);
}
}
@@ -214,7 +214,7 @@ public class ProxySink extends AbstractSink {
}
/**
- * check whether all tid messages finished
+ * check whether all stream id messages finished
* @return
*/
private boolean sinkFinish() {
diff --git
a/inlong-agent/agent-plugins/src/main/java/org/apache/inlong/agent/plugin/sinks/SenderManager.java
b/inlong-agent/agent-plugins/src/main/java/org/apache/inlong/agent/plugin/sinks/SenderManager.java
index 8f2b1dd..ad652b1 100755
---
a/inlong-agent/agent-plugins/src/main/java/org/apache/inlong/agent/plugin/sinks/SenderManager.java
+++
b/inlong-agent/agent-plugins/src/main/java/org/apache/inlong/agent/plugin/sinks/SenderManager.java
@@ -51,7 +51,7 @@ public class SenderManager {
private static final Logger LOGGER =
LoggerFactory.getLogger(SenderManager.class);
private static final SequentialID SEQUENTIAL_ID =
SequentialID.getInstance();
private static final AtomicInteger SENDER_INDEX = new AtomicInteger(0);
- // cache for bid and sender list, share the map cross agent lifecycle.
+ // cache for group and sender list, shared across the agent lifecycle.
private static final ConcurrentHashMap<String, List<DefaultMessageSender>>
SENDER_MAP =
new ConcurrentHashMap<>();
@@ -79,13 +79,13 @@ public class SenderManager {
private final long maxSenderTimeout;
private final int maxSenderRetry;
private final long retrySleepTime;
- private final String bid;
+ private final String inlongGroupId;
private TaskPositionManager taskPositionManager;
- private final int maxSenderPerBid;
+ private final int maxSenderPerGroup;
private final String sourceFilePath;
private final PluginMetric metric = new PluginMetric();
- public SenderManager(JobProfile jobConf, String bid, String
sourceFilePath) {
+ public SenderManager(JobProfile jobConf, String inlongGroupId, String
sourceFilePath) {
AgentConfiguration conf = AgentConfiguration.getAgentConf();
managerHost = conf.get(AGENT_MANAGER_VIP_HTTP_HOST);
managerPort = conf.getInt(AGENT_MANAGER_VIP_HTTP_PORT);
@@ -101,8 +101,8 @@ public class SenderManager {
CommonConstants.PROXY_ALIVE_CONNECTION_NUM,
CommonConstants.DEFAULT_PROXY_ALIVE_CONNECTION_NUM);
isCompress = jobConf.getBoolean(
CommonConstants.PROXY_IS_COMPRESS,
CommonConstants.DEFAULT_PROXY_IS_COMPRESS);
- maxSenderPerBid = jobConf.getInt(
- CommonConstants.PROXY_MAX_SENDER_PER_BID,
CommonConstants.DEFAULT_PROXY_MAX_SENDER_PER_PID);
+ maxSenderPerGroup = jobConf.getInt(
+ CommonConstants.PROXY_MAX_SENDER_PER_GROUP,
CommonConstants.DEFAULT_PROXY_MAX_SENDER_PER_GROUP);
msgType = jobConf.getInt(CommonConstants.PROXY_MSG_TYPE,
CommonConstants.DEFAULT_PROXY_MSG_TYPE);
maxSenderTimeout = jobConf.getInt(
CommonConstants.PROXY_SENDER_MAX_TIMEOUT,
CommonConstants.DEFAULT_PROXY_SENDER_MAX_TIMEOUT);
@@ -113,30 +113,30 @@ public class SenderManager {
isFile = jobConf.getBoolean(CommonConstants.PROXY_IS_FILE,
CommonConstants.DEFAULT_IS_FILE);
taskPositionManager = TaskPositionManager.getTaskPositionManager();
this.sourceFilePath = sourceFilePath;
- this.bid = bid;
+ this.inlongGroupId = inlongGroupId;
}
/**
- * Select by bid.
+ * Select by group.
*
- * @param bid - business id
+ * @param group - inlong group id
* @return default message sender
*/
- private DefaultMessageSender selectSender(String bid) {
- List<DefaultMessageSender> senderList = SENDER_MAP.get(bid);
+ private DefaultMessageSender selectSender(String group) {
+ List<DefaultMessageSender> senderList = SENDER_MAP.get(group);
return senderList.get((SENDER_INDEX.getAndIncrement() & 0x7FFFFFFF) %
senderList.size());
}
/**
* sender
*
- * @param bid - business id
+ * @param groupId - group id
* @return DefaultMessageSender
*/
- private DefaultMessageSender createMessageSender(String bid) throws
Exception {
+ private DefaultMessageSender createMessageSender(String groupId) throws
Exception {
ProxyClientConfig proxyClientConfig = new ProxyClientConfig(
- localhost, isLocalVisit, managerHost, managerPort, bid,
netTag);
+ localhost, isLocalVisit, managerHost, managerPort, groupId,
netTag);
proxyClientConfig.setTotalAsyncCallbackSize(totalAsyncBufSize);
proxyClientConfig.setFile(isFile);
proxyClientConfig.setAliveConnections(aliveConnectionNum);
@@ -148,19 +148,19 @@ public class SenderManager {
}
/**
- * Add new sender for bid if max size is not satisfied.
+ * Add new sender for group id if max size is not satisfied.
*
*/
public void addMessageSender() throws Exception {
List<DefaultMessageSender> tmpList = new ArrayList<>();
- List<DefaultMessageSender> senderList = SENDER_MAP.putIfAbsent(bid,
tmpList);
+ List<DefaultMessageSender> senderList =
SENDER_MAP.putIfAbsent(inlongGroupId, tmpList);
if (senderList == null) {
senderList = tmpList;
}
- if (senderList.size() > maxSenderPerBid) {
+ if (senderList.size() > maxSenderPerGroup) {
return;
}
- DefaultMessageSender sender = createMessageSender(bid);
+ DefaultMessageSender sender = createMessageSender(inlongGroupId);
senderList.add(sender);
}
@@ -169,17 +169,17 @@ public class SenderManager {
*/
private class AgentSenderCallback implements SendMessageCallback {
private final int retry;
- private final String bid;
+ private final String groupId;
private final List<byte[]> bodyList;
- private final String tid;
+ private final String streamId;
private final long dataTime;
private final String jobId;
- AgentSenderCallback(String jobId, String bid, String tid, List<byte[]>
bodyList, int retry,
+ AgentSenderCallback(String jobId, String groupId, String streamId,
List<byte[]> bodyList, int retry,
long dataTime) {
this.retry = retry;
- this.bid = bid;
- this.tid = tid;
+ this.groupId = groupId;
+ this.streamId = streamId;
this.bodyList = bodyList;
this.jobId = jobId;
this.dataTime = dataTime;
@@ -189,9 +189,9 @@ public class SenderManager {
public void onMessageAck(SendResult result) {
// if send result is not ok, retry again.
if (result == null || !result.equals(SendResult.OK)) {
- LOGGER.warn("send bid {}, tid {}, jobId {}, dataTime {} fail
with times {}",
- bid, tid, jobId, dataTime, retry);
- sendBatch(jobId, bid, tid, bodyList, retry + 1, dataTime);
+ LOGGER.warn("send groupId {}, streamId {}, jobId {}, dataTime
{} fail with times {}",
+ groupId, streamId, jobId, dataTime, retry);
+ sendBatch(jobId, groupId, streamId, bodyList, retry + 1,
dataTime);
return;
}
metric.sendSuccessNum.incr(bodyList.size());
@@ -207,21 +207,21 @@ public class SenderManager {
/**
* Send message to proxy by batch, use message cache.
*
- * @param bid - bid
- * @param tid - tid
+ * @param groupId - groupId
+ * @param streamId - streamId
* @param bodyList - body list
* @param retry - retry time
*/
- public void sendBatch(String jobId, String bid, String tid,
+ public void sendBatch(String jobId, String groupId, String streamId,
List<byte[]> bodyList, int retry, long dataTime) {
if (retry > maxSenderRetry) {
LOGGER.warn("max retry reached, retry count is {}, sleep and send
again", retry);
AgentUtils.silenceSleepInMs(retrySleepTime);
}
try {
- selectSender(bid).asyncSendMessage(
- new AgentSenderCallback(jobId, bid, tid, bodyList, retry,
dataTime),
- bodyList, bid, tid,
+ selectSender(groupId).asyncSendMessage(
+ new AgentSenderCallback(jobId, groupId, streamId,
bodyList, retry, dataTime),
+ bodyList, groupId, streamId,
dataTime,
SEQUENTIAL_ID.getNextUuid(),
maxSenderTimeout,
@@ -232,7 +232,7 @@ public class SenderManager {
// retry time
try {
TimeUnit.SECONDS.sleep(1);
- sendBatch(jobId, bid, tid, bodyList, retry + 1, dataTime);
+ sendBatch(jobId, groupId, streamId, bodyList, retry + 1,
dataTime);
} catch (Exception ignored) {
// ignore it.
}
diff --git
a/inlong-agent/agent-plugins/src/test/java/org/apache/inlong/agent/plugin/TestFileAgent.java
b/inlong-agent/agent-plugins/src/test/java/org/apache/inlong/agent/plugin/TestFileAgent.java
index a970a99..ef84909 100755
---
a/inlong-agent/agent-plugins/src/test/java/org/apache/inlong/agent/plugin/TestFileAgent.java
+++
b/inlong-agent/agent-plugins/src/test/java/org/apache/inlong/agent/plugin/TestFileAgent.java
@@ -162,7 +162,7 @@ public class TestFileAgent {
}
@Test
- public void testTidFilter() throws Exception {
+ public void testGroupIdFilter() throws Exception {
String nowDate = AgentUtils.formatCurrentTimeWithoutOffset("yyyyMMdd");
diff --git
a/inlong-agent/agent-plugins/src/test/java/org/apache/inlong/agent/plugin/filter/TestTidFilter.java
b/inlong-agent/agent-plugins/src/test/java/org/apache/inlong/agent/plugin/filter/TestStreamIdFilter.java
similarity index 87%
rename from
inlong-agent/agent-plugins/src/test/java/org/apache/inlong/agent/plugin/filter/TestTidFilter.java
rename to
inlong-agent/agent-plugins/src/test/java/org/apache/inlong/agent/plugin/filter/TestStreamIdFilter.java
index e3ac7bd..cf81adf 100644
---
a/inlong-agent/agent-plugins/src/test/java/org/apache/inlong/agent/plugin/filter/TestTidFilter.java
+++
b/inlong-agent/agent-plugins/src/test/java/org/apache/inlong/agent/plugin/filter/TestStreamIdFilter.java
@@ -33,7 +33,7 @@ import org.junit.Assert;
import org.junit.BeforeClass;
import org.junit.Test;
-public class TestTidFilter {
+public class TestStreamIdFilter {
private static AgentBaseTestsHelper helper;
private static Path testPath;
@@ -50,12 +50,12 @@ public class TestTidFilter {
}
@Test
- public void testFilterTid() {
+ public void testStreamId() {
DefaultMessageFilter messageFilter = new DefaultMessageFilter();
- ProxyMessage proxyMessage = new ProxyMessage("tid|this is a line of
file".getBytes(
+ ProxyMessage proxyMessage = new ProxyMessage("streamId|this is a line
of file".getBytes(
StandardCharsets.UTF_8), new HashMap<>());
- String s = messageFilter.filterTid(proxyMessage,
"|".getBytes(StandardCharsets.UTF_8));
- Assert.assertEquals(s, "tid");
+ String s = messageFilter.filterStreamId(proxyMessage,
"|".getBytes(StandardCharsets.UTF_8));
+ Assert.assertEquals(s, "streamId");
}
@Test
@@ -66,7 +66,7 @@ public class TestTidFilter {
MessageFilter messageFilter = sinkTest.initMessageFilter(jobProfile);
ProxyMessage proxyMessage = new ProxyMessage("tid|this is a line of
file".getBytes(
StandardCharsets.UTF_8), new HashMap<>());
- String s = messageFilter.filterTid(proxyMessage,
"|".getBytes(StandardCharsets.UTF_8));
+ String s = messageFilter.filterStreamId(proxyMessage,
"|".getBytes(StandardCharsets.UTF_8));
Assert.assertEquals(s, "tid");
}