This is an automated email from the ASF dual-hosted git repository.
aloyszhang pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/inlong.git
The following commit(s) were added to refs/heads/master by this push:
new 82cc76e41a [INLONG-11752][Agent] Modify the default collection range
of data (#11753)
82cc76e41a is described below
commit 82cc76e41af429b316241d5dafb0e1474bd054cc
Author: justinwwhuang <[email protected]>
AuthorDate: Thu Feb 13 19:53:48 2025 +0800
[INLONG-11752][Agent] Modify the default collection range of data (#11753)
---
.../inlong/agent/constant/AgentConstants.java | 6 +++++
.../agent/core/instance/InstanceManager.java | 14 +++++-----
.../inlong/agent/core/task/OffsetManager.java | 31 +++++++++++++++++++---
.../inlong/agent/plugin/sinks/ProxySink.java | 11 ++++----
.../SenderManager.java => Sender.java} | 8 +++---
.../plugin/task/logcollection/LogAbstractTask.java | 10 ++++---
.../inlong/agent/plugin/utils/regex/Scanner.java | 7 +++--
.../inlong/agent/plugin/sinks/KafkaSinkTest.java | 6 ++---
.../inlong/agent/plugin/sinks/PulsarSinkTest.java | 6 ++---
.../{TestSenderManager.java => TestSender.java} | 21 ++++++++-------
pom.xml | 6 +++++
11 files changed, 83 insertions(+), 43 deletions(-)
diff --git
a/inlong-agent/agent-common/src/main/java/org/apache/inlong/agent/constant/AgentConstants.java
b/inlong-agent/agent-common/src/main/java/org/apache/inlong/agent/constant/AgentConstants.java
index f7c0f0d7c8..0c73c12852 100755
---
a/inlong-agent/agent-common/src/main/java/org/apache/inlong/agent/constant/AgentConstants.java
+++
b/inlong-agent/agent-common/src/main/java/org/apache/inlong/agent/constant/AgentConstants.java
@@ -76,6 +76,12 @@ public class AgentConstants {
public static final String AGENT_ENABLE_OOM_EXIT = "agent.enable.oom.exit";
public static final boolean DEFAULT_ENABLE_OOM_EXIT = false;
+ public static final String AGENT_SCAN_RANGE = "agent.scan.range";
+ public static final String DEFAULT_AGENT_SCAN_RANGE = "-2";
+ public static final String DEFAULT_AGENT_SCAN_RANGE_DAY = "-2";
+ public static final String DEFAULT_AGENT_SCAN_RANGE_HOUR = "-10";
+ public static final String DEFAULT_AGENT_SCAN_RANGE_MINUTE = "-600";
+
// pulsar sink config
public static final String PULSAR_CLIENT_IO_TREHAD_NUM =
"agent.sink.pulsar.client.io.thread.num";
public static final int DEFAULT_PULSAR_CLIENT_IO_TREHAD_NUM = Math.max(1,
diff --git
a/inlong-agent/agent-core/src/main/java/org/apache/inlong/agent/core/instance/InstanceManager.java
b/inlong-agent/agent-core/src/main/java/org/apache/inlong/agent/core/instance/InstanceManager.java
index 3f0a914e90..23c16cabe8 100644
---
a/inlong-agent/agent-core/src/main/java/org/apache/inlong/agent/core/instance/InstanceManager.java
+++
b/inlong-agent/agent-core/src/main/java/org/apache/inlong/agent/core/instance/InstanceManager.java
@@ -56,7 +56,9 @@ public class InstanceManager extends AbstractDaemon {
public volatile int CORE_THREAD_SLEEP_TIME_MS = 1000;
public static final int INSTANCE_PRINT_INTERVAL_MS = 10000;
public static final long INSTANCE_KEEP_ALIVE_MS = 5 * 60 * 1000;
+ public static final long KEEP_PACE_INTERVAL_MS = 60 * 1000;
private long lastPrintTime = 0;
+ private long lastTraverseTime = 0;
// instance in instance store
private final InstanceStore instanceStore;
private TaskStore taskStore;
@@ -67,7 +69,7 @@ public class InstanceManager extends AbstractDaemon {
private final BlockingQueue<InstanceAction> actionQueue;
private final BlockingQueue<InstanceAction> addActionQueue;
// task thread pool;
- private static final ThreadPoolExecutor EXECUTOR_SERVICE = new
ThreadPoolExecutor(
+ private final ThreadPoolExecutor EXECUTOR_SERVICE = new ThreadPoolExecutor(
0, Integer.MAX_VALUE,
1L, TimeUnit.SECONDS,
new SynchronousQueue<>(),
@@ -77,10 +79,8 @@ public class InstanceManager extends AbstractDaemon {
private final AgentConfiguration agentConf;
private final String taskId;
private long auditVersion;
- private volatile boolean runAtLeastOneTime = false;
private volatile boolean running = false;
private final double reserveCoefficient = 0.8;
- private long finishedInstanceCount = 0;
private class InstancePrintStat {
@@ -165,12 +165,16 @@ public class InstanceManager extends AbstractDaemon {
Thread.currentThread().setName("instance-manager-core-" + taskId);
running = true;
while (isRunnable()) {
+ long currentTime = AgentUtils.getCurrentTime();
try {
AgentUtils.silenceSleepInMs(CORE_THREAD_SLEEP_TIME_MS);
printInstanceState();
dealWithActionQueue();
dealWithAddActionQueue();
- keepPaceWithStore();
+ if (currentTime - lastTraverseTime >
KEEP_PACE_INTERVAL_MS) {
+ keepPaceWithStore();
+ lastTraverseTime = currentTime;
+ }
String inlongGroupId = taskFromStore.getInlongGroupId();
String inlongStreamId = taskFromStore.getInlongStreamId();
AuditUtils.add(AuditUtils.AUDIT_ID_AGENT_INSTANCE_MGR_HEARTBEAT, inlongGroupId,
inlongStreamId,
@@ -179,7 +183,6 @@ public class InstanceManager extends AbstractDaemon {
LOGGER.error("coreThread error: ", ex);
ThreadUtils.threadThrowableHandler(Thread.currentThread(),
ex);
}
- runAtLeastOneTime = true;
}
running = false;
};
@@ -356,7 +359,6 @@ public class InstanceManager extends AbstractDaemon {
deleteFromMemory(profile.getInstanceId());
LOGGER.info("finished instance state {} taskId {} instanceId {}",
profile.getState(),
profile.getTaskId(), profile.getInstanceId());
- finishedInstanceCount++;
}
private void deleteInstance(String instanceId) {
diff --git
a/inlong-agent/agent-core/src/main/java/org/apache/inlong/agent/core/task/OffsetManager.java
b/inlong-agent/agent-core/src/main/java/org/apache/inlong/agent/core/task/OffsetManager.java
index 3167b850e1..b82e399c81 100644
---
a/inlong-agent/agent-core/src/main/java/org/apache/inlong/agent/core/task/OffsetManager.java
+++
b/inlong-agent/agent-core/src/main/java/org/apache/inlong/agent/core/task/OffsetManager.java
@@ -18,9 +18,11 @@
package org.apache.inlong.agent.core.task;
import org.apache.inlong.agent.common.AbstractDaemon;
+import org.apache.inlong.agent.conf.AgentConfiguration;
import org.apache.inlong.agent.conf.InstanceProfile;
import org.apache.inlong.agent.conf.OffsetProfile;
import org.apache.inlong.agent.conf.TaskProfile;
+import org.apache.inlong.agent.constant.AgentConstants;
import org.apache.inlong.agent.constant.CycleUnitType;
import org.apache.inlong.agent.metrics.audit.AuditUtils;
import org.apache.inlong.agent.store.InstanceStore;
@@ -50,8 +52,8 @@ public class OffsetManager extends AbstractDaemon {
private static final Logger LOGGER =
LoggerFactory.getLogger(OffsetManager.class);
public static final int CORE_THREAD_SLEEP_TIME = 60 * 1000;
- public static final int CLEAN_INSTANCE_ONCE_LIMIT = 100;
- public static final String DB_INSTANCE_EXPIRE_CYCLE_COUNT = "3";
+ public static final int CLEAN_INSTANCE_ONCE_LIMIT = 1000;
+ public static final long TWO_HOUR_TIMEOUT_INTERVAL = 2 * 3600 * 1000;
private static volatile OffsetManager offsetManager = null;
private final OffsetStore offsetStore;
private final InstanceStore instanceStore;
@@ -161,8 +163,7 @@ public class OffsetManager extends AbstractDaemon {
}
}
}
- long expireTime = DateTransUtils.calcOffset(
- DB_INSTANCE_EXPIRE_CYCLE_COUNT +
instanceFromDb.getCycleUnit());
+ long expireTime =
Math.abs(getScanCycleRange(instanceFromDb.getCycleUnit())) +
TWO_HOUR_TIMEOUT_INTERVAL;
if (AgentUtils.getCurrentTime() - instanceFromDb.getModifyTime() >
expireTime) {
cleanCount.getAndIncrement();
LOGGER.info("instance has expired, delete from instance store
dataTime {} taskId {} instanceId {}",
@@ -189,4 +190,26 @@ public class OffsetManager extends AbstractDaemon {
public void stop() throws Exception {
}
+
+ public static long getScanCycleRange(String cycleUnit) {
+ if
(AgentConfiguration.getAgentConf().hasKey(AgentConstants.AGENT_SCAN_RANGE)) {
+ String range =
AgentConfiguration.getAgentConf().get(AgentConstants.AGENT_SCAN_RANGE);
+ return DateTransUtils.calcOffset(range + cycleUnit);
+ }
+ switch (cycleUnit) {
+ case AgentUtils.DAY: {
+ return
DateTransUtils.calcOffset(AgentConstants.DEFAULT_AGENT_SCAN_RANGE_DAY +
cycleUnit);
+ }
+ case AgentUtils.HOUR:
+ case AgentUtils.HOUR_LOW_CASE: {
+ return
DateTransUtils.calcOffset(AgentConstants.DEFAULT_AGENT_SCAN_RANGE_HOUR +
cycleUnit);
+ }
+ case AgentUtils.MINUTE: {
+ return
DateTransUtils.calcOffset(AgentConstants.DEFAULT_AGENT_SCAN_RANGE_MINUTE +
cycleUnit);
+ }
+ default: {
+ return
DateTransUtils.calcOffset(AgentConstants.DEFAULT_AGENT_SCAN_RANGE + cycleUnit);
+ }
+ }
+ }
}
diff --git
a/inlong-agent/agent-plugins/src/main/java/org/apache/inlong/agent/plugin/sinks/ProxySink.java
b/inlong-agent/agent-plugins/src/main/java/org/apache/inlong/agent/plugin/sinks/ProxySink.java
index 069932711c..92bfaa427f 100755
---
a/inlong-agent/agent-plugins/src/main/java/org/apache/inlong/agent/plugin/sinks/ProxySink.java
+++
b/inlong-agent/agent-plugins/src/main/java/org/apache/inlong/agent/plugin/sinks/ProxySink.java
@@ -29,7 +29,6 @@ import org.apache.inlong.agent.message.file.ProxyMessage;
import org.apache.inlong.agent.message.file.SenderMessage;
import org.apache.inlong.agent.plugin.Message;
import org.apache.inlong.agent.plugin.MessageFilter;
-import org.apache.inlong.agent.plugin.sinks.filecollect.SenderManager;
import org.apache.inlong.agent.utils.AgentUtils;
import org.apache.inlong.agent.utils.ThreadUtils;
@@ -66,7 +65,7 @@ public class ProxySink extends AbstractSink {
new SynchronousQueue<>(),
new AgentThreadFactory("proxy-sink"));
private MessageFilter messageFilter;
- private SenderManager senderManager;
+ private Sender sender;
private byte[] fieldSplitter;
private volatile boolean shutdown = false;
private volatile boolean running = false;
@@ -159,7 +158,7 @@ public class ProxySink extends AbstractSink {
if (senderMessage == null) {
continue;
}
- senderManager.sendBatch(senderMessage);
+ sender.sendBatch(senderMessage);
if (AgentUtils.getCurrentTime() - lastPrintTime >
TimeUnit.SECONDS.toMillis(1)) {
lastPrintTime = AgentUtils.getCurrentTime();
LOGGER.info("send groupId {}, streamId {}, message size {},
taskId {}, "
@@ -178,9 +177,9 @@ public class ProxySink extends AbstractSink {
StandardCharsets.UTF_8);
sourceName = profile.getInstanceId();
offsetManager = OffsetManager.getInstance();
- senderManager = new SenderManager(profile, inlongGroupId, sourceName);
+ sender = new Sender(profile, inlongGroupId, sourceName);
try {
- senderManager.Start();
+ sender.Start();
EXECUTOR_SERVICE.execute(coreThread());
EXECUTOR_SERVICE.execute(flushOffset());
inited = true;
@@ -200,7 +199,7 @@ public class ProxySink extends AbstractSink {
}
Long start = AgentUtils.getCurrentTime();
shutdown = true;
- senderManager.Stop();
+ sender.Stop();
LOGGER.info("destroy proxySink, wait for sender close {} ms instance
{}", AgentUtils.getCurrentTime() - start,
profile.getInstanceId());
start = AgentUtils.getCurrentTime();
diff --git
a/inlong-agent/agent-plugins/src/main/java/org/apache/inlong/agent/plugin/sinks/filecollect/SenderManager.java
b/inlong-agent/agent-plugins/src/main/java/org/apache/inlong/agent/plugin/sinks/Sender.java
similarity index 98%
rename from
inlong-agent/agent-plugins/src/main/java/org/apache/inlong/agent/plugin/sinks/filecollect/SenderManager.java
rename to
inlong-agent/agent-plugins/src/main/java/org/apache/inlong/agent/plugin/sinks/Sender.java
index 76593b7512..3195ee45c2 100755
---
a/inlong-agent/agent-plugins/src/main/java/org/apache/inlong/agent/plugin/sinks/filecollect/SenderManager.java
+++
b/inlong-agent/agent-plugins/src/main/java/org/apache/inlong/agent/plugin/sinks/Sender.java
@@ -15,7 +15,7 @@
* limitations under the License.
*/
-package org.apache.inlong.agent.plugin.sinks.filecollect;
+package org.apache.inlong.agent.plugin.sinks;
import org.apache.inlong.agent.common.AgentThreadFactory;
import org.apache.inlong.agent.conf.AgentConfiguration;
@@ -68,9 +68,9 @@ import static
org.apache.inlong.agent.metrics.AgentMetricItem.KEY_PLUGIN_ID;
/**
* proxy client
*/
-public class SenderManager {
+public class Sender {
- private static final Logger LOGGER =
LoggerFactory.getLogger(SenderManager.class);
+ private static final Logger LOGGER = LoggerFactory.getLogger(Sender.class);
private static final SequentialID SEQUENTIAL_ID =
SequentialID.getInstance();
public static final int RESEND_QUEUE_WAIT_MS = 10;
// cache for group and sender list, share the map cross agent lifecycle.
@@ -112,7 +112,7 @@ public class SenderManager {
private static final AgentConfiguration agentConf =
AgentConfiguration.getAgentConf();
private long auditVersion;
- public SenderManager(InstanceProfile profile, String inlongGroupId, String
sourcePath) {
+ public Sender(InstanceProfile profile, String inlongGroupId, String
sourcePath) {
this.profile = profile;
auditVersion = Long.parseLong(profile.get(TASK_AUDIT_VERSION));
managerAddr = agentConf.get(AGENT_MANAGER_ADDR);
diff --git
a/inlong-agent/agent-plugins/src/main/java/org/apache/inlong/agent/plugin/task/logcollection/LogAbstractTask.java
b/inlong-agent/agent-plugins/src/main/java/org/apache/inlong/agent/plugin/task/logcollection/LogAbstractTask.java
index 4c7b729a4d..49e45ba751 100644
---
a/inlong-agent/agent-plugins/src/main/java/org/apache/inlong/agent/plugin/task/logcollection/LogAbstractTask.java
+++
b/inlong-agent/agent-plugins/src/main/java/org/apache/inlong/agent/plugin/task/logcollection/LogAbstractTask.java
@@ -18,6 +18,7 @@
package org.apache.inlong.agent.plugin.task.logcollection;
import org.apache.inlong.agent.conf.InstanceProfile;
+import org.apache.inlong.agent.core.task.OffsetManager;
import org.apache.inlong.agent.core.task.TaskAction;
import org.apache.inlong.agent.plugin.task.AbstractTask;
import org.apache.inlong.agent.plugin.utils.regex.DateUtils;
@@ -44,7 +45,7 @@ import java.util.concurrent.LinkedBlockingQueue;
public abstract class LogAbstractTask extends AbstractTask {
private static final int INSTANCE_QUEUE_CAPACITY = 10;
- public static final long DAY_TIMEOUT_INTERVAL = 2 * 24 * 3600 * 1000;
+ public static final long ONE_HOUR_TIMEOUT_INTERVAL = 3600 * 1000;
private static final Logger LOGGER =
LoggerFactory.getLogger(LogAbstractTask.class);
protected boolean retry;
protected BlockingQueue<InstanceProfile> instanceQueue;
@@ -207,10 +208,13 @@ public abstract class LogAbstractTask extends
AbstractTask {
for (Map.Entry<String, Map<String, InstanceProfile>> entry :
eventMap.entrySet()) {
/* If the data time of the event is within 2 days before (after)
the current time, it is valid */
String dataTime = entry.getKey();
- if (!DateUtils.isValidCreationTime(dataTime,
DAY_TIMEOUT_INTERVAL)) {
+ if (!DateUtils.isValidCreationTime(dataTime,
+
Math.abs(OffsetManager.getScanCycleRange(taskProfile.getCycleUnit()))
+ + ONE_HOUR_TIMEOUT_INTERVAL)) {
/* Remove it from memory map. */
eventMap.remove(dataTime);
- LOGGER.warn("remove too old event from event map. dataTime
{}", dataTime);
+ LOGGER.warn("remove too old event from event map taskId {}
dataTime {}", taskProfile.getTaskId(),
+ dataTime);
}
}
}
diff --git
a/inlong-agent/agent-plugins/src/main/java/org/apache/inlong/agent/plugin/utils/regex/Scanner.java
b/inlong-agent/agent-plugins/src/main/java/org/apache/inlong/agent/plugin/utils/regex/Scanner.java
index ecddccce9f..4ededea940 100644
---
a/inlong-agent/agent-plugins/src/main/java/org/apache/inlong/agent/plugin/utils/regex/Scanner.java
+++
b/inlong-agent/agent-plugins/src/main/java/org/apache/inlong/agent/plugin/utils/regex/Scanner.java
@@ -17,6 +17,7 @@
package org.apache.inlong.agent.plugin.utils.regex;
+import org.apache.inlong.agent.core.task.OffsetManager;
import org.apache.inlong.agent.utils.DateTransUtils;
import org.slf4j.Logger;
@@ -29,7 +30,6 @@ import java.util.List;
public class Scanner {
private static final Logger LOGGER =
LoggerFactory.getLogger(Scanner.class);
- public static final String SCAN_CYCLE_RANCE = "-2";
public static class TimeRange {
@@ -87,9 +87,8 @@ public class Scanner {
boolean isRetry) {
if (!isRetry) {
long currentTime = System.currentTimeMillis();
- // only scan two cycle, like two hours or two days
- long offset = DateTransUtils.calcOffset(SCAN_CYCLE_RANCE +
cycleUnit);
- startTime = currentTime + offset +
DateTransUtils.calcOffset(timeOffset);
+ startTime =
+ currentTime + OffsetManager.getScanCycleRange(cycleUnit) +
DateTransUtils.calcOffset(timeOffset);
endTime = currentTime + DateTransUtils.calcOffset(timeOffset);
}
return new TimeRange(startTime, endTime);
diff --git
a/inlong-agent/agent-plugins/src/test/java/org/apache/inlong/agent/plugin/sinks/KafkaSinkTest.java
b/inlong-agent/agent-plugins/src/test/java/org/apache/inlong/agent/plugin/sinks/KafkaSinkTest.java
index 56f46d74a0..deee04f1c9 100644
---
a/inlong-agent/agent-plugins/src/test/java/org/apache/inlong/agent/plugin/sinks/KafkaSinkTest.java
+++
b/inlong-agent/agent-plugins/src/test/java/org/apache/inlong/agent/plugin/sinks/KafkaSinkTest.java
@@ -21,7 +21,7 @@ import org.apache.inlong.agent.conf.InstanceProfile;
import org.apache.inlong.agent.conf.TaskProfile;
import org.apache.inlong.agent.message.ProxyMessage;
import org.apache.inlong.agent.plugin.AgentBaseTestsHelper;
-import org.apache.inlong.agent.plugin.sinks.filecollect.TestSenderManager;
+import org.apache.inlong.agent.plugin.sinks.filecollect.TestSender;
import org.apache.inlong.agent.utils.AgentUtils;
import org.apache.inlong.common.enums.TaskStateEnum;
@@ -40,12 +40,12 @@ public class KafkaSinkTest {
private static MockSink kafkaSink;
private static InstanceProfile profile;
private static AgentBaseTestsHelper helper;
- private static final ClassLoader LOADER =
TestSenderManager.class.getClassLoader();
+ private static final ClassLoader LOADER =
TestSender.class.getClassLoader();
@BeforeClass
public static void setUp() throws Exception {
String fileName = LOADER.getResource("test/20230928_1.txt").getPath();
- helper = new
AgentBaseTestsHelper(TestSenderManager.class.getName()).setupAgentHome();
+ helper = new
AgentBaseTestsHelper(TestSender.class.getName()).setupAgentHome();
String pattern = helper.getTestRootDir() + "/YYYYMMDD.log_[0-9]+";
TaskProfile taskProfile =
helper.getFileTaskProfile(1, pattern, "csv", false, "", "",
TaskStateEnum.RUNNING, "D",
diff --git
a/inlong-agent/agent-plugins/src/test/java/org/apache/inlong/agent/plugin/sinks/PulsarSinkTest.java
b/inlong-agent/agent-plugins/src/test/java/org/apache/inlong/agent/plugin/sinks/PulsarSinkTest.java
index c8cf365850..eccbfb35fd 100644
---
a/inlong-agent/agent-plugins/src/test/java/org/apache/inlong/agent/plugin/sinks/PulsarSinkTest.java
+++
b/inlong-agent/agent-plugins/src/test/java/org/apache/inlong/agent/plugin/sinks/PulsarSinkTest.java
@@ -21,7 +21,7 @@ import org.apache.inlong.agent.conf.InstanceProfile;
import org.apache.inlong.agent.conf.TaskProfile;
import org.apache.inlong.agent.message.ProxyMessage;
import org.apache.inlong.agent.plugin.AgentBaseTestsHelper;
-import org.apache.inlong.agent.plugin.sinks.filecollect.TestSenderManager;
+import org.apache.inlong.agent.plugin.sinks.filecollect.TestSender;
import org.apache.inlong.agent.utils.AgentUtils;
import org.apache.inlong.common.enums.TaskStateEnum;
@@ -40,12 +40,12 @@ public class PulsarSinkTest {
private static MockSink pulsarSink;
private static InstanceProfile profile;
private static AgentBaseTestsHelper helper;
- private static final ClassLoader LOADER =
TestSenderManager.class.getClassLoader();
+ private static final ClassLoader LOADER =
TestSender.class.getClassLoader();
@BeforeClass
public static void setUp() throws Exception {
String fileName = LOADER.getResource("test/20230928_1.txt").getPath();
- helper = new
AgentBaseTestsHelper(TestSenderManager.class.getName()).setupAgentHome();
+ helper = new
AgentBaseTestsHelper(TestSender.class.getName()).setupAgentHome();
String pattern = helper.getTestRootDir() + "/YYYYMMDD.log_[0-9]+";
TaskProfile taskProfile =
helper.getFileTaskProfile(1, pattern, "csv", false, "", "",
TaskStateEnum.RUNNING, "D",
diff --git
a/inlong-agent/agent-plugins/src/test/java/org/apache/inlong/agent/plugin/sinks/filecollect/TestSenderManager.java
b/inlong-agent/agent-plugins/src/test/java/org/apache/inlong/agent/plugin/sinks/filecollect/TestSender.java
similarity index 89%
rename from
inlong-agent/agent-plugins/src/test/java/org/apache/inlong/agent/plugin/sinks/filecollect/TestSenderManager.java
rename to
inlong-agent/agent-plugins/src/test/java/org/apache/inlong/agent/plugin/sinks/filecollect/TestSender.java
index 4e068f5930..5958719c15 100644
---
a/inlong-agent/agent-plugins/src/test/java/org/apache/inlong/agent/plugin/sinks/filecollect/TestSenderManager.java
+++
b/inlong-agent/agent-plugins/src/test/java/org/apache/inlong/agent/plugin/sinks/filecollect/TestSender.java
@@ -24,6 +24,7 @@ import org.apache.inlong.agent.constant.TaskConstants;
import org.apache.inlong.agent.message.file.OffsetAckInfo;
import org.apache.inlong.agent.message.file.SenderMessage;
import org.apache.inlong.agent.plugin.AgentBaseTestsHelper;
+import org.apache.inlong.agent.plugin.sinks.Sender;
import org.apache.inlong.agent.plugin.task.logcollection.local.FileDataUtils;
import org.apache.inlong.agent.utils.AgentUtils;
import org.apache.inlong.common.enums.TaskStateEnum;
@@ -52,12 +53,12 @@ import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;
@RunWith(PowerMockRunner.class)
-@PrepareForTest(SenderManager.class)
+@PrepareForTest(Sender.class)
@PowerMockIgnore({"javax.management.*"})
-public class TestSenderManager {
+public class TestSender {
- private static final Logger LOGGER =
LoggerFactory.getLogger(TestSenderManager.class);
- private static final ClassLoader LOADER =
TestSenderManager.class.getClassLoader();
+ private static final Logger LOGGER =
LoggerFactory.getLogger(TestSender.class);
+ private static final ClassLoader LOADER =
TestSender.class.getClassLoader();
private static AgentBaseTestsHelper helper;
private static InstanceProfile profile;
private static final ThreadPoolExecutor EXECUTOR_SERVICE = new
ThreadPoolExecutor(
@@ -69,7 +70,7 @@ public class TestSenderManager {
@BeforeClass
public static void setup() {
String fileName = LOADER.getResource("test/20230928_1.txt").getPath();
- helper = new
AgentBaseTestsHelper(TestSenderManager.class.getName()).setupAgentHome();
+ helper = new
AgentBaseTestsHelper(TestSender.class.getName()).setupAgentHome();
String pattern = helper.getTestRootDir() + "/YYYYMMDD.log_[0-9]+";
TaskProfile taskProfile =
helper.getFileTaskProfile(1, pattern, "csv", false, "", "",
TaskStateEnum.RUNNING, "D",
@@ -88,17 +89,17 @@ public class TestSenderManager {
List<MsgSendCallback> cbList = new ArrayList<>();
try {
profile.set(TaskConstants.INODE_INFO,
FileDataUtils.getInodeInfo(profile.getInstanceId()));
- SenderManager senderManager = PowerMockito.spy(new
SenderManager(profile, "inlongGroupId", "sourceName"));
- PowerMockito.doNothing().when(senderManager,
"createMessageSender");
+ Sender sender = PowerMockito.spy(new Sender(profile,
"inlongGroupId", "sourceName"));
+ PowerMockito.doNothing().when(sender, "createMessageSender");
PowerMockito.doAnswer(invocation -> {
MsgSendCallback cb = invocation.getArgument(0);
cbList.add(cb);
return null;
- }).when(senderManager, "asyncSendByMessageSender", Mockito.any(),
+ }).when(sender, "asyncSendByMessageSender", Mockito.any(),
Mockito.any(), Mockito.any(), Mockito.any(),
Mockito.anyLong(), Mockito.any(),
Mockito.any(), Mockito.anyBoolean());
- senderManager.Start();
+ sender.Start();
Long offset = 0L;
List<OffsetAckInfo> ackInfoListTotal = new ArrayList<>();
for (int i = 0; i < 10; i++) {
@@ -112,7 +113,7 @@ public class TestSenderManager {
}
SenderMessage senderMessage = new SenderMessage("taskId",
"instanceId", "groupId", "streamId", bodyList,
AgentUtils.getCurrentTime(), null, ackInfoList);
- senderManager.sendBatch(senderMessage);
+ sender.sendBatch(senderMessage);
}
Assert.assertTrue(cbList.size() == 10);
for (int i = 0; i < 5; i++) {
diff --git a/pom.xml b/pom.xml
index 4194b90095..01a88cf4d7 100644
--- a/pom.xml
+++ b/pom.xml
@@ -207,6 +207,7 @@
<libfb303.version>0.9.3</libfb303.version>
<apache.thrift.version>0.14.1</apache.thrift.version>
<aircompressor.version>0.27</aircompressor.version>
+ <json.smart.version>2.5.1</json.smart.version>
</properties>
<dependencyManagement>
@@ -1269,6 +1270,11 @@
<artifactId>tencentcloud-sdk-java-cls</artifactId>
<version>${tencentcloud-api.version}</version>
</dependency>
+ <dependency>
+ <groupId>net.minidev</groupId>
+ <artifactId>json-smart</artifactId>
+ <version>${json.smart.version}</version>
+ </dependency>
</dependencies>
</dependencyManagement>