This is an automated email from the ASF dual-hosted git repository.
gosonzhang pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/incubator-inlong.git
The following commit(s) were added to refs/heads/master by this push:
new e8892dd [INLONG-2023][Bug] Agent stream id is not passed to proxy (#2025)
e8892dd is described below
commit e8892ddb8fea2d88e691aedebf363128455eadb4
Author: ziruipeng <[email protected]>
AuthorDate: Fri Dec 17 19:38:01 2021 +0800
[INLONG-2023][Bug] Agent stream id is not passed to proxy (#2025)
---
.../java/org/apache/inlong/agent/plugin/sinks/ProxySink.java | 10 +++++-----
1 file changed, 5 insertions(+), 5 deletions(-)
diff --git a/inlong-agent/agent-plugins/src/main/java/org/apache/inlong/agent/plugin/sinks/ProxySink.java b/inlong-agent/agent-plugins/src/main/java/org/apache/inlong/agent/plugin/sinks/ProxySink.java
index 9f94523..6ec9906 100755
--- a/inlong-agent/agent-plugins/src/main/java/org/apache/inlong/agent/plugin/sinks/ProxySink.java
+++ b/inlong-agent/agent-plugins/src/main/java/org/apache/inlong/agent/plugin/sinks/ProxySink.java
@@ -108,7 +108,8 @@ public class ProxySink extends AbstractSink {
@Override
public void write(Message message) {
if (message != null) {
- message.getHeader().put(CommonConstants.PROXY_KEY_GROUP_ID,
inlongStreamId);
+ message.getHeader().put(CommonConstants.PROXY_KEY_GROUP_ID,
inlongGroupId);
+ message.getHeader().put(CommonConstants.PROXY_KEY_STREAM_ID,
inlongStreamId);
extractStreamFromMessage(message, fieldSplitter);
if (!(message instanceof EndMessage)) {
ProxyMessage proxyMessage = ProxyMessage.parse(message);
@@ -160,16 +161,16 @@ public class ProxySink extends AbstractSink {
*/
private Runnable flushCache() {
return () -> {
- LOGGER.info("start flush cache thread for {} ProxySink",
inlongStreamId);
+ LOGGER.info("start flush cache thread for {} ProxySink",
inlongGroupId);
while (!shutdown) {
try {
cache.forEach((s, packProxyMessage) -> {
Pair<String, List<byte[]>> result =
packProxyMessage.fetchBatch();
if (result != null) {
- senderManager.sendBatch(jobInstanceId,
inlongStreamId, result.getKey(),
+ senderManager.sendBatch(jobInstanceId,
inlongGroupId, result.getKey(),
result.getValue(), 0, dataTime);
LOGGER.info("send group id {} with message size
{}, the job id is {}, read file is {}"
- + "dataTime is {}", inlongStreamId,
result.getRight().size(),
+ + "dataTime is {}", inlongGroupId,
result.getRight().size(),
jobInstanceId, sourceFile, dataTime);
}
@@ -193,7 +194,6 @@ public class ProxySink extends AbstractSink {
batchFlushInterval = jobConf.getInt(PROXY_BATCH_FLUSH_INTERVAL,
DEFAULT_PROXY_BATCH_FLUSH_INTERVAL);
cache = new ConcurrentHashMap<>(10);
- inlongStreamId = jobConf.get(PROXY_INLONG_GROUP_ID);
dataTime =
AgentUtils.timeStrConvertToMillSec(jobConf.get(JOB_DATA_TIME, ""),
jobConf.get(JOB_CYCLE_UNIT, ""));
inlongGroupId = jobConf.get(PROXY_INLONG_GROUP_ID);