This is an automated email from the ASF dual-hosted git repository.
luchunliang pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/inlong.git
The following commit(s) were added to refs/heads/master by this push:
new 8d891797c9 [INLONG-9590][Agent] Delete positionManager, there is
already an offsetManager (#9592)
8d891797c9 is described below
commit 8d891797c97a89606660cdeaae63fae9eafeab23
Author: justinwwhuang <[email protected]>
AuthorDate: Thu Jan 18 16:50:40 2024 +0800
[INLONG-9590][Agent] Delete positionManager, there is already an
offsetManager (#9592)
* [INLONG-9590][Agent] Delete positionManager, there is already an
offsetManager
* [INLONG-9590][Agent] Delete senderManager
---
.../inlong/agent/core/task/PositionManager.java | 170 -----------
.../inlong/agent/plugin/sinks/KafkaSink.java | 3 -
.../inlong/agent/plugin/sinks/PulsarSink.java | 3 -
.../inlong/agent/plugin/sinks/SenderManager.java | 327 ---------------------
4 files changed, 503 deletions(-)
diff --git a/inlong-agent/agent-core/src/main/java/org/apache/inlong/agent/core/task/PositionManager.java b/inlong-agent/agent-core/src/main/java/org/apache/inlong/agent/core/task/PositionManager.java
deleted file mode 100644
index 08d585ac84..0000000000
--- a/inlong-agent/agent-core/src/main/java/org/apache/inlong/agent/core/task/PositionManager.java
+++ /dev/null
@@ -1,170 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.inlong.agent.core.task;
-
-import org.apache.inlong.agent.common.AbstractDaemon;
-import org.apache.inlong.agent.conf.AgentConfiguration;
-import org.apache.inlong.agent.conf.JobProfile;
-import org.apache.inlong.agent.core.AgentManager;
-import org.apache.inlong.agent.db.JobProfileDb;
-import org.apache.inlong.agent.utils.AgentUtils;
-import org.apache.inlong.agent.utils.ThreadUtils;
-
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
-import java.util.concurrent.ConcurrentHashMap;
-
-import static org.apache.inlong.agent.constant.CommonConstants.POSITION_SUFFIX;
-import static
org.apache.inlong.agent.constant.FetcherConstants.AGENT_HEARTBEAT_INTERVAL;
-import static
org.apache.inlong.agent.constant.FetcherConstants.DEFAULT_AGENT_FETCHER_INTERVAL;
-
-/**
- * used to store task position to db, task position is stored as properties in
JobProfile.
- * where key is task read file name and value is task sink position
- * note that this class is generated
- */
-public class PositionManager extends AbstractDaemon {
-
- private static final Logger LOGGER =
LoggerFactory.getLogger(PositionManager.class);
- private static volatile PositionManager positionManager = null;
- private final AgentManager agentManager;
- private JobProfileDb jobConfDb;
- private final AgentConfiguration conf;
- private ConcurrentHashMap<String, ConcurrentHashMap<String, Long>>
jobTaskPositionMap;
-
- private PositionManager(AgentManager agentManager) {
- this.conf = AgentConfiguration.getAgentConf();
- this.agentManager = agentManager;
- this.jobTaskPositionMap = new ConcurrentHashMap<>();
- }
-
- /**
- * task position manager singleton, can only generated by agent manager
- */
- public static PositionManager getInstance(AgentManager agentManager) {
- if (positionManager == null) {
- synchronized (PositionManager.class) {
- if (positionManager == null) {
- positionManager = new PositionManager(agentManager);
- }
- }
- }
- return positionManager;
- }
-
- /**
- * get taskPositionManager singleton
- */
- public static PositionManager getInstance() {
- if (positionManager == null) {
- throw new RuntimeException("task position manager has not been
initialized by agentManager");
- }
- return positionManager;
- }
-
- @Override
- public void start() throws Exception {
- submitWorker(taskPositionFlushThread());
- }
-
- private Runnable taskPositionFlushThread() {
- return () -> {
- while (isRunnable()) {
- doFlush();
- }
- };
- }
-
- private void doFlush() {
- try {
- // check pending jobs and try to submit again.
- for (String jobId : jobTaskPositionMap.keySet()) {
- JobProfile jobProfile = jobConfDb.getJobById(jobId);
- if (jobProfile == null) {
- LOGGER.warn("jobProfile {} cannot be found in db, "
- + "might be deleted by standalone mode, now delete
job position in memory", jobId);
- deleteJobPosition(jobId);
- continue;
- }
- flushJobProfile(jobId, jobProfile);
- }
- } catch (Throwable ex) {
- LOGGER.error("error caught", ex);
- ThreadUtils.threadThrowableHandler(Thread.currentThread(), ex);
- } finally {
- int flushTime = conf.getInt(AGENT_HEARTBEAT_INTERVAL,
- DEFAULT_AGENT_FETCHER_INTERVAL);
- AgentUtils.silenceSleepInSeconds(flushTime);
- }
- }
-
- private void flushJobProfile(String jobId, JobProfile jobProfile) {
- jobTaskPositionMap.get(jobId).forEach(
- (fileName, position) -> jobProfile.setLong(fileName +
POSITION_SUFFIX, position));
- if (jobConfDb.checkJobfinished(jobProfile)) {
- LOGGER.info("Cannot update job profile {}, delete memory job in
jobTaskPosition", jobId);
- deleteJobPosition(jobId);
- } else {
- jobConfDb.updateJobProfile(jobProfile);
- }
- }
-
- private void deleteJobPosition(String jobId) {
- jobTaskPositionMap.remove(jobId);
- }
-
- @Override
- public void stop() throws Exception {
- waitForTerminate();
- }
-
- /**
- * update job sink position
- *
- * @param size add this size to beforePosition
- */
- public void updateSinkPosition(String jobInstanceId, String sourcePath,
long size, boolean reset) {
- ConcurrentHashMap<String, Long> positionTemp = new
ConcurrentHashMap<>();
- ConcurrentHashMap<String, Long> position =
jobTaskPositionMap.putIfAbsent(jobInstanceId, positionTemp);
- if (position == null) {
- JobProfile jobProfile = jobConfDb.getJobById(jobInstanceId);
- positionTemp.put(sourcePath, jobProfile.getLong(sourcePath +
POSITION_SUFFIX, 0));
- position = positionTemp;
- }
-
- if (!reset) {
- Long beforePosition = position.getOrDefault(sourcePath, 0L);
- position.put(sourcePath, beforePosition + size);
- } else {
- position.put(sourcePath, size);
- }
- }
-
- public long getPosition(String sourcePath, String jobInstanceId) {
- JobProfile jobProfile = jobConfDb.getJobById(jobInstanceId);
- if (jobProfile == null) {
- LOGGER.error("getPosition but jobProfile not exist! sourcePath {}
jobInstanceId {} return position 0",
- sourcePath,
- jobInstanceId);
- return 0;
- }
-
- return jobProfile.getLong(sourcePath + POSITION_SUFFIX, 0);
- }
-}
diff --git a/inlong-agent/agent-plugins/src/main/java/org/apache/inlong/agent/plugin/sinks/KafkaSink.java b/inlong-agent/agent-plugins/src/main/java/org/apache/inlong/agent/plugin/sinks/KafkaSink.java
index 1e9029e334..722ccd6a9e 100644
--- a/inlong-agent/agent-plugins/src/main/java/org/apache/inlong/agent/plugin/sinks/KafkaSink.java
+++ b/inlong-agent/agent-plugins/src/main/java/org/apache/inlong/agent/plugin/sinks/KafkaSink.java
@@ -20,7 +20,6 @@ package org.apache.inlong.agent.plugin.sinks;
import org.apache.inlong.agent.common.AgentThreadFactory;
import org.apache.inlong.agent.conf.AgentConfiguration;
import org.apache.inlong.agent.conf.JobProfile;
-import org.apache.inlong.agent.core.task.PositionManager;
import org.apache.inlong.agent.message.BatchProxyMessage;
import org.apache.inlong.agent.message.EndMessage;
import org.apache.inlong.agent.message.PackProxyMessage;
@@ -76,7 +75,6 @@ public class KafkaSink extends AbstractSink {
private static final ExecutorService EXECUTOR_SERVICE = new
ThreadPoolExecutor(0, Integer.MAX_VALUE,
60L, TimeUnit.SECONDS, new SynchronousQueue<>(), new
AgentThreadFactory("KafkaSink"));
private final AgentConfiguration agentConf =
AgentConfiguration.getAgentConf();
- private PositionManager taskPositionManager;
private volatile boolean shutdown = false;
private List<MQClusterInfo> mqClusterInfos;
@@ -92,7 +90,6 @@ public class KafkaSink extends AbstractSink {
@Override
public void init(JobProfile jobConf) {
super.init(jobConf);
- taskPositionManager = PositionManager.getInstance();
int sendQueueSize = agentConf.getInt(KAFKA_SINK_SEND_QUEUE_SIZE,
DEFAULT_SEND_QUEUE_SIZE);
kafkaSendQueue = new LinkedBlockingQueue<>(sendQueueSize);
producerNum = agentConf.getInt(KAFKA_SINK_PRODUCER_NUM,
DEFAULT_PRODUCER_NUM);
diff --git a/inlong-agent/agent-plugins/src/main/java/org/apache/inlong/agent/plugin/sinks/PulsarSink.java b/inlong-agent/agent-plugins/src/main/java/org/apache/inlong/agent/plugin/sinks/PulsarSink.java
index 348cd1c4b7..1326f914a5 100644
--- a/inlong-agent/agent-plugins/src/main/java/org/apache/inlong/agent/plugin/sinks/PulsarSink.java
+++ b/inlong-agent/agent-plugins/src/main/java/org/apache/inlong/agent/plugin/sinks/PulsarSink.java
@@ -20,7 +20,6 @@ package org.apache.inlong.agent.plugin.sinks;
import org.apache.inlong.agent.common.AgentThreadFactory;
import org.apache.inlong.agent.conf.AgentConfiguration;
import org.apache.inlong.agent.conf.JobProfile;
-import org.apache.inlong.agent.core.task.PositionManager;
import org.apache.inlong.agent.message.BatchProxyMessage;
import org.apache.inlong.agent.message.EndMessage;
import org.apache.inlong.agent.message.PackProxyMessage;
@@ -91,7 +90,6 @@ public class PulsarSink extends AbstractSink {
private static final ExecutorService EXECUTOR_SERVICE = new
ThreadPoolExecutor(0, Integer.MAX_VALUE,
60L, TimeUnit.SECONDS, new SynchronousQueue<>(), new
AgentThreadFactory("PulsarSink"));
private final AgentConfiguration agentConf =
AgentConfiguration.getAgentConf();
- private PositionManager positionManager;
private volatile boolean shutdown = false;
private List<MQClusterInfo> mqClusterInfos;
private String topic;
@@ -119,7 +117,6 @@ public class PulsarSink extends AbstractSink {
@Override
public void init(JobProfile jobConf) {
super.init(jobConf);
- positionManager = PositionManager.getInstance();
// agentConf
sendQueueSize = agentConf.getInt(PULSAR_SINK_SEND_QUEUE_SIZE,
DEFAULT_SEND_QUEUE_SIZE);
sendQueueSemaphore = new Semaphore(sendQueueSize);
diff --git a/inlong-agent/agent-plugins/src/main/java/org/apache/inlong/agent/plugin/sinks/SenderManager.java b/inlong-agent/agent-plugins/src/main/java/org/apache/inlong/agent/plugin/sinks/SenderManager.java
deleted file mode 100755
index ac0b19fcff..0000000000
--- a/inlong-agent/agent-plugins/src/main/java/org/apache/inlong/agent/plugin/sinks/SenderManager.java
+++ /dev/null
@@ -1,327 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.inlong.agent.plugin.sinks;
-
-import org.apache.inlong.agent.common.AgentThreadFactory;
-import org.apache.inlong.agent.conf.AgentConfiguration;
-import org.apache.inlong.agent.conf.JobProfile;
-import org.apache.inlong.agent.constant.CommonConstants;
-import org.apache.inlong.agent.core.task.PositionManager;
-import org.apache.inlong.agent.message.BatchProxyMessage;
-import org.apache.inlong.agent.metrics.AgentMetricItem;
-import org.apache.inlong.agent.metrics.AgentMetricItemSet;
-import org.apache.inlong.agent.plugin.message.SequentialID;
-import org.apache.inlong.agent.utils.AgentUtils;
-import org.apache.inlong.agent.utils.ThreadUtils;
-import org.apache.inlong.common.constant.ProtocolType;
-import org.apache.inlong.common.metric.MetricRegister;
-import org.apache.inlong.sdk.dataproxy.DefaultMessageSender;
-import org.apache.inlong.sdk.dataproxy.ProxyClientConfig;
-import org.apache.inlong.sdk.dataproxy.common.SendMessageCallback;
-import org.apache.inlong.sdk.dataproxy.common.SendResult;
-
-import io.netty.util.concurrent.DefaultThreadFactory;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
-import java.util.HashMap;
-import java.util.Map;
-import java.util.concurrent.ExecutorService;
-import java.util.concurrent.LinkedBlockingQueue;
-import java.util.concurrent.ThreadFactory;
-import java.util.concurrent.ThreadPoolExecutor;
-import java.util.concurrent.TimeUnit;
-import java.util.concurrent.atomic.AtomicInteger;
-import java.util.concurrent.atomic.AtomicLong;
-
-import static
org.apache.inlong.agent.constant.CommonConstants.DEFAULT_PROXY_BATCH_FLUSH_INTERVAL;
-import static
org.apache.inlong.agent.constant.CommonConstants.PROXY_BATCH_FLUSH_INTERVAL;
-import static
org.apache.inlong.agent.constant.FetcherConstants.AGENT_MANAGER_ADDR;
-import static
org.apache.inlong.agent.constant.FetcherConstants.AGENT_MANAGER_AUTH_SECRET_ID;
-import static
org.apache.inlong.agent.constant.FetcherConstants.AGENT_MANAGER_AUTH_SECRET_KEY;
-import static
org.apache.inlong.agent.constant.JobConstants.DEFAULT_JOB_PROXY_SEND;
-import static org.apache.inlong.agent.constant.JobConstants.JOB_PROXY_SEND;
-import static
org.apache.inlong.agent.metrics.AgentMetricItem.KEY_INLONG_GROUP_ID;
-import static
org.apache.inlong.agent.metrics.AgentMetricItem.KEY_INLONG_STREAM_ID;
-import static org.apache.inlong.agent.metrics.AgentMetricItem.KEY_PLUGIN_ID;
-
-/**
- * proxy client
- */
-public class SenderManager {
-
- private static final Logger LOGGER =
LoggerFactory.getLogger(SenderManager.class);
- private static final SequentialID SEQUENTIAL_ID =
SequentialID.getInstance();
- private final AtomicInteger SENDER_INDEX = new AtomicInteger(0);
- // cache for group and sender list, share the map cross agent lifecycle.
- private DefaultMessageSender sender;
- private LinkedBlockingQueue<AgentSenderCallback> resendQueue;
- private final ExecutorService resendExecutorService = new
ThreadPoolExecutor(1, 1,
- 0L, TimeUnit.MILLISECONDS,
- new LinkedBlockingQueue<>(), new
AgentThreadFactory("SendManager-Resend"));
- // sharing worker threads between sender client
- // in case of thread abusing.
- private ThreadFactory SHARED_FACTORY;
- private static final AtomicLong METRIC_INDEX = new AtomicLong(0);
- private final String managerAddr;
- private final int totalAsyncBufSize;
- private final int aliveConnectionNum;
- private final boolean isCompress;
- private final int msgType;
- private final boolean isFile;
- private final long maxSenderTimeout;
- private final int maxSenderRetry;
- private final long retrySleepTime;
- private final String inlongGroupId;
- private final int maxSenderPerGroup;
- private final String sourcePath;
- private final boolean proxySend;
- private volatile boolean shutdown = false;
- // metric
- private AgentMetricItemSet metricItemSet;
- private Map<String, String> dimensions;
- private PositionManager positionManager;
- private int ioThreadNum;
- private boolean enableBusyWait;
- private String authSecretId;
- private String authSecretKey;
- protected int batchFlushInterval;
-
- public SenderManager(JobProfile jobConf, String inlongGroupId, String
sourcePath) {
- AgentConfiguration conf = AgentConfiguration.getAgentConf();
- managerAddr = conf.get(AGENT_MANAGER_ADDR);
- proxySend = jobConf.getBoolean(JOB_PROXY_SEND, DEFAULT_JOB_PROXY_SEND);
- totalAsyncBufSize = jobConf
- .getInt(
- CommonConstants.PROXY_TOTAL_ASYNC_PROXY_SIZE,
- CommonConstants.DEFAULT_PROXY_TOTAL_ASYNC_PROXY_SIZE);
- aliveConnectionNum = jobConf
- .getInt(
- CommonConstants.PROXY_ALIVE_CONNECTION_NUM,
CommonConstants.DEFAULT_PROXY_ALIVE_CONNECTION_NUM);
- isCompress = jobConf.getBoolean(
- CommonConstants.PROXY_IS_COMPRESS,
CommonConstants.DEFAULT_PROXY_IS_COMPRESS);
- maxSenderPerGroup = jobConf.getInt(
- CommonConstants.PROXY_MAX_SENDER_PER_GROUP,
CommonConstants.DEFAULT_PROXY_MAX_SENDER_PER_GROUP);
- msgType = jobConf.getInt(CommonConstants.PROXY_MSG_TYPE,
CommonConstants.DEFAULT_PROXY_MSG_TYPE);
- maxSenderTimeout = jobConf.getInt(
- CommonConstants.PROXY_SENDER_MAX_TIMEOUT,
CommonConstants.DEFAULT_PROXY_SENDER_MAX_TIMEOUT);
- maxSenderRetry = jobConf.getInt(
- CommonConstants.PROXY_SENDER_MAX_RETRY,
CommonConstants.DEFAULT_PROXY_SENDER_MAX_RETRY);
- retrySleepTime = jobConf.getLong(
- CommonConstants.PROXY_RETRY_SLEEP,
CommonConstants.DEFAULT_PROXY_RETRY_SLEEP);
- isFile = jobConf.getBoolean(CommonConstants.PROXY_IS_FILE,
CommonConstants.DEFAULT_IS_FILE);
- positionManager = PositionManager.getInstance();
- ioThreadNum =
jobConf.getInt(CommonConstants.PROXY_CLIENT_IO_THREAD_NUM,
- CommonConstants.DEFAULT_PROXY_CLIENT_IO_THREAD_NUM);
- enableBusyWait =
jobConf.getBoolean(CommonConstants.PROXY_CLIENT_ENABLE_BUSY_WAIT,
- CommonConstants.DEFAULT_PROXY_CLIENT_ENABLE_BUSY_WAIT);
- batchFlushInterval = jobConf.getInt(PROXY_BATCH_FLUSH_INTERVAL,
DEFAULT_PROXY_BATCH_FLUSH_INTERVAL);
- authSecretId = conf.get(AGENT_MANAGER_AUTH_SECRET_ID);
- authSecretKey = conf.get(AGENT_MANAGER_AUTH_SECRET_KEY);
-
- this.sourcePath = sourcePath;
- this.inlongGroupId = inlongGroupId;
-
- this.dimensions = new HashMap<>();
- dimensions.put(KEY_PLUGIN_ID, this.getClass().getSimpleName());
- String metricName = String.join("-", this.getClass().getSimpleName(),
- String.valueOf(METRIC_INDEX.incrementAndGet()));
- this.metricItemSet = new AgentMetricItemSet(metricName);
- MetricRegister.register(metricItemSet);
- resendQueue = new LinkedBlockingQueue<>();
- }
-
- public void Start() throws Exception {
- sender = createMessageSender(inlongGroupId);
- resendExecutorService.execute(flushResendQueue());
- }
-
- public void Stop() {
- shutdown = true;
- resendExecutorService.shutdown();
- sender.close();
- cleanResendQueue();
- }
-
- private void cleanResendQueue() {
- while (!resendQueue.isEmpty()) {
- try {
- AgentSenderCallback callback = resendQueue.poll(1,
TimeUnit.SECONDS);
- } catch (InterruptedException e) {
- LOGGER.error("clean resend queue error{}", e.getMessage());
- }
- }
- }
-
- private AgentMetricItem getMetricItem(Map<String, String> otherDimensions)
{
- Map<String, String> dimensions = new HashMap<>();
- dimensions.put(KEY_PLUGIN_ID, this.getClass().getSimpleName());
- dimensions.putAll(otherDimensions);
- return this.metricItemSet.findMetricItem(dimensions);
- }
-
- private AgentMetricItem getMetricItem(String groupId, String streamId) {
- Map<String, String> dims = new HashMap<>();
- dims.put(KEY_INLONG_GROUP_ID, groupId);
- dims.put(KEY_INLONG_STREAM_ID, streamId);
- return getMetricItem(dims);
- }
-
- /**
- * sender
- *
- * @param tagName group id
- * @return DefaultMessageSender
- */
- private DefaultMessageSender createMessageSender(String tagName) throws
Exception {
- ProxyClientConfig proxyClientConfig = new
ProxyClientConfig(managerAddr, inlongGroupId, authSecretId,
- authSecretKey);
- proxyClientConfig.setTotalAsyncCallbackSize(totalAsyncBufSize);
- proxyClientConfig.setFile(isFile);
- proxyClientConfig.setAliveConnections(aliveConnectionNum);
-
- proxyClientConfig.setIoThreadNum(ioThreadNum);
- proxyClientConfig.setEnableBusyWait(enableBusyWait);
- proxyClientConfig.setProtocolType(ProtocolType.TCP);
-
- SHARED_FACTORY = new DefaultThreadFactory("agent-client-" + sourcePath,
- Thread.currentThread().isDaemon());
-
- DefaultMessageSender sender = new
DefaultMessageSender(proxyClientConfig, SHARED_FACTORY);
- sender.setMsgtype(msgType);
- sender.setCompress(isCompress);
- return sender;
- }
-
- public void sendBatch(BatchProxyMessage batchMessage) {
- sendBatchWithRetryCount(batchMessage, 0);
- }
-
- /**
- * Send message to proxy by batch, use message cache.
- */
- private void sendBatchWithRetryCount(BatchProxyMessage batchMessage, int
retry) {
- boolean suc = false;
- while (!suc) {
- try {
- if (!resendQueue.isEmpty()) {
- AgentUtils.silenceSleepInMs(retrySleepTime);
- continue;
- }
- sender.asyncSendMessage(new AgentSenderCallback(batchMessage,
retry),
- batchMessage.getDataList(), batchMessage.getGroupId(),
batchMessage.getStreamId(),
- batchMessage.getDataTime(),
SEQUENTIAL_ID.getNextUuid(), maxSenderTimeout, TimeUnit.SECONDS,
- batchMessage.getExtraMap(), proxySend);
- getMetricItem(batchMessage.getGroupId(),
batchMessage.getStreamId()).pluginSendCount.addAndGet(
- batchMessage.getMsgCnt());
- suc = true;
- } catch (Exception exception) {
- suc = false;
- if (retry > maxSenderRetry) {
- if (retry % 10 == 0) {
- LOGGER.error("max retry reached, sample log Exception
caught", exception);
- }
- } else {
- LOGGER.error("Exception caught", exception);
- }
- retry++;
- AgentUtils.silenceSleepInMs(retrySleepTime);
- }
- }
- }
-
- /**
- * flushResendQueue
- *
- * @return thread runner
- */
- private Runnable flushResendQueue() {
- return () -> {
- LOGGER.info("start flush resend queue {}:{}", inlongGroupId,
sourcePath);
- while (!shutdown) {
- try {
- AgentSenderCallback callback = resendQueue.poll(1,
TimeUnit.SECONDS);
- if (callback != null) {
- sendBatchWithRetryCount(callback.batchMessage,
callback.retry + 1);
- }
- } catch (Exception ex) {
- LOGGER.error("error caught", ex);
- } catch (Throwable t) {
- ThreadUtils.threadThrowableHandler(Thread.currentThread(),
t);
- } finally {
- AgentUtils.silenceSleepInMs(batchFlushInterval);
- }
- }
- LOGGER.info("stop flush resend queue {}:{}", inlongGroupId,
sourcePath);
- };
- }
-
- /**
- * put the data into resend queue and will be resent later.
- *
- * @param batchMessageCallBack
- */
- private void putInResendQueue(AgentSenderCallback batchMessageCallBack) {
- try {
- resendQueue.put(batchMessageCallBack);
- } catch (Throwable throwable) {
- LOGGER.error("putInResendQueue e = {}", throwable);
- }
- }
-
- /**
- * sender callback
- */
- private class AgentSenderCallback implements SendMessageCallback {
-
- private final int retry;
- private final BatchProxyMessage batchMessage;
- private final int msgCnt;
-
- AgentSenderCallback(BatchProxyMessage batchMessage, int retry) {
- this.batchMessage = batchMessage;
- this.retry = retry;
- this.msgCnt = batchMessage.getDataList().size();
- }
-
- @Override
- public void onMessageAck(SendResult result) {
- String groupId = batchMessage.getGroupId();
- String streamId = batchMessage.getStreamId();
- String jobId = batchMessage.getJobId();
- long dataTime = batchMessage.getDataTime();
- if (result != null && result.equals(SendResult.OK)) {
- getMetricItem(groupId,
streamId).pluginSendSuccessCount.addAndGet(msgCnt);
- PositionManager.getInstance()
- .updateSinkPosition(batchMessage.getJobId(),
sourcePath, msgCnt, false);
- } else {
- LOGGER.warn("send groupId {}, streamId {}, jobId {}, dataTime
{} fail with times {}, "
- + "error {}", groupId, streamId, jobId, dataTime,
retry, result);
- getMetricItem(groupId,
streamId).pluginSendFailCount.addAndGet(msgCnt);
- putInResendQueue(new AgentSenderCallback(batchMessage, retry));
- }
- }
-
- @Override
- public void onException(Throwable e) {
- getMetricItem(batchMessage.getGroupId(),
batchMessage.getStreamId()).pluginSendFailCount.addAndGet(msgCnt);
- LOGGER.error("exception caught", e);
- }
- }
-
-}