This is an automated email from the ASF dual-hosted git repository.

luchunliang pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/inlong.git


The following commit(s) were added to refs/heads/master by this push:
     new 8d891797c9 [INLONG-9590][Agent] Delete positionManager, there is already an offsetManager (#9592)
8d891797c9 is described below

commit 8d891797c97a89606660cdeaae63fae9eafeab23
Author: justinwwhuang <[email protected]>
AuthorDate: Thu Jan 18 16:50:40 2024 +0800

    [INLONG-9590][Agent] Delete positionManager, there is already an offsetManager (#9592)
    
    * [INLONG-9590][Agent] Delete positionManager, there is already an offsetManager
    
    * [INLONG-9590][Agent] Delete senderManager
---
 .../inlong/agent/core/task/PositionManager.java    | 170 -----------
 .../inlong/agent/plugin/sinks/KafkaSink.java       |   3 -
 .../inlong/agent/plugin/sinks/PulsarSink.java      |   3 -
 .../inlong/agent/plugin/sinks/SenderManager.java   | 327 ---------------------
 4 files changed, 503 deletions(-)

diff --git a/inlong-agent/agent-core/src/main/java/org/apache/inlong/agent/core/task/PositionManager.java b/inlong-agent/agent-core/src/main/java/org/apache/inlong/agent/core/task/PositionManager.java
deleted file mode 100644
index 08d585ac84..0000000000
--- a/inlong-agent/agent-core/src/main/java/org/apache/inlong/agent/core/task/PositionManager.java
+++ /dev/null
@@ -1,170 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.inlong.agent.core.task;
-
-import org.apache.inlong.agent.common.AbstractDaemon;
-import org.apache.inlong.agent.conf.AgentConfiguration;
-import org.apache.inlong.agent.conf.JobProfile;
-import org.apache.inlong.agent.core.AgentManager;
-import org.apache.inlong.agent.db.JobProfileDb;
-import org.apache.inlong.agent.utils.AgentUtils;
-import org.apache.inlong.agent.utils.ThreadUtils;
-
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
-import java.util.concurrent.ConcurrentHashMap;
-
-import static org.apache.inlong.agent.constant.CommonConstants.POSITION_SUFFIX;
-import static 
org.apache.inlong.agent.constant.FetcherConstants.AGENT_HEARTBEAT_INTERVAL;
-import static 
org.apache.inlong.agent.constant.FetcherConstants.DEFAULT_AGENT_FETCHER_INTERVAL;
-
-/**
- * used to store task position to db, task position is stored as properties in 
JobProfile.
- * where key is task read file name and value is task sink position
- * note that this class is generated
- */
-public class PositionManager extends AbstractDaemon {
-
-    private static final Logger LOGGER = 
LoggerFactory.getLogger(PositionManager.class);
-    private static volatile PositionManager positionManager = null;
-    private final AgentManager agentManager;
-    private JobProfileDb jobConfDb;
-    private final AgentConfiguration conf;
-    private ConcurrentHashMap<String, ConcurrentHashMap<String, Long>> 
jobTaskPositionMap;
-
-    private PositionManager(AgentManager agentManager) {
-        this.conf = AgentConfiguration.getAgentConf();
-        this.agentManager = agentManager;
-        this.jobTaskPositionMap = new ConcurrentHashMap<>();
-    }
-
-    /**
-     * task position manager singleton, can only generated by agent manager
-     */
-    public static PositionManager getInstance(AgentManager agentManager) {
-        if (positionManager == null) {
-            synchronized (PositionManager.class) {
-                if (positionManager == null) {
-                    positionManager = new PositionManager(agentManager);
-                }
-            }
-        }
-        return positionManager;
-    }
-
-    /**
-     * get taskPositionManager singleton
-     */
-    public static PositionManager getInstance() {
-        if (positionManager == null) {
-            throw new RuntimeException("task position manager has not been 
initialized by agentManager");
-        }
-        return positionManager;
-    }
-
-    @Override
-    public void start() throws Exception {
-        submitWorker(taskPositionFlushThread());
-    }
-
-    private Runnable taskPositionFlushThread() {
-        return () -> {
-            while (isRunnable()) {
-                doFlush();
-            }
-        };
-    }
-
-    private void doFlush() {
-        try {
-            // check pending jobs and try to submit again.
-            for (String jobId : jobTaskPositionMap.keySet()) {
-                JobProfile jobProfile = jobConfDb.getJobById(jobId);
-                if (jobProfile == null) {
-                    LOGGER.warn("jobProfile {} cannot be found in db, "
-                            + "might be deleted by standalone mode, now delete 
job position in memory", jobId);
-                    deleteJobPosition(jobId);
-                    continue;
-                }
-                flushJobProfile(jobId, jobProfile);
-            }
-        } catch (Throwable ex) {
-            LOGGER.error("error caught", ex);
-            ThreadUtils.threadThrowableHandler(Thread.currentThread(), ex);
-        } finally {
-            int flushTime = conf.getInt(AGENT_HEARTBEAT_INTERVAL,
-                    DEFAULT_AGENT_FETCHER_INTERVAL);
-            AgentUtils.silenceSleepInSeconds(flushTime);
-        }
-    }
-
-    private void flushJobProfile(String jobId, JobProfile jobProfile) {
-        jobTaskPositionMap.get(jobId).forEach(
-                (fileName, position) -> jobProfile.setLong(fileName + 
POSITION_SUFFIX, position));
-        if (jobConfDb.checkJobfinished(jobProfile)) {
-            LOGGER.info("Cannot update job profile {}, delete memory job in 
jobTaskPosition", jobId);
-            deleteJobPosition(jobId);
-        } else {
-            jobConfDb.updateJobProfile(jobProfile);
-        }
-    }
-
-    private void deleteJobPosition(String jobId) {
-        jobTaskPositionMap.remove(jobId);
-    }
-
-    @Override
-    public void stop() throws Exception {
-        waitForTerminate();
-    }
-
-    /**
-     * update job sink position
-     *
-     * @param size add this size to beforePosition
-     */
-    public void updateSinkPosition(String jobInstanceId, String sourcePath, 
long size, boolean reset) {
-        ConcurrentHashMap<String, Long> positionTemp = new 
ConcurrentHashMap<>();
-        ConcurrentHashMap<String, Long> position = 
jobTaskPositionMap.putIfAbsent(jobInstanceId, positionTemp);
-        if (position == null) {
-            JobProfile jobProfile = jobConfDb.getJobById(jobInstanceId);
-            positionTemp.put(sourcePath, jobProfile.getLong(sourcePath + 
POSITION_SUFFIX, 0));
-            position = positionTemp;
-        }
-
-        if (!reset) {
-            Long beforePosition = position.getOrDefault(sourcePath, 0L);
-            position.put(sourcePath, beforePosition + size);
-        } else {
-            position.put(sourcePath, size);
-        }
-    }
-
-    public long getPosition(String sourcePath, String jobInstanceId) {
-        JobProfile jobProfile = jobConfDb.getJobById(jobInstanceId);
-        if (jobProfile == null) {
-            LOGGER.error("getPosition but jobProfile not exist! sourcePath {} 
jobInstanceId {} return position 0",
-                    sourcePath,
-                    jobInstanceId);
-            return 0;
-        }
-
-        return jobProfile.getLong(sourcePath + POSITION_SUFFIX, 0);
-    }
-}
diff --git a/inlong-agent/agent-plugins/src/main/java/org/apache/inlong/agent/plugin/sinks/KafkaSink.java b/inlong-agent/agent-plugins/src/main/java/org/apache/inlong/agent/plugin/sinks/KafkaSink.java
index 1e9029e334..722ccd6a9e 100644
--- a/inlong-agent/agent-plugins/src/main/java/org/apache/inlong/agent/plugin/sinks/KafkaSink.java
+++ b/inlong-agent/agent-plugins/src/main/java/org/apache/inlong/agent/plugin/sinks/KafkaSink.java
@@ -20,7 +20,6 @@ package org.apache.inlong.agent.plugin.sinks;
 import org.apache.inlong.agent.common.AgentThreadFactory;
 import org.apache.inlong.agent.conf.AgentConfiguration;
 import org.apache.inlong.agent.conf.JobProfile;
-import org.apache.inlong.agent.core.task.PositionManager;
 import org.apache.inlong.agent.message.BatchProxyMessage;
 import org.apache.inlong.agent.message.EndMessage;
 import org.apache.inlong.agent.message.PackProxyMessage;
@@ -76,7 +75,6 @@ public class KafkaSink extends AbstractSink {
     private static final ExecutorService EXECUTOR_SERVICE = new 
ThreadPoolExecutor(0, Integer.MAX_VALUE,
             60L, TimeUnit.SECONDS, new SynchronousQueue<>(), new 
AgentThreadFactory("KafkaSink"));
     private final AgentConfiguration agentConf = 
AgentConfiguration.getAgentConf();
-    private PositionManager taskPositionManager;
     private volatile boolean shutdown = false;
 
     private List<MQClusterInfo> mqClusterInfos;
@@ -92,7 +90,6 @@ public class KafkaSink extends AbstractSink {
     @Override
     public void init(JobProfile jobConf) {
         super.init(jobConf);
-        taskPositionManager = PositionManager.getInstance();
         int sendQueueSize = agentConf.getInt(KAFKA_SINK_SEND_QUEUE_SIZE, 
DEFAULT_SEND_QUEUE_SIZE);
         kafkaSendQueue = new LinkedBlockingQueue<>(sendQueueSize);
         producerNum = agentConf.getInt(KAFKA_SINK_PRODUCER_NUM, 
DEFAULT_PRODUCER_NUM);
diff --git a/inlong-agent/agent-plugins/src/main/java/org/apache/inlong/agent/plugin/sinks/PulsarSink.java b/inlong-agent/agent-plugins/src/main/java/org/apache/inlong/agent/plugin/sinks/PulsarSink.java
index 348cd1c4b7..1326f914a5 100644
--- a/inlong-agent/agent-plugins/src/main/java/org/apache/inlong/agent/plugin/sinks/PulsarSink.java
+++ b/inlong-agent/agent-plugins/src/main/java/org/apache/inlong/agent/plugin/sinks/PulsarSink.java
@@ -20,7 +20,6 @@ package org.apache.inlong.agent.plugin.sinks;
 import org.apache.inlong.agent.common.AgentThreadFactory;
 import org.apache.inlong.agent.conf.AgentConfiguration;
 import org.apache.inlong.agent.conf.JobProfile;
-import org.apache.inlong.agent.core.task.PositionManager;
 import org.apache.inlong.agent.message.BatchProxyMessage;
 import org.apache.inlong.agent.message.EndMessage;
 import org.apache.inlong.agent.message.PackProxyMessage;
@@ -91,7 +90,6 @@ public class PulsarSink extends AbstractSink {
     private static final ExecutorService EXECUTOR_SERVICE = new 
ThreadPoolExecutor(0, Integer.MAX_VALUE,
             60L, TimeUnit.SECONDS, new SynchronousQueue<>(), new 
AgentThreadFactory("PulsarSink"));
     private final AgentConfiguration agentConf = 
AgentConfiguration.getAgentConf();
-    private PositionManager positionManager;
     private volatile boolean shutdown = false;
     private List<MQClusterInfo> mqClusterInfos;
     private String topic;
@@ -119,7 +117,6 @@ public class PulsarSink extends AbstractSink {
     @Override
     public void init(JobProfile jobConf) {
         super.init(jobConf);
-        positionManager = PositionManager.getInstance();
         // agentConf
         sendQueueSize = agentConf.getInt(PULSAR_SINK_SEND_QUEUE_SIZE, 
DEFAULT_SEND_QUEUE_SIZE);
         sendQueueSemaphore = new Semaphore(sendQueueSize);
diff --git a/inlong-agent/agent-plugins/src/main/java/org/apache/inlong/agent/plugin/sinks/SenderManager.java b/inlong-agent/agent-plugins/src/main/java/org/apache/inlong/agent/plugin/sinks/SenderManager.java
deleted file mode 100755
index ac0b19fcff..0000000000
--- a/inlong-agent/agent-plugins/src/main/java/org/apache/inlong/agent/plugin/sinks/SenderManager.java
+++ /dev/null
@@ -1,327 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.inlong.agent.plugin.sinks;
-
-import org.apache.inlong.agent.common.AgentThreadFactory;
-import org.apache.inlong.agent.conf.AgentConfiguration;
-import org.apache.inlong.agent.conf.JobProfile;
-import org.apache.inlong.agent.constant.CommonConstants;
-import org.apache.inlong.agent.core.task.PositionManager;
-import org.apache.inlong.agent.message.BatchProxyMessage;
-import org.apache.inlong.agent.metrics.AgentMetricItem;
-import org.apache.inlong.agent.metrics.AgentMetricItemSet;
-import org.apache.inlong.agent.plugin.message.SequentialID;
-import org.apache.inlong.agent.utils.AgentUtils;
-import org.apache.inlong.agent.utils.ThreadUtils;
-import org.apache.inlong.common.constant.ProtocolType;
-import org.apache.inlong.common.metric.MetricRegister;
-import org.apache.inlong.sdk.dataproxy.DefaultMessageSender;
-import org.apache.inlong.sdk.dataproxy.ProxyClientConfig;
-import org.apache.inlong.sdk.dataproxy.common.SendMessageCallback;
-import org.apache.inlong.sdk.dataproxy.common.SendResult;
-
-import io.netty.util.concurrent.DefaultThreadFactory;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
-import java.util.HashMap;
-import java.util.Map;
-import java.util.concurrent.ExecutorService;
-import java.util.concurrent.LinkedBlockingQueue;
-import java.util.concurrent.ThreadFactory;
-import java.util.concurrent.ThreadPoolExecutor;
-import java.util.concurrent.TimeUnit;
-import java.util.concurrent.atomic.AtomicInteger;
-import java.util.concurrent.atomic.AtomicLong;
-
-import static 
org.apache.inlong.agent.constant.CommonConstants.DEFAULT_PROXY_BATCH_FLUSH_INTERVAL;
-import static 
org.apache.inlong.agent.constant.CommonConstants.PROXY_BATCH_FLUSH_INTERVAL;
-import static 
org.apache.inlong.agent.constant.FetcherConstants.AGENT_MANAGER_ADDR;
-import static 
org.apache.inlong.agent.constant.FetcherConstants.AGENT_MANAGER_AUTH_SECRET_ID;
-import static 
org.apache.inlong.agent.constant.FetcherConstants.AGENT_MANAGER_AUTH_SECRET_KEY;
-import static 
org.apache.inlong.agent.constant.JobConstants.DEFAULT_JOB_PROXY_SEND;
-import static org.apache.inlong.agent.constant.JobConstants.JOB_PROXY_SEND;
-import static 
org.apache.inlong.agent.metrics.AgentMetricItem.KEY_INLONG_GROUP_ID;
-import static 
org.apache.inlong.agent.metrics.AgentMetricItem.KEY_INLONG_STREAM_ID;
-import static org.apache.inlong.agent.metrics.AgentMetricItem.KEY_PLUGIN_ID;
-
-/**
- * proxy client
- */
-public class SenderManager {
-
-    private static final Logger LOGGER = 
LoggerFactory.getLogger(SenderManager.class);
-    private static final SequentialID SEQUENTIAL_ID = 
SequentialID.getInstance();
-    private final AtomicInteger SENDER_INDEX = new AtomicInteger(0);
-    // cache for group and sender list, share the map cross agent lifecycle.
-    private DefaultMessageSender sender;
-    private LinkedBlockingQueue<AgentSenderCallback> resendQueue;
-    private final ExecutorService resendExecutorService = new 
ThreadPoolExecutor(1, 1,
-            0L, TimeUnit.MILLISECONDS,
-            new LinkedBlockingQueue<>(), new 
AgentThreadFactory("SendManager-Resend"));
-    // sharing worker threads between sender client
-    // in case of thread abusing.
-    private ThreadFactory SHARED_FACTORY;
-    private static final AtomicLong METRIC_INDEX = new AtomicLong(0);
-    private final String managerAddr;
-    private final int totalAsyncBufSize;
-    private final int aliveConnectionNum;
-    private final boolean isCompress;
-    private final int msgType;
-    private final boolean isFile;
-    private final long maxSenderTimeout;
-    private final int maxSenderRetry;
-    private final long retrySleepTime;
-    private final String inlongGroupId;
-    private final int maxSenderPerGroup;
-    private final String sourcePath;
-    private final boolean proxySend;
-    private volatile boolean shutdown = false;
-    // metric
-    private AgentMetricItemSet metricItemSet;
-    private Map<String, String> dimensions;
-    private PositionManager positionManager;
-    private int ioThreadNum;
-    private boolean enableBusyWait;
-    private String authSecretId;
-    private String authSecretKey;
-    protected int batchFlushInterval;
-
-    public SenderManager(JobProfile jobConf, String inlongGroupId, String 
sourcePath) {
-        AgentConfiguration conf = AgentConfiguration.getAgentConf();
-        managerAddr = conf.get(AGENT_MANAGER_ADDR);
-        proxySend = jobConf.getBoolean(JOB_PROXY_SEND, DEFAULT_JOB_PROXY_SEND);
-        totalAsyncBufSize = jobConf
-                .getInt(
-                        CommonConstants.PROXY_TOTAL_ASYNC_PROXY_SIZE,
-                        CommonConstants.DEFAULT_PROXY_TOTAL_ASYNC_PROXY_SIZE);
-        aliveConnectionNum = jobConf
-                .getInt(
-                        CommonConstants.PROXY_ALIVE_CONNECTION_NUM, 
CommonConstants.DEFAULT_PROXY_ALIVE_CONNECTION_NUM);
-        isCompress = jobConf.getBoolean(
-                CommonConstants.PROXY_IS_COMPRESS, 
CommonConstants.DEFAULT_PROXY_IS_COMPRESS);
-        maxSenderPerGroup = jobConf.getInt(
-                CommonConstants.PROXY_MAX_SENDER_PER_GROUP, 
CommonConstants.DEFAULT_PROXY_MAX_SENDER_PER_GROUP);
-        msgType = jobConf.getInt(CommonConstants.PROXY_MSG_TYPE, 
CommonConstants.DEFAULT_PROXY_MSG_TYPE);
-        maxSenderTimeout = jobConf.getInt(
-                CommonConstants.PROXY_SENDER_MAX_TIMEOUT, 
CommonConstants.DEFAULT_PROXY_SENDER_MAX_TIMEOUT);
-        maxSenderRetry = jobConf.getInt(
-                CommonConstants.PROXY_SENDER_MAX_RETRY, 
CommonConstants.DEFAULT_PROXY_SENDER_MAX_RETRY);
-        retrySleepTime = jobConf.getLong(
-                CommonConstants.PROXY_RETRY_SLEEP, 
CommonConstants.DEFAULT_PROXY_RETRY_SLEEP);
-        isFile = jobConf.getBoolean(CommonConstants.PROXY_IS_FILE, 
CommonConstants.DEFAULT_IS_FILE);
-        positionManager = PositionManager.getInstance();
-        ioThreadNum = 
jobConf.getInt(CommonConstants.PROXY_CLIENT_IO_THREAD_NUM,
-                CommonConstants.DEFAULT_PROXY_CLIENT_IO_THREAD_NUM);
-        enableBusyWait = 
jobConf.getBoolean(CommonConstants.PROXY_CLIENT_ENABLE_BUSY_WAIT,
-                CommonConstants.DEFAULT_PROXY_CLIENT_ENABLE_BUSY_WAIT);
-        batchFlushInterval = jobConf.getInt(PROXY_BATCH_FLUSH_INTERVAL, 
DEFAULT_PROXY_BATCH_FLUSH_INTERVAL);
-        authSecretId = conf.get(AGENT_MANAGER_AUTH_SECRET_ID);
-        authSecretKey = conf.get(AGENT_MANAGER_AUTH_SECRET_KEY);
-
-        this.sourcePath = sourcePath;
-        this.inlongGroupId = inlongGroupId;
-
-        this.dimensions = new HashMap<>();
-        dimensions.put(KEY_PLUGIN_ID, this.getClass().getSimpleName());
-        String metricName = String.join("-", this.getClass().getSimpleName(),
-                String.valueOf(METRIC_INDEX.incrementAndGet()));
-        this.metricItemSet = new AgentMetricItemSet(metricName);
-        MetricRegister.register(metricItemSet);
-        resendQueue = new LinkedBlockingQueue<>();
-    }
-
-    public void Start() throws Exception {
-        sender = createMessageSender(inlongGroupId);
-        resendExecutorService.execute(flushResendQueue());
-    }
-
-    public void Stop() {
-        shutdown = true;
-        resendExecutorService.shutdown();
-        sender.close();
-        cleanResendQueue();
-    }
-
-    private void cleanResendQueue() {
-        while (!resendQueue.isEmpty()) {
-            try {
-                AgentSenderCallback callback = resendQueue.poll(1, 
TimeUnit.SECONDS);
-            } catch (InterruptedException e) {
-                LOGGER.error("clean resend queue error{}", e.getMessage());
-            }
-        }
-    }
-
-    private AgentMetricItem getMetricItem(Map<String, String> otherDimensions) 
{
-        Map<String, String> dimensions = new HashMap<>();
-        dimensions.put(KEY_PLUGIN_ID, this.getClass().getSimpleName());
-        dimensions.putAll(otherDimensions);
-        return this.metricItemSet.findMetricItem(dimensions);
-    }
-
-    private AgentMetricItem getMetricItem(String groupId, String streamId) {
-        Map<String, String> dims = new HashMap<>();
-        dims.put(KEY_INLONG_GROUP_ID, groupId);
-        dims.put(KEY_INLONG_STREAM_ID, streamId);
-        return getMetricItem(dims);
-    }
-
-    /**
-     * sender
-     *
-     * @param tagName group id
-     * @return DefaultMessageSender
-     */
-    private DefaultMessageSender createMessageSender(String tagName) throws 
Exception {
-        ProxyClientConfig proxyClientConfig = new 
ProxyClientConfig(managerAddr, inlongGroupId, authSecretId,
-                authSecretKey);
-        proxyClientConfig.setTotalAsyncCallbackSize(totalAsyncBufSize);
-        proxyClientConfig.setFile(isFile);
-        proxyClientConfig.setAliveConnections(aliveConnectionNum);
-
-        proxyClientConfig.setIoThreadNum(ioThreadNum);
-        proxyClientConfig.setEnableBusyWait(enableBusyWait);
-        proxyClientConfig.setProtocolType(ProtocolType.TCP);
-
-        SHARED_FACTORY = new DefaultThreadFactory("agent-client-" + sourcePath,
-                Thread.currentThread().isDaemon());
-
-        DefaultMessageSender sender = new 
DefaultMessageSender(proxyClientConfig, SHARED_FACTORY);
-        sender.setMsgtype(msgType);
-        sender.setCompress(isCompress);
-        return sender;
-    }
-
-    public void sendBatch(BatchProxyMessage batchMessage) {
-        sendBatchWithRetryCount(batchMessage, 0);
-    }
-
-    /**
-     * Send message to proxy by batch, use message cache.
-     */
-    private void sendBatchWithRetryCount(BatchProxyMessage batchMessage, int 
retry) {
-        boolean suc = false;
-        while (!suc) {
-            try {
-                if (!resendQueue.isEmpty()) {
-                    AgentUtils.silenceSleepInMs(retrySleepTime);
-                    continue;
-                }
-                sender.asyncSendMessage(new AgentSenderCallback(batchMessage, 
retry),
-                        batchMessage.getDataList(), batchMessage.getGroupId(), 
batchMessage.getStreamId(),
-                        batchMessage.getDataTime(), 
SEQUENTIAL_ID.getNextUuid(), maxSenderTimeout, TimeUnit.SECONDS,
-                        batchMessage.getExtraMap(), proxySend);
-                getMetricItem(batchMessage.getGroupId(), 
batchMessage.getStreamId()).pluginSendCount.addAndGet(
-                        batchMessage.getMsgCnt());
-                suc = true;
-            } catch (Exception exception) {
-                suc = false;
-                if (retry > maxSenderRetry) {
-                    if (retry % 10 == 0) {
-                        LOGGER.error("max retry reached, sample log Exception 
caught", exception);
-                    }
-                } else {
-                    LOGGER.error("Exception caught", exception);
-                }
-                retry++;
-                AgentUtils.silenceSleepInMs(retrySleepTime);
-            }
-        }
-    }
-
-    /**
-     * flushResendQueue
-     *
-     * @return thread runner
-     */
-    private Runnable flushResendQueue() {
-        return () -> {
-            LOGGER.info("start flush resend queue {}:{}", inlongGroupId, 
sourcePath);
-            while (!shutdown) {
-                try {
-                    AgentSenderCallback callback = resendQueue.poll(1, 
TimeUnit.SECONDS);
-                    if (callback != null) {
-                        sendBatchWithRetryCount(callback.batchMessage, 
callback.retry + 1);
-                    }
-                } catch (Exception ex) {
-                    LOGGER.error("error caught", ex);
-                } catch (Throwable t) {
-                    ThreadUtils.threadThrowableHandler(Thread.currentThread(), 
t);
-                } finally {
-                    AgentUtils.silenceSleepInMs(batchFlushInterval);
-                }
-            }
-            LOGGER.info("stop flush resend queue {}:{}", inlongGroupId, 
sourcePath);
-        };
-    }
-
-    /**
-     * put the data into resend queue and will be resent later.
-     *
-     * @param batchMessageCallBack
-     */
-    private void putInResendQueue(AgentSenderCallback batchMessageCallBack) {
-        try {
-            resendQueue.put(batchMessageCallBack);
-        } catch (Throwable throwable) {
-            LOGGER.error("putInResendQueue e = {}", throwable);
-        }
-    }
-
-    /**
-     * sender callback
-     */
-    private class AgentSenderCallback implements SendMessageCallback {
-
-        private final int retry;
-        private final BatchProxyMessage batchMessage;
-        private final int msgCnt;
-
-        AgentSenderCallback(BatchProxyMessage batchMessage, int retry) {
-            this.batchMessage = batchMessage;
-            this.retry = retry;
-            this.msgCnt = batchMessage.getDataList().size();
-        }
-
-        @Override
-        public void onMessageAck(SendResult result) {
-            String groupId = batchMessage.getGroupId();
-            String streamId = batchMessage.getStreamId();
-            String jobId = batchMessage.getJobId();
-            long dataTime = batchMessage.getDataTime();
-            if (result != null && result.equals(SendResult.OK)) {
-                getMetricItem(groupId, 
streamId).pluginSendSuccessCount.addAndGet(msgCnt);
-                PositionManager.getInstance()
-                        .updateSinkPosition(batchMessage.getJobId(), 
sourcePath, msgCnt, false);
-            } else {
-                LOGGER.warn("send groupId {}, streamId {}, jobId {}, dataTime 
{} fail with times {}, "
-                        + "error {}", groupId, streamId, jobId, dataTime, 
retry, result);
-                getMetricItem(groupId, 
streamId).pluginSendFailCount.addAndGet(msgCnt);
-                putInResendQueue(new AgentSenderCallback(batchMessage, retry));
-            }
-        }
-
-        @Override
-        public void onException(Throwable e) {
-            getMetricItem(batchMessage.getGroupId(), 
batchMessage.getStreamId()).pluginSendFailCount.addAndGet(msgCnt);
-            LOGGER.error("exception caught", e);
-        }
-    }
-
-}

Reply via email to