This is an automated email from the ASF dual-hosted git repository.
dockerzhang pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/inlong.git
The following commit(s) were added to refs/heads/master by this push:
new 32e29b8512 [INLONG-8251][Agent] Add global memory limit for file
collect (#8253)
32e29b8512 is described below
commit 32e29b85125e1750e6accf9331e2f4f7939b852c
Author: justinwwhuang <[email protected]>
AuthorDate: Fri Jun 16 10:29:54 2023 +0800
[INLONG-8251][Agent] Add global memory limit for file collect (#8253)
---
.../inlong/agent/constant/CommonConstants.java | 5 +-
.../inlong/agent/constant/FetcherConstants.java | 12 ++
.../inlong/agent/message/PackProxyMessage.java | 11 +-
.../org/apache/inlong/agent/core/AgentManager.java | 14 +-
.../apache/inlong/agent/core/HeartbeatManager.java | 12 ++
.../apache/inlong/agent/core/job/JobManager.java | 1 +
.../inlong/agent/core/task/MemoryManager.java | 115 ++++++++++++
...skPositionManager.java => PositionManager.java} | 91 ++++-----
.../inlong/agent/core/task/TestMemoryManager.java | 84 +++++++++
.../inlong/agent/plugin/sinks/KafkaSink.java | 6 +-
.../inlong/agent/plugin/sinks/ProxySink.java | 30 +--
.../inlong/agent/plugin/sinks/PulsarSink.java | 6 +-
.../inlong/agent/plugin/sinks/SenderManager.java | 81 +++-----
.../agent/plugin/sources/TextFileSource.java | 5 -
.../sources/reader/file/FileReaderOperator.java | 203 +++++++++++++--------
.../sources/reader/file/MonitorTextFile.java | 25 +--
.../org/apache/inlong/agent/plugin/MiniAgent.java | 14 +-
.../apache/inlong/agent/plugin/TestFileAgent.java | 2 +-
.../agent/plugin/trigger/TestTriggerManager.java | 12 +-
19 files changed, 492 insertions(+), 237 deletions(-)
diff --git
a/inlong-agent/agent-common/src/main/java/org/apache/inlong/agent/constant/CommonConstants.java
b/inlong-agent/agent-common/src/main/java/org/apache/inlong/agent/constant/CommonConstants.java
index 81fbc7b202..1cd9cbf029 100644
---
a/inlong-agent/agent-common/src/main/java/org/apache/inlong/agent/constant/CommonConstants.java
+++
b/inlong-agent/agent-common/src/main/java/org/apache/inlong/agent/constant/CommonConstants.java
@@ -64,9 +64,6 @@ public class CommonConstants {
// max size of single batch in bytes, default is 800KB.
public static final int DEFAULT_PROXY_PACKAGE_MAX_SIZE = 800000;
- public static final String PROXY_MESSAGE_SEMAPHORE = "proxy.semaphore";
- public static final int DEFAULT_PROXY_MESSAGE_SEMAPHORE = 20000;
-
public static final String PROXY_INLONG_STREAM_ID_QUEUE_MAX_NUMBER =
"proxy.group.queue.maxNumber";
public static final int DEFAULT_PROXY_INLONG_STREAM_ID_QUEUE_MAX_NUMBER =
10000;
@@ -74,7 +71,7 @@ public class CommonConstants {
public static final int DEFAULT_PROXY_PACKAGE_MAX_TIMEOUT_MS = 4 * 1000;
public static final String PROXY_BATCH_FLUSH_INTERVAL =
"proxy.batch.flush.interval";
- public static final int DEFAULT_PROXY_BATCH_FLUSH_INTERVAL = 1000;
+ public static final int DEFAULT_PROXY_BATCH_FLUSH_INTERVAL = 100;
public static final String PROXY_SENDER_MAX_TIMEOUT =
"proxy.sender.maxTimeout";
// max timeout in seconds.
diff --git
a/inlong-agent/agent-common/src/main/java/org/apache/inlong/agent/constant/FetcherConstants.java
b/inlong-agent/agent-common/src/main/java/org/apache/inlong/agent/constant/FetcherConstants.java
index f45242bc1e..6b48ff1a25 100644
---
a/inlong-agent/agent-common/src/main/java/org/apache/inlong/agent/constant/FetcherConstants.java
+++
b/inlong-agent/agent-common/src/main/java/org/apache/inlong/agent/constant/FetcherConstants.java
@@ -67,4 +67,16 @@ public class FetcherConstants {
public static final String AGENT_MANAGER_AUTH_SECRET_ID =
"agent.manager.auth.secretId";
public static final String AGENT_MANAGER_AUTH_SECRET_KEY =
"agent.manager.auth.secretKey";
+
+ public static final String AGENT_GLOBAL_READER_SOURCE_PERMIT =
"agent.global.reader.source.permit";
+ public static final int DEFAULT_AGENT_GLOBAL_READER_SOURCE_PERMIT = 16 *
1000 * 1000;
+
+ public static final String AGENT_GLOBAL_READER_QUEUE_PERMIT =
"agent.global.reader.queue.permit";
+ public static final int DEFAULT_AGENT_GLOBAL_READER_QUEUE_PERMIT = 16 *
1000 * 1000;
+
+ public static final String AGENT_GLOBAL_CHANNEL_PERMIT =
"agent.global.channel.permit";
+ public static final int DEFAULT_AGENT_GLOBAL_CHANNEL_PERMIT = 16 * 1000 *
1000;
+
+ public static final String AGENT_GLOBAL_WRITER_PERMIT =
"agent.global.writer.permit";
+ public static final int DEFAULT_AGENT_GLOBAL_WRITER_PERMIT = 96 * 1000 *
1000;
}
diff --git
a/inlong-agent/agent-common/src/main/java/org/apache/inlong/agent/message/PackProxyMessage.java
b/inlong-agent/agent-common/src/main/java/org/apache/inlong/agent/message/PackProxyMessage.java
index 15d558748e..8e257b3972 100644
---
a/inlong-agent/agent-common/src/main/java/org/apache/inlong/agent/message/PackProxyMessage.java
+++
b/inlong-agent/agent-common/src/main/java/org/apache/inlong/agent/message/PackProxyMessage.java
@@ -145,19 +145,18 @@ public class PackProxyMessage {
// pre check message size
ProxyMessage peekMessage = messageQueue.peek();
int peekMessageLength = peekMessage.getBody().length;
+ if (resultBatchSize + peekMessageLength > maxPackSize) {
+ break;
+ }
+ ProxyMessage message = messageQueue.remove();
+ int bodySize = message.getBody().length;
if (peekMessageLength > maxPackSize) {
LOGGER.warn("message size is {}, greater than max pack
size {}, drop it!",
peekMessage.getBody().length, maxPackSize);
- int bodySize = peekMessage.getBody().length;
queueSize.addAndGet(-bodySize);
messageQueue.remove();
break;
}
- if (resultBatchSize + peekMessageLength > maxPackSize) {
- break;
- }
- ProxyMessage message = messageQueue.remove();
- int bodySize = message.getBody().length;
resultBatchSize += bodySize;
// decrease queue size.
queueSize.addAndGet(-bodySize);
diff --git
a/inlong-agent/agent-core/src/main/java/org/apache/inlong/agent/core/AgentManager.java
b/inlong-agent/agent-core/src/main/java/org/apache/inlong/agent/core/AgentManager.java
index d7b921f355..11f9ce793e 100755
---
a/inlong-agent/agent-core/src/main/java/org/apache/inlong/agent/core/AgentManager.java
+++
b/inlong-agent/agent-core/src/main/java/org/apache/inlong/agent/core/AgentManager.java
@@ -25,8 +25,8 @@ import org.apache.inlong.agent.conf.TriggerProfile;
import org.apache.inlong.agent.constant.AgentConstants;
import org.apache.inlong.agent.core.conf.ConfigJetty;
import org.apache.inlong.agent.core.job.JobManager;
+import org.apache.inlong.agent.core.task.PositionManager;
import org.apache.inlong.agent.core.task.TaskManager;
-import org.apache.inlong.agent.core.task.TaskPositionManager;
import org.apache.inlong.agent.core.trigger.TriggerManager;
import org.apache.inlong.agent.db.CommandDb;
import org.apache.inlong.agent.db.Db;
@@ -57,7 +57,7 @@ public class AgentManager extends AbstractDaemon {
private final JobManager jobManager;
private final TaskManager taskManager;
private final TriggerManager triggerManager;
- private final TaskPositionManager taskPositionManager;
+ private final PositionManager positionManager;
private final HeartbeatManager heartbeatManager;
private final ProfileFetcher fetcher;
private final AgentConfiguration conf;
@@ -82,7 +82,7 @@ public class AgentManager extends AbstractDaemon {
taskManager = new TaskManager(this);
fetcher = initFetcher(this);
heartbeatManager = HeartbeatManager.getInstance(this);
- taskPositionManager = TaskPositionManager.getInstance(this);
+ positionManager = PositionManager.getInstance(this);
// need to be an option.
if (conf.getBoolean(
AgentConstants.AGENT_ENABLE_HTTP,
AgentConstants.DEFAULT_AGENT_ENABLE_HTTP)) {
@@ -174,8 +174,8 @@ public class AgentManager extends AbstractDaemon {
return triggerManager;
}
- public TaskPositionManager getTaskPositionManager() {
- return taskPositionManager;
+ public PositionManager getTaskPositionManager() {
+ return positionManager;
}
public TaskManager getTaskManager() {
@@ -206,7 +206,7 @@ public class AgentManager extends AbstractDaemon {
LOGGER.info("starting heartbeat manager");
heartbeatManager.start();
LOGGER.info("starting task position manager");
- taskPositionManager.start();
+ positionManager.start();
LOGGER.info("starting read job from local");
// read job profiles from local
List<JobProfile> profileList = localProfile.readFromLocal();
@@ -249,7 +249,7 @@ public class AgentManager extends AbstractDaemon {
jobManager.stop();
taskManager.stop();
heartbeatManager.stop();
- taskPositionManager.stop();
+ positionManager.stop();
agentConfMonitor.shutdown();
this.db.close();
}
diff --git
a/inlong-agent/agent-core/src/main/java/org/apache/inlong/agent/core/HeartbeatManager.java
b/inlong-agent/agent-core/src/main/java/org/apache/inlong/agent/core/HeartbeatManager.java
index 9214de04e8..b945969d6f 100644
---
a/inlong-agent/agent-core/src/main/java/org/apache/inlong/agent/core/HeartbeatManager.java
+++
b/inlong-agent/agent-core/src/main/java/org/apache/inlong/agent/core/HeartbeatManager.java
@@ -23,6 +23,7 @@ import org.apache.inlong.agent.conf.JobProfile;
import org.apache.inlong.agent.core.job.Job;
import org.apache.inlong.agent.core.job.JobManager;
import org.apache.inlong.agent.core.job.JobWrapper;
+import org.apache.inlong.agent.core.task.MemoryManager;
import org.apache.inlong.agent.state.State;
import org.apache.inlong.agent.utils.AgentUtils;
import org.apache.inlong.agent.utils.HttpManager;
@@ -69,6 +70,7 @@ import static
org.apache.inlong.agent.constant.JobConstants.JOB_STREAM_ID;
public class HeartbeatManager extends AbstractDaemon implements
AbstractHeartbeatManager {
private static final Logger LOGGER =
LoggerFactory.getLogger(HeartbeatManager.class);
+ public static final int PRINT_MEMORY_PERMIT_INTERVAL_SECOND = 60;
private static HeartbeatManager heartbeatManager = null;
private final JobManager jobmanager;
private final AgentConfiguration conf;
@@ -122,6 +124,16 @@ public class HeartbeatManager extends AbstractDaemon
implements AbstractHeartbea
public void start() throws Exception {
submitWorker(snapshotReportThread());
submitWorker(heartbeatReportThread());
+ submitWorker(printMemoryPermitThread());
+ }
+
+ private Runnable printMemoryPermitThread() { // worker body submitted from start(); logs permit state periodically
+ return () -> {
+ while (isRunnable()) { // keep reporting for as long as this daemon reports itself runnable
+ MemoryManager.getInstance().printAll(); // one log line per permit pool known to MemoryManager
+
AgentUtils.silenceSleepInSeconds(PRINT_MEMORY_PERMIT_INTERVAL_SECOND);
+ }
+ };
+ }
private Runnable snapshotReportThread() {
diff --git
a/inlong-agent/agent-core/src/main/java/org/apache/inlong/agent/core/job/JobManager.java
b/inlong-agent/agent-core/src/main/java/org/apache/inlong/agent/core/job/JobManager.java
index 576d5682b6..c1e933d8ae 100644
---
a/inlong-agent/agent-core/src/main/java/org/apache/inlong/agent/core/job/JobManager.java
+++
b/inlong-agent/agent-core/src/main/java/org/apache/inlong/agent/core/job/JobManager.java
@@ -108,6 +108,7 @@ public class JobManager extends AbstractDaemon {
this.dimensions = new HashMap<>();
this.dimensions.put(KEY_COMPONENT_NAME,
this.getClass().getSimpleName());
this.jobMetrics = new
AgentMetricItemSet(this.getClass().getSimpleName());
+ MetricRegister.unregister(jobMetrics);
MetricRegister.register(jobMetrics);
}
diff --git
a/inlong-agent/agent-core/src/main/java/org/apache/inlong/agent/core/task/MemoryManager.java
b/inlong-agent/agent-core/src/main/java/org/apache/inlong/agent/core/task/MemoryManager.java
new file mode 100644
index 0000000000..d67a15fb5a
--- /dev/null
+++
b/inlong-agent/agent-core/src/main/java/org/apache/inlong/agent/core/task/MemoryManager.java
@@ -0,0 +1,115 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.inlong.agent.core.task;
+
+import org.apache.inlong.agent.conf.AgentConfiguration;
+
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.util.concurrent.ConcurrentHashMap;
+import java.util.concurrent.Semaphore;
+
+import static
org.apache.inlong.agent.constant.FetcherConstants.AGENT_GLOBAL_CHANNEL_PERMIT;
+import static
org.apache.inlong.agent.constant.FetcherConstants.AGENT_GLOBAL_READER_QUEUE_PERMIT;
+import static
org.apache.inlong.agent.constant.FetcherConstants.AGENT_GLOBAL_READER_SOURCE_PERMIT;
+import static
org.apache.inlong.agent.constant.FetcherConstants.AGENT_GLOBAL_WRITER_PERMIT;
+import static
org.apache.inlong.agent.constant.FetcherConstants.DEFAULT_AGENT_GLOBAL_CHANNEL_PERMIT;
+import static
org.apache.inlong.agent.constant.FetcherConstants.DEFAULT_AGENT_GLOBAL_READER_QUEUE_PERMIT;
+import static
org.apache.inlong.agent.constant.FetcherConstants.DEFAULT_AGENT_GLOBAL_READER_SOURCE_PERMIT;
+import static
org.apache.inlong.agent.constant.FetcherConstants.DEFAULT_AGENT_GLOBAL_WRITER_PERMIT;
+
+/**
+ * Global registry of named semaphores, one per permit pool configured below; used to limit global memory to avoid OOM.
+ */
+public class MemoryManager {
+
+ private static final Logger LOGGER =
LoggerFactory.getLogger(MemoryManager.class);
+ private static volatile MemoryManager memoryManager = null;
+ private final AgentConfiguration conf;
+ private final ConcurrentHashMap<String, Semaphore> semaphoreMap = new
ConcurrentHashMap<>();
+
+ private MemoryManager() { // each pool is keyed by its config key and sized from the agent config or its default
+ this.conf = AgentConfiguration.getAgentConf();
+ Semaphore semaphore = null;
+ semaphore = new Semaphore(
+ conf.getInt(AGENT_GLOBAL_READER_SOURCE_PERMIT,
DEFAULT_AGENT_GLOBAL_READER_SOURCE_PERMIT));
+ semaphoreMap.put(AGENT_GLOBAL_READER_SOURCE_PERMIT, semaphore);
+
+ semaphore = new Semaphore(
+ conf.getInt(AGENT_GLOBAL_READER_QUEUE_PERMIT,
DEFAULT_AGENT_GLOBAL_READER_QUEUE_PERMIT));
+ semaphoreMap.put(AGENT_GLOBAL_READER_QUEUE_PERMIT, semaphore);
+
+ semaphore = new Semaphore(
+ conf.getInt(AGENT_GLOBAL_CHANNEL_PERMIT,
DEFAULT_AGENT_GLOBAL_CHANNEL_PERMIT));
+ semaphoreMap.put(AGENT_GLOBAL_CHANNEL_PERMIT, semaphore);
+
+ semaphore = new Semaphore(
+ conf.getInt(AGENT_GLOBAL_WRITER_PERMIT,
DEFAULT_AGENT_GLOBAL_WRITER_PERMIT));
+ semaphoreMap.put(AGENT_GLOBAL_WRITER_PERMIT, semaphore);
+ }
+
+ /**
+ * Returns the manager singleton, creating it on first use.
+ */
+ public static MemoryManager getInstance() {
+ if (memoryManager == null) {
+ synchronized (MemoryManager.class) {
+ if (memoryManager == null) {
+ memoryManager = new MemoryManager();
+ }
+ }
+ }
+ return memoryManager;
+ }
+
+ public boolean tryAcquire(String semaphoreName, int permit) {
+ Semaphore semaphore = semaphoreMap.get(semaphoreName);
+ if (semaphore == null) {
+ LOGGER.error("tryAcquire {} not exist", semaphoreName);
+ return false; // an unknown pool name is reported as a failed acquire
+ }
+ return semaphore.tryAcquire(permit); // never blocks: false when fewer than 'permit' permits are free
+ }
+
+ public void release(String semaphoreName, int permit) {
+ Semaphore semaphore = semaphoreMap.get(semaphoreName);
+ if (semaphore == null) {
+ LOGGER.error("release {} not exist", semaphoreName);
+ return; // nothing to give back for an unknown pool name
+ }
+ semaphore.release(permit);
+ }
+
+ public void printDetail(String semaphoreName) {
+ Semaphore semaphore = semaphoreMap.get(semaphoreName);
+ if (semaphore == null) {
+ LOGGER.error("printDetail {} not exist", semaphoreName);
+ return;
+ }
+ LOGGER.info("permit left {} wait {} {}", semaphore.availablePermits(),
semaphore.getQueueLength(),
+ semaphoreName);
+ }
+
+ public void printAll() { // logs the four pools registered by the constructor
+ printDetail(AGENT_GLOBAL_READER_SOURCE_PERMIT);
+ printDetail(AGENT_GLOBAL_READER_QUEUE_PERMIT);
+ printDetail(AGENT_GLOBAL_CHANNEL_PERMIT);
+ printDetail(AGENT_GLOBAL_WRITER_PERMIT);
+ }
+}
diff --git
a/inlong-agent/agent-core/src/main/java/org/apache/inlong/agent/core/task/TaskPositionManager.java
b/inlong-agent/agent-core/src/main/java/org/apache/inlong/agent/core/task/PositionManager.java
similarity index 65%
rename from
inlong-agent/agent-core/src/main/java/org/apache/inlong/agent/core/task/TaskPositionManager.java
rename to
inlong-agent/agent-core/src/main/java/org/apache/inlong/agent/core/task/PositionManager.java
index 85f94710d3..48f6525d01 100644
---
a/inlong-agent/agent-core/src/main/java/org/apache/inlong/agent/core/task/TaskPositionManager.java
+++
b/inlong-agent/agent-core/src/main/java/org/apache/inlong/agent/core/task/PositionManager.java
@@ -39,16 +39,16 @@ import static
org.apache.inlong.agent.constant.FetcherConstants.DEFAULT_AGENT_FE
* where key is task read file name and value is task sink position
* note that this class is generated
*/
-public class TaskPositionManager extends AbstractDaemon {
+public class PositionManager extends AbstractDaemon {
- private static final Logger LOGGER =
LoggerFactory.getLogger(TaskPositionManager.class);
- private static volatile TaskPositionManager taskPositionManager = null;
+ private static final Logger LOGGER =
LoggerFactory.getLogger(PositionManager.class);
+ private static volatile PositionManager positionManager = null;
private final AgentManager agentManager;
private final JobProfileDb jobConfDb;
private final AgentConfiguration conf;
private ConcurrentHashMap<String, ConcurrentHashMap<String, Long>>
jobTaskPositionMap;
- private TaskPositionManager(AgentManager agentManager) {
+ private PositionManager(AgentManager agentManager) {
this.conf = AgentConfiguration.getAgentConf();
this.agentManager = agentManager;
this.jobConfDb = agentManager.getJobManager().getJobConfDb();
@@ -58,25 +58,25 @@ public class TaskPositionManager extends AbstractDaemon {
/**
* task position manager singleton, can only generated by agent manager
*/
- public static TaskPositionManager getInstance(AgentManager agentManager) {
- if (taskPositionManager == null) {
- synchronized (TaskPositionManager.class) {
- if (taskPositionManager == null) {
- taskPositionManager = new
TaskPositionManager(agentManager);
+ public static PositionManager getInstance(AgentManager agentManager) {
+ if (positionManager == null) {
+ synchronized (PositionManager.class) {
+ if (positionManager == null) {
+ positionManager = new PositionManager(agentManager);
}
}
}
- return taskPositionManager;
+ return positionManager;
}
/**
* get taskPositionManager singleton
*/
- public static TaskPositionManager getInstance() {
- if (taskPositionManager == null) {
+ public static PositionManager getInstance() {
+ if (positionManager == null) {
throw new RuntimeException("task position manager has not been
initialized by agentManager");
}
- return taskPositionManager;
+ return positionManager;
}
@Override
@@ -87,30 +87,34 @@ public class TaskPositionManager extends AbstractDaemon {
private Runnable taskPositionFlushThread() {
return () -> {
while (isRunnable()) {
- try {
- // check pending jobs and try to submit again.
- for (String jobId : jobTaskPositionMap.keySet()) {
- JobProfile jobProfile = jobConfDb.getJobById(jobId);
- if (jobProfile == null) {
- LOGGER.warn("jobProfile {} cannot be found in db, "
- + "might be deleted by standalone mode,
now delete job position in memory", jobId);
- deleteJobPosition(jobId);
- continue;
- }
- flushJobProfile(jobId, jobProfile);
- }
- } catch (Throwable ex) {
- LOGGER.error("error caught", ex);
- ThreadUtils.threadThrowableHandler(Thread.currentThread(),
ex);
- } finally {
- int flushTime = conf.getInt(AGENT_HEARTBEAT_INTERVAL,
- DEFAULT_AGENT_FETCHER_INTERVAL);
- AgentUtils.silenceSleepInSeconds(flushTime);
- }
+ doFlush();
}
};
}
+ private void doFlush() { // one flush pass followed by one sleep; looped by taskPositionFlushThread
+ try {
+ // copy each job's in-memory positions into its profile; jobs missing from the db go to deleteJobPosition.
+ for (String jobId : jobTaskPositionMap.keySet()) {
+ JobProfile jobProfile = jobConfDb.getJobById(jobId);
+ if (jobProfile == null) {
+ LOGGER.warn("jobProfile {} cannot be found in db, "
+ + "might be deleted by standalone mode, now delete
job position in memory", jobId);
+ deleteJobPosition(jobId);
+ continue;
+ }
+ flushJobProfile(jobId, jobProfile);
+ }
+ } catch (Throwable ex) {
+ LOGGER.error("error caught", ex);
+ ThreadUtils.threadThrowableHandler(Thread.currentThread(), ex);
+ } finally { // runs on success and failure alike, so every pass is followed by a pause
+ int flushTime = conf.getInt(AGENT_HEARTBEAT_INTERVAL,
+ DEFAULT_AGENT_FETCHER_INTERVAL); // NOTE(review): heartbeat key paired with fetcher default - confirm intended
+ AgentUtils.silenceSleepInSeconds(flushTime);
+ }
+ }
+ }
+
private void flushJobProfile(String jobId, JobProfile jobProfile) {
jobTaskPositionMap.get(jobId).forEach(
(fileName, position) -> jobProfile.setLong(fileName +
POSITION_SUFFIX, position));
@@ -134,17 +138,22 @@ public class TaskPositionManager extends AbstractDaemon {
/**
* update job sink position
*
- * @param newPosition
+ * @param size amount added to the stored position, or the absolute position itself when reset is true
*/
- public void updateSinkPosition(String jobInstanceId, String sourcePath,
long newPosition) {
- LOGGER.info("updateSinkPosition jobInstanceId {} sourcePath {}
newPosition {}", jobInstanceId, sourcePath,
- newPosition);
+ public void updateSinkPosition(String jobInstanceId, String sourcePath,
long size, boolean reset) {
ConcurrentHashMap<String, Long> positionTemp = new
ConcurrentHashMap<>();
- ConcurrentHashMap<String, Long> lastPosition =
jobTaskPositionMap.putIfAbsent(jobInstanceId, positionTemp);
- if (lastPosition == null) {
- positionTemp.put(sourcePath, newPosition);
+ ConcurrentHashMap<String, Long> position =
jobTaskPositionMap.putIfAbsent(jobInstanceId, positionTemp);
+ if (position == null) {
+ JobProfile jobProfile = jobConfDb.getJobById(jobInstanceId); // null when the job is no longer in the db
+ positionTemp.put(sourcePath,
jobProfile == null ? 0L : jobProfile.getLong(sourcePath + POSITION_SUFFIX, 0));
+ position = positionTemp;
+ }
+
+ if (!reset) {
+ Long beforePosition = position.getOrDefault(sourcePath, 0L);
+ position.put(sourcePath, beforePosition + size);
} else {
- lastPosition.put(sourcePath, newPosition);
+ position.put(sourcePath, size);
}
}
diff --git
a/inlong-agent/agent-core/src/test/java/org/apache/inlong/agent/core/task/TestMemoryManager.java
b/inlong-agent/agent-core/src/test/java/org/apache/inlong/agent/core/task/TestMemoryManager.java
new file mode 100644
index 0000000000..9118230115
--- /dev/null
+++
b/inlong-agent/agent-core/src/test/java/org/apache/inlong/agent/core/task/TestMemoryManager.java
@@ -0,0 +1,84 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.inlong.agent.core.task;
+
+import org.apache.inlong.agent.conf.AgentConfiguration;
+
+import org.junit.Assert;
+import org.junit.BeforeClass;
+import org.junit.Test;
+
+import static
org.apache.inlong.agent.constant.FetcherConstants.AGENT_GLOBAL_CHANNEL_PERMIT;
+import static
org.apache.inlong.agent.constant.FetcherConstants.AGENT_GLOBAL_READER_QUEUE_PERMIT;
+import static
org.apache.inlong.agent.constant.FetcherConstants.AGENT_GLOBAL_READER_SOURCE_PERMIT;
+import static
org.apache.inlong.agent.constant.FetcherConstants.AGENT_GLOBAL_WRITER_PERMIT;
+import static
org.apache.inlong.agent.constant.FetcherConstants.DEFAULT_AGENT_GLOBAL_CHANNEL_PERMIT;
+import static
org.apache.inlong.agent.constant.FetcherConstants.DEFAULT_AGENT_GLOBAL_READER_QUEUE_PERMIT;
+import static
org.apache.inlong.agent.constant.FetcherConstants.DEFAULT_AGENT_GLOBAL_READER_SOURCE_PERMIT;
+import static
org.apache.inlong.agent.constant.FetcherConstants.DEFAULT_AGENT_GLOBAL_WRITER_PERMIT;
+
+public class TestMemoryManager {
+
+ private static AgentConfiguration conf;
+
+ @BeforeClass
+ public static void setup() throws Exception {
+ conf = AgentConfiguration.getAgentConf();
+ }
+
+ @Test
+ public void testAll() {
+ int sourcePermit = conf.getInt(AGENT_GLOBAL_READER_SOURCE_PERMIT,
DEFAULT_AGENT_GLOBAL_READER_SOURCE_PERMIT);
+ int readerQueuePermit = conf.getInt(AGENT_GLOBAL_READER_QUEUE_PERMIT,
DEFAULT_AGENT_GLOBAL_READER_QUEUE_PERMIT);
+ int channelPermit = conf.getInt(AGENT_GLOBAL_CHANNEL_PERMIT,
DEFAULT_AGENT_GLOBAL_CHANNEL_PERMIT);
+ int writerPermit = conf.getInt(AGENT_GLOBAL_WRITER_PERMIT,
DEFAULT_AGENT_GLOBAL_WRITER_PERMIT);
+ // Take the full configured capacity of every pool; each acquire must succeed.
+ boolean suc =
MemoryManager.getInstance().tryAcquire(AGENT_GLOBAL_READER_SOURCE_PERMIT,
sourcePermit);
+ Assert.assertTrue(suc);
+ suc =
MemoryManager.getInstance().tryAcquire(AGENT_GLOBAL_READER_QUEUE_PERMIT,
readerQueuePermit);
+ Assert.assertTrue(suc);
+ suc =
MemoryManager.getInstance().tryAcquire(AGENT_GLOBAL_CHANNEL_PERMIT,
channelPermit);
+ Assert.assertTrue(suc);
+ suc =
MemoryManager.getInstance().tryAcquire(AGENT_GLOBAL_WRITER_PERMIT,
writerPermit);
+ Assert.assertTrue(suc);
+ // With a pool fully taken, even a single extra permit must be refused.
+ suc =
MemoryManager.getInstance().tryAcquire(AGENT_GLOBAL_READER_SOURCE_PERMIT, 1);
+ Assert.assertFalse(suc);
+ suc =
MemoryManager.getInstance().tryAcquire(AGENT_GLOBAL_READER_QUEUE_PERMIT, 1);
+ Assert.assertFalse(suc);
+ suc =
MemoryManager.getInstance().tryAcquire(AGENT_GLOBAL_CHANNEL_PERMIT, 1);
+ Assert.assertFalse(suc);
+ suc =
MemoryManager.getInstance().tryAcquire(AGENT_GLOBAL_WRITER_PERMIT, 1);
+ Assert.assertFalse(suc);
+ // Hand everything back.
+ MemoryManager.getInstance().release(AGENT_GLOBAL_READER_SOURCE_PERMIT,
sourcePermit);
+ MemoryManager.getInstance().release(AGENT_GLOBAL_READER_QUEUE_PERMIT,
readerQueuePermit);
+ MemoryManager.getInstance().release(AGENT_GLOBAL_CHANNEL_PERMIT,
channelPermit);
+ MemoryManager.getInstance().release(AGENT_GLOBAL_WRITER_PERMIT,
writerPermit);
+ // Released permits must be acquirable again; they are returned afterwards so the singleton is not left drained.
+ Assert.assertTrue(
MemoryManager.getInstance().tryAcquire(AGENT_GLOBAL_READER_SOURCE_PERMIT,
sourcePermit));
+ Assert.assertTrue(
MemoryManager.getInstance().tryAcquire(AGENT_GLOBAL_READER_QUEUE_PERMIT,
readerQueuePermit));
+ Assert.assertTrue(
MemoryManager.getInstance().tryAcquire(AGENT_GLOBAL_CHANNEL_PERMIT,
channelPermit));
+ Assert.assertTrue(
MemoryManager.getInstance().tryAcquire(AGENT_GLOBAL_WRITER_PERMIT,
writerPermit));
+ MemoryManager.getInstance().release(AGENT_GLOBAL_READER_SOURCE_PERMIT,
sourcePermit);
+ MemoryManager.getInstance().release(AGENT_GLOBAL_READER_QUEUE_PERMIT,
readerQueuePermit);
+ MemoryManager.getInstance().release(AGENT_GLOBAL_CHANNEL_PERMIT,
channelPermit);
+ MemoryManager.getInstance().release(AGENT_GLOBAL_WRITER_PERMIT,
writerPermit);
+ }
+
+}
\ No newline at end of file
diff --git
a/inlong-agent/agent-plugins/src/main/java/org/apache/inlong/agent/plugin/sinks/KafkaSink.java
b/inlong-agent/agent-plugins/src/main/java/org/apache/inlong/agent/plugin/sinks/KafkaSink.java
index 22f96d8eaf..1e9029e334 100644
---
a/inlong-agent/agent-plugins/src/main/java/org/apache/inlong/agent/plugin/sinks/KafkaSink.java
+++
b/inlong-agent/agent-plugins/src/main/java/org/apache/inlong/agent/plugin/sinks/KafkaSink.java
@@ -20,7 +20,7 @@ package org.apache.inlong.agent.plugin.sinks;
import org.apache.inlong.agent.common.AgentThreadFactory;
import org.apache.inlong.agent.conf.AgentConfiguration;
import org.apache.inlong.agent.conf.JobProfile;
-import org.apache.inlong.agent.core.task.TaskPositionManager;
+import org.apache.inlong.agent.core.task.PositionManager;
import org.apache.inlong.agent.message.BatchProxyMessage;
import org.apache.inlong.agent.message.EndMessage;
import org.apache.inlong.agent.message.PackProxyMessage;
@@ -76,7 +76,7 @@ public class KafkaSink extends AbstractSink {
private static final ExecutorService EXECUTOR_SERVICE = new
ThreadPoolExecutor(0, Integer.MAX_VALUE,
60L, TimeUnit.SECONDS, new SynchronousQueue<>(), new
AgentThreadFactory("KafkaSink"));
private final AgentConfiguration agentConf =
AgentConfiguration.getAgentConf();
- private TaskPositionManager taskPositionManager;
+ private PositionManager taskPositionManager;
private volatile boolean shutdown = false;
private List<MQClusterInfo> mqClusterInfos;
@@ -92,7 +92,7 @@ public class KafkaSink extends AbstractSink {
@Override
public void init(JobProfile jobConf) {
super.init(jobConf);
- taskPositionManager = TaskPositionManager.getInstance();
+ taskPositionManager = PositionManager.getInstance();
int sendQueueSize = agentConf.getInt(KAFKA_SINK_SEND_QUEUE_SIZE,
DEFAULT_SEND_QUEUE_SIZE);
kafkaSendQueue = new LinkedBlockingQueue<>(sendQueueSize);
producerNum = agentConf.getInt(KAFKA_SINK_PRODUCER_NUM,
DEFAULT_PRODUCER_NUM);
diff --git
a/inlong-agent/agent-plugins/src/main/java/org/apache/inlong/agent/plugin/sinks/ProxySink.java
b/inlong-agent/agent-plugins/src/main/java/org/apache/inlong/agent/plugin/sinks/ProxySink.java
index 248796a9ec..7bf723e965 100755
---
a/inlong-agent/agent-plugins/src/main/java/org/apache/inlong/agent/plugin/sinks/ProxySink.java
+++
b/inlong-agent/agent-plugins/src/main/java/org/apache/inlong/agent/plugin/sinks/ProxySink.java
@@ -20,6 +20,7 @@ package org.apache.inlong.agent.plugin.sinks;
import org.apache.inlong.agent.common.AgentThreadFactory;
import org.apache.inlong.agent.conf.JobProfile;
import org.apache.inlong.agent.constant.CommonConstants;
+import org.apache.inlong.agent.core.task.MemoryManager;
import org.apache.inlong.agent.message.BatchProxyMessage;
import org.apache.inlong.agent.message.EndMessage;
import org.apache.inlong.agent.message.PackProxyMessage;
@@ -43,6 +44,8 @@ import java.util.concurrent.atomic.AtomicLong;
import static
org.apache.inlong.agent.constant.CommonConstants.DEFAULT_FIELD_SPLITTER;
import static
org.apache.inlong.agent.constant.CommonConstants.DEFAULT_PROXY_PACKAGE_MAX_SIZE;
import static
org.apache.inlong.agent.constant.CommonConstants.PROXY_PACKAGE_MAX_SIZE;
+import static
org.apache.inlong.agent.constant.FetcherConstants.AGENT_GLOBAL_CHANNEL_PERMIT;
+import static
org.apache.inlong.agent.constant.FetcherConstants.AGENT_GLOBAL_WRITER_PERMIT;
/**
* sink message data to inlong-dataproxy
@@ -68,12 +71,6 @@ public class ProxySink extends AbstractSink {
if (message == null) {
return;
}
- // if the message size is greater than max pack size,should drop it.
- if (message.getBody().length > maxPackSize) {
- LOGGER.warn("message size is {}, greater than max pack size {},
drop it!",
- message.getBody().length, maxPackSize);
- return;
- }
boolean suc = false;
while (!suc) {
suc = putInCache(message);
@@ -98,6 +95,13 @@ public class ProxySink extends AbstractSink {
}
AtomicBoolean suc = new AtomicBoolean(false);
ProxyMessage proxyMessage = new ProxyMessage(message);
+ boolean writerPermitSuc = MemoryManager.getInstance()
+ .tryAcquire(AGENT_GLOBAL_WRITER_PERMIT,
message.getBody().length);
+ if (!writerPermitSuc) {
+ LOGGER.warn("writer tryAcquire failed");
+
MemoryManager.getInstance().printDetail(AGENT_GLOBAL_WRITER_PERMIT);
+ return false;
+ }
// add proxy message to cache.
cache.compute(proxyMessage.getBatchKey(),
(s, packProxyMessage) -> {
@@ -111,11 +115,11 @@ public class ProxySink extends AbstractSink {
return packProxyMessage;
});
if (suc.get()) {
- // semaphore should be acquired only when the message was put
in cache successfully
- senderManager.acquireSemaphore(1);
+
MemoryManager.getInstance().release(AGENT_GLOBAL_CHANNEL_PERMIT,
message.getBody().length);
// increment the count of successful sinks
sinkMetric.sinkSuccessCount.incrementAndGet();
} else {
+
MemoryManager.getInstance().release(AGENT_GLOBAL_WRITER_PERMIT,
message.getBody().length);
// increment the count of failed sinks
sinkMetric.sinkFailCount.incrementAndGet();
}
@@ -147,7 +151,7 @@ public class ProxySink extends AbstractSink {
*/
private Runnable flushCache() {
return () -> {
- LOGGER.info("start flush cache thread for {} ProxySink",
inlongGroupId);
+ LOGGER.info("start flush cache {}:{}", inlongGroupId, sourceName);
while (!shutdown) {
try {
cache.forEach((batchKey, packProxyMessage) -> {
@@ -159,7 +163,6 @@ public class ProxySink extends AbstractSink {
batchProxyMessage.getDataList().size(),
jobInstanceId, sourceName,
batchProxyMessage.getDataTime());
}
-
});
} catch (Exception ex) {
LOGGER.error("error caught", ex);
@@ -169,6 +172,7 @@ public class ProxySink extends AbstractSink {
AgentUtils.silenceSleepInMs(batchFlushInterval);
}
}
+ LOGGER.info("stop flush cache {}:{}", inlongGroupId, sourceName);
};
}
@@ -182,7 +186,6 @@ public class ProxySink extends AbstractSink {
executorService.execute(flushCache());
senderManager = new SenderManager(jobConf, inlongGroupId, sourceName);
try {
- senderManager.addMessageSender();
senderManager.Start();
} catch (Throwable ex) {
LOGGER.error("error while init sender for group id {}",
inlongGroupId);
@@ -193,14 +196,15 @@ public class ProxySink extends AbstractSink {
@Override
public void destroy() {
- LOGGER.info("destroy sink which sink from source name {}", sourceName);
+ LOGGER.info("destroy sink source name {}", sourceName);
while (!sinkFinish()) {
- LOGGER.info("job {} wait until cache all flushed to proxy",
jobInstanceId);
+ LOGGER.info("sourceName {} wait until cache all flushed to proxy",
sourceName);
AgentUtils.silenceSleepInMs(batchFlushInterval);
}
shutdown = true;
executorService.shutdown();
senderManager.Stop();
+ LOGGER.info("destroy sink source name {} end", sourceName);
}
/**
diff --git
a/inlong-agent/agent-plugins/src/main/java/org/apache/inlong/agent/plugin/sinks/PulsarSink.java
b/inlong-agent/agent-plugins/src/main/java/org/apache/inlong/agent/plugin/sinks/PulsarSink.java
index d2801e4b6d..348cd1c4b7 100644
---
a/inlong-agent/agent-plugins/src/main/java/org/apache/inlong/agent/plugin/sinks/PulsarSink.java
+++
b/inlong-agent/agent-plugins/src/main/java/org/apache/inlong/agent/plugin/sinks/PulsarSink.java
@@ -20,7 +20,7 @@ package org.apache.inlong.agent.plugin.sinks;
import org.apache.inlong.agent.common.AgentThreadFactory;
import org.apache.inlong.agent.conf.AgentConfiguration;
import org.apache.inlong.agent.conf.JobProfile;
-import org.apache.inlong.agent.core.task.TaskPositionManager;
+import org.apache.inlong.agent.core.task.PositionManager;
import org.apache.inlong.agent.message.BatchProxyMessage;
import org.apache.inlong.agent.message.EndMessage;
import org.apache.inlong.agent.message.PackProxyMessage;
@@ -91,7 +91,7 @@ public class PulsarSink extends AbstractSink {
private static final ExecutorService EXECUTOR_SERVICE = new
ThreadPoolExecutor(0, Integer.MAX_VALUE,
60L, TimeUnit.SECONDS, new SynchronousQueue<>(), new
AgentThreadFactory("PulsarSink"));
private final AgentConfiguration agentConf =
AgentConfiguration.getAgentConf();
- private TaskPositionManager taskPositionManager;
+ private PositionManager positionManager;
private volatile boolean shutdown = false;
private List<MQClusterInfo> mqClusterInfos;
private String topic;
@@ -119,7 +119,7 @@ public class PulsarSink extends AbstractSink {
@Override
public void init(JobProfile jobConf) {
super.init(jobConf);
- taskPositionManager = TaskPositionManager.getInstance();
+ positionManager = PositionManager.getInstance();
// agentConf
sendQueueSize = agentConf.getInt(PULSAR_SINK_SEND_QUEUE_SIZE,
DEFAULT_SEND_QUEUE_SIZE);
sendQueueSemaphore = new Semaphore(sendQueueSize);
diff --git
a/inlong-agent/agent-plugins/src/main/java/org/apache/inlong/agent/plugin/sinks/SenderManager.java
b/inlong-agent/agent-plugins/src/main/java/org/apache/inlong/agent/plugin/sinks/SenderManager.java
index 615f4700bd..3447eb0058 100755
---
a/inlong-agent/agent-plugins/src/main/java/org/apache/inlong/agent/plugin/sinks/SenderManager.java
+++
b/inlong-agent/agent-plugins/src/main/java/org/apache/inlong/agent/plugin/sinks/SenderManager.java
@@ -21,7 +21,8 @@ import org.apache.inlong.agent.common.AgentThreadFactory;
import org.apache.inlong.agent.conf.AgentConfiguration;
import org.apache.inlong.agent.conf.JobProfile;
import org.apache.inlong.agent.constant.CommonConstants;
-import org.apache.inlong.agent.core.task.TaskPositionManager;
+import org.apache.inlong.agent.core.task.MemoryManager;
+import org.apache.inlong.agent.core.task.PositionManager;
import org.apache.inlong.agent.message.BatchProxyMessage;
import org.apache.inlong.agent.metrics.AgentMetricItem;
import org.apache.inlong.agent.metrics.AgentMetricItemSet;
@@ -40,14 +41,10 @@ import io.netty.util.concurrent.DefaultThreadFactory;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
-import java.util.ArrayList;
import java.util.HashMap;
-import java.util.List;
import java.util.Map;
-import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.LinkedBlockingQueue;
-import java.util.concurrent.Semaphore;
import java.util.concurrent.ThreadFactory;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;
@@ -56,6 +53,7 @@ import java.util.concurrent.atomic.AtomicLong;
import static
org.apache.inlong.agent.constant.CommonConstants.DEFAULT_PROXY_BATCH_FLUSH_INTERVAL;
import static
org.apache.inlong.agent.constant.CommonConstants.PROXY_BATCH_FLUSH_INTERVAL;
+import static
org.apache.inlong.agent.constant.FetcherConstants.AGENT_GLOBAL_WRITER_PERMIT;
import static
org.apache.inlong.agent.constant.FetcherConstants.AGENT_MANAGER_AUTH_SECRET_ID;
import static
org.apache.inlong.agent.constant.FetcherConstants.AGENT_MANAGER_AUTH_SECRET_KEY;
import static
org.apache.inlong.agent.constant.FetcherConstants.AGENT_MANAGER_VIP_HTTP_HOST;
@@ -73,18 +71,16 @@ public class SenderManager {
private static final Logger LOGGER =
LoggerFactory.getLogger(SenderManager.class);
private static final SequentialID SEQUENTIAL_ID =
SequentialID.getInstance();
- private static final AtomicInteger SENDER_INDEX = new AtomicInteger(0);
+ private final AtomicInteger SENDER_INDEX = new AtomicInteger(0);
// cache for group and sender list, share the map cross agent lifecycle.
- private static final ConcurrentHashMap<String, List<DefaultMessageSender>>
SENDER_MAP =
- new ConcurrentHashMap<>();
+ private DefaultMessageSender sender;
private LinkedBlockingQueue<AgentSenderCallback> resendQueue;
private final ExecutorService resendExecutorService = new
ThreadPoolExecutor(1, 1,
0L, TimeUnit.MILLISECONDS,
new LinkedBlockingQueue<>(), new
AgentThreadFactory("SendManager-Resend"));
// sharing worker threads between sender client
// in case of thread abusing.
- private static final ThreadFactory SHARED_FACTORY = new
DefaultThreadFactory("agent-client-io",
- Thread.currentThread().isDaemon());
+ private ThreadFactory SHARED_FACTORY;
private static final AtomicLong METRIC_INDEX = new AtomicLong(0);
private final String managerHost;
private final int managerPort;
@@ -104,14 +100,12 @@ public class SenderManager {
private final String sourcePath;
private final boolean proxySend;
private volatile boolean shutdown = false;
-
// metric
private AgentMetricItemSet metricItemSet;
private Map<String, String> dimensions;
- private TaskPositionManager taskPositionManager;
+ private PositionManager positionManager;
private int ioThreadNum;
private boolean enableBusyWait;
- private Semaphore semaphore;
private String authSecretId;
private String authSecretKey;
protected int batchFlushInterval;
@@ -144,9 +138,7 @@ public class SenderManager {
retrySleepTime = jobConf.getLong(
CommonConstants.PROXY_RETRY_SLEEP,
CommonConstants.DEFAULT_PROXY_RETRY_SLEEP);
isFile = jobConf.getBoolean(CommonConstants.PROXY_IS_FILE,
CommonConstants.DEFAULT_IS_FILE);
- taskPositionManager = TaskPositionManager.getInstance();
- semaphore = new
Semaphore(jobConf.getInt(CommonConstants.PROXY_MESSAGE_SEMAPHORE,
- CommonConstants.DEFAULT_PROXY_MESSAGE_SEMAPHORE));
+ positionManager = PositionManager.getInstance();
ioThreadNum =
jobConf.getInt(CommonConstants.PROXY_CLIENT_IO_THREAD_NUM,
CommonConstants.DEFAULT_PROXY_CLIENT_IO_THREAD_NUM);
enableBusyWait =
jobConf.getBoolean(CommonConstants.PROXY_CLIENT_ENABLE_BUSY_WAIT,
@@ -167,13 +159,15 @@ public class SenderManager {
resendQueue = new LinkedBlockingQueue<>();
}
- public void Start() {
+ public void Start() throws Exception {
+ sender = createMessageSender(inlongGroupId);
resendExecutorService.execute(flushResendQueue());
}
public void Stop() {
shutdown = true;
resendExecutorService.shutdown();
+ sender.close();
}
private AgentMetricItem getMetricItem(Map<String, String> otherDimensions)
{
@@ -190,26 +184,6 @@ public class SenderManager {
return getMetricItem(dims);
}
- /**
- * Select by group.
- *
- * @param group inlong group id
- * @return default message sender
- */
- private DefaultMessageSender selectSender(String group) {
- List<DefaultMessageSender> senderList = SENDER_MAP.get(group);
- return senderList.get((SENDER_INDEX.getAndIncrement() & 0x7FFFFFFF) %
senderList.size());
- }
-
- public void acquireSemaphore(int messageNum) {
- try {
- semaphore.acquire(messageNum);
- } catch (Exception e) {
- LOGGER.error("acquire messageNum {} fail, current semaphore {}",
- messageNum, semaphore.availablePermits());
- }
- }
-
/**
* sender
*
@@ -228,28 +202,15 @@ public class SenderManager {
proxyClientConfig.setEnableBusyWait(enableBusyWait);
proxyClientConfig.setProtocolType(ProtocolType.TCP);
+ SHARED_FACTORY = new DefaultThreadFactory("agent-client-" + sourcePath,
+ Thread.currentThread().isDaemon());
+
DefaultMessageSender sender = new
DefaultMessageSender(proxyClientConfig, SHARED_FACTORY);
sender.setMsgtype(msgType);
sender.setCompress(isCompress);
return sender;
}
- /**
- * Add new sender for group id if max size is not satisfied.
- */
- public void addMessageSender() throws Exception {
- List<DefaultMessageSender> tmpList = new ArrayList<>();
- List<DefaultMessageSender> senderList =
SENDER_MAP.putIfAbsent(inlongGroupId, tmpList);
- if (senderList == null) {
- senderList = tmpList;
- }
- if (senderList.size() > maxSenderPerGroup) {
- return;
- }
- DefaultMessageSender sender = createMessageSender(inlongGroupId);
- senderList.add(sender);
- }
-
public void sendBatch(BatchProxyMessage batchMessage) {
sendBatchWithRetryCount(batchMessage, 0);
}
@@ -261,7 +222,7 @@ public class SenderManager {
boolean suc = false;
while (!suc) {
try {
- selectSender(batchMessage.getGroupId()).asyncSendMessage(new
AgentSenderCallback(batchMessage, retry),
+ sender.asyncSendMessage(new AgentSenderCallback(batchMessage,
retry),
batchMessage.getDataList(), batchMessage.getGroupId(),
batchMessage.getStreamId(),
batchMessage.getDataTime(),
SEQUENTIAL_ID.getNextUuid(), maxSenderTimeout, TimeUnit.SECONDS,
batchMessage.getExtraMap(), proxySend);
@@ -271,13 +232,14 @@ public class SenderManager {
} catch (Exception exception) {
suc = false;
if (retry > maxSenderRetry) {
- LOGGER.warn("max retry reached, retry count is {}, sleep
and send again", retry);
+ if (retry % 10 == 0) {
+ LOGGER.error("max retry reached, sample log Exception
caught", exception);
+ }
} else {
LOGGER.error("Exception caught", exception);
}
retry++;
AgentUtils.silenceSleepInMs(retrySleepTime);
-
}
}
}
@@ -289,7 +251,7 @@ public class SenderManager {
*/
private Runnable flushResendQueue() {
return () -> {
- LOGGER.info("start flush cache thread for {} ProxySink",
inlongGroupId);
+ LOGGER.info("start flush resend queue {}:{}", inlongGroupId,
sourcePath);
while (!shutdown) {
try {
AgentSenderCallback callback = resendQueue.poll(1,
TimeUnit.SECONDS);
@@ -304,6 +266,7 @@ public class SenderManager {
AgentUtils.silenceSleepInMs(batchFlushInterval);
}
}
+ LOGGER.info("stop flush resend queue {}:{}", inlongGroupId,
sourcePath);
};
}
@@ -342,10 +305,12 @@ public class SenderManager {
String jobId = batchMessage.getJobId();
long dataTime = batchMessage.getDataTime();
if (result != null && result.equals(SendResult.OK)) {
- semaphore.release(msgCnt);
+
MemoryManager.getInstance().release(AGENT_GLOBAL_WRITER_PERMIT, (int)
batchMessage.getTotalSize());
AuditUtils.add(AuditUtils.AUDIT_ID_AGENT_SEND_SUCCESS,
groupId, streamId, dataTime, msgCnt,
batchMessage.getTotalSize());
getMetricItem(groupId,
streamId).pluginSendSuccessCount.addAndGet(msgCnt);
+ PositionManager.getInstance()
+ .updateSinkPosition(batchMessage.getJobId(),
sourcePath, msgCnt, false);
} else {
LOGGER.warn("send groupId {}, streamId {}, jobId {}, dataTime
{} fail with times {}, "
+ "error {}", groupId, streamId, jobId, dataTime,
retry, result);
diff --git
a/inlong-agent/agent-plugins/src/main/java/org/apache/inlong/agent/plugin/sources/TextFileSource.java
b/inlong-agent/agent-plugins/src/main/java/org/apache/inlong/agent/plugin/sources/TextFileSource.java
index f288ae7b3a..d680d49f0b 100755
---
a/inlong-agent/agent-plugins/src/main/java/org/apache/inlong/agent/plugin/sources/TextFileSource.java
+++
b/inlong-agent/agent-plugins/src/main/java/org/apache/inlong/agent/plugin/sources/TextFileSource.java
@@ -70,7 +70,6 @@ public class TextFileSource extends AbstractSource {
FileReaderOperator fileReader = new FileReaderOperator(file,
startPosition);
long waitTimeout = jobConf.getLong(JOB_READ_WAIT_TIMEOUT,
DEFAULT_JOB_READ_WAIT_TIMEOUT);
fileReader.setWaitMillisecond(waitTimeout);
- addValidator(filterPattern, fileReader);
result.add(fileReader);
}
// increment the count of successful sources
@@ -83,8 +82,4 @@ public class TextFileSource extends AbstractSource {
seekPosition = jobConf.getInt(file.getAbsolutePath() +
POSITION_SUFFIX, 0);
return seekPosition;
}
-
- private void addValidator(String filterPattern, FileReaderOperator
fileReader) {
- fileReader.addPatternValidator(filterPattern);
- }
}
diff --git
a/inlong-agent/agent-plugins/src/main/java/org/apache/inlong/agent/plugin/sources/reader/file/FileReaderOperator.java
b/inlong-agent/agent-plugins/src/main/java/org/apache/inlong/agent/plugin/sources/reader/file/FileReaderOperator.java
index 82c3a40592..bcd4b59272 100644
---
a/inlong-agent/agent-plugins/src/main/java/org/apache/inlong/agent/plugin/sources/reader/file/FileReaderOperator.java
+++
b/inlong-agent/agent-plugins/src/main/java/org/apache/inlong/agent/plugin/sources/reader/file/FileReaderOperator.java
@@ -22,15 +22,14 @@ import org.apache.inlong.agent.conf.JobProfile;
import org.apache.inlong.agent.constant.AgentConstants;
import org.apache.inlong.agent.constant.DataCollectType;
import org.apache.inlong.agent.constant.JobConstants;
-import org.apache.inlong.agent.core.task.TaskPositionManager;
+import org.apache.inlong.agent.core.task.MemoryManager;
+import org.apache.inlong.agent.core.task.PositionManager;
import org.apache.inlong.agent.except.FileException;
import org.apache.inlong.agent.message.DefaultMessage;
import org.apache.inlong.agent.metrics.audit.AuditUtils;
import org.apache.inlong.agent.plugin.Message;
-import org.apache.inlong.agent.plugin.Validator;
import org.apache.inlong.agent.plugin.sources.reader.AbstractReader;
import org.apache.inlong.agent.plugin.utils.FileDataUtils;
-import org.apache.inlong.agent.plugin.validator.PatternValidator;
import org.apache.inlong.agent.utils.AgentUtils;
import com.google.gson.Gson;
@@ -54,7 +53,6 @@ import java.util.Date;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
-import java.util.Optional;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.TimeUnit;
@@ -64,6 +62,9 @@ import static
org.apache.inlong.agent.constant.CommonConstants.DEFAULT_PROXY_PAC
import static org.apache.inlong.agent.constant.CommonConstants.PROXY_KEY_DATA;
import static
org.apache.inlong.agent.constant.CommonConstants.PROXY_PACKAGE_MAX_SIZE;
import static
org.apache.inlong.agent.constant.CommonConstants.PROXY_SEND_PARTITION_KEY;
+import static
org.apache.inlong.agent.constant.FetcherConstants.AGENT_GLOBAL_CHANNEL_PERMIT;
+import static
org.apache.inlong.agent.constant.FetcherConstants.AGENT_GLOBAL_READER_QUEUE_PERMIT;
+import static
org.apache.inlong.agent.constant.FetcherConstants.AGENT_GLOBAL_READER_SOURCE_PERMIT;
import static
org.apache.inlong.agent.constant.JobConstants.DEFAULT_JOB_READ_WAIT_TIMEOUT;
import static org.apache.inlong.agent.constant.JobConstants.JOB_FILE_MAX_WAIT;
import static
org.apache.inlong.agent.constant.JobConstants.JOB_FILE_META_ENV_LIST;
@@ -85,11 +86,12 @@ public class FileReaderOperator extends AbstractReader {
private static final Logger LOGGER =
LoggerFactory.getLogger(FileReaderOperator.class);
public static final int NEVER_STOP_SIGN = -1;
- public static final int BATCH_READ_SIZE = 10000;
- public static final int CACHE_QUEUE_SIZE = 10 * BATCH_READ_SIZE;
- public static final int DEFAULT_BUFFER_SIZE = 64 * 1024;
- private static final SimpleDateFormat RECORD_TIME_FORMAT = new
SimpleDateFormat("yyyy-MM-dd HH:mm:ss.SSS");
- private static final Gson GSON = new Gson();
+ public static final int BATCH_READ_LINE_COUNT = 10000;
+ public static final int BATCH_READ_LINE_TOTAL_LEN = 1024 * 1024;
+ public static final int CACHE_QUEUE_SIZE = 10 * BATCH_READ_LINE_COUNT;
+ public static int DEFAULT_BUFFER_SIZE = 64 * 1024;
+ private final SimpleDateFormat RECORD_TIME_FORMAT = new
SimpleDateFormat("yyyy-MM-dd HH:mm:ss.SSS");
+ private final Gson GSON = new Gson();
public File file;
public long position = 0;
@@ -104,11 +106,11 @@ public class FileReaderOperator extends AbstractReader {
public String fileKey = null;
private long timeout;
private long waitTimeout;
+ public volatile long monitorUpdateTime;
private long lastTime = 0;
- private List<Validator> validators = new ArrayList<>();
- private static final byte[] inBuf = new byte[DEFAULT_BUFFER_SIZE];
- private static int maxPackSize;
-
+ private final byte[] inBuf = new byte[DEFAULT_BUFFER_SIZE];
+ private int maxPackSize;
+ private final long monitorActiveInterval = 60 * 1000;
private final BlockingQueue<String> queue = new
LinkedBlockingQueue<>(CACHE_QUEUE_SIZE);
private final StringBuffer sb = new StringBuffer();
@@ -124,31 +126,67 @@ public class FileReaderOperator extends AbstractReader {
this.metadata = new HashMap<>();
}
- public FileReaderOperator(File file) {
- this(file, 0);
- }
-
@Override
public Message read() {
String data = null;
try {
data = queue.poll(DEFAULT_JOB_READ_WAIT_TIMEOUT, TimeUnit.SECONDS);
} catch (InterruptedException e) {
- LOGGER.warn("poll {} data get interruptted.", file.getPath(), e);
+ LOGGER.warn("poll {} data get interrupted.", file.getPath(), e);
+ }
+ if (data == null) {
+ keepMonitorActive();
+ return null;
+ } else {
+
MemoryManager.getInstance().release(AGENT_GLOBAL_READER_QUEUE_PERMIT,
data.length());
+ }
+ Message finalMsg = createMessage(data);
+ if (finalMsg == null) {
+ return null;
}
- return Optional.ofNullable(data)
- .map(this::metadataMessage)
- .filter(this::filterMessage)
- .map(message -> {
- AuditUtils.add(AuditUtils.AUDIT_ID_AGENT_READ_SUCCESS,
inlongGroupId, inlongStreamId,
- System.currentTimeMillis(), 1, message.length());
- readerMetric.pluginReadSuccessCount.incrementAndGet();
- readerMetric.pluginReadCount.incrementAndGet();
- String proxyPartitionKey =
jobConf.get(PROXY_SEND_PARTITION_KEY, DigestUtils.md5Hex(inlongGroupId));
- Map<String, String> header = new HashMap<>();
- header.put(PROXY_KEY_DATA, proxyPartitionKey);
- return new
DefaultMessage(message.getBytes(StandardCharsets.UTF_8), header);
- }).orElse(null);
+ boolean channelPermit = MemoryManager.getInstance()
+ .tryAcquire(AGENT_GLOBAL_CHANNEL_PERMIT,
finalMsg.getBody().length);
+ if (channelPermit == false) {
+ LOGGER.warn("channel tryAcquire failed");
+
MemoryManager.getInstance().printDetail(AGENT_GLOBAL_CHANNEL_PERMIT);
+ AgentUtils.silenceSleepInSeconds(1);
+ return null;
+ }
+ return finalMsg;
+ }
+
+ private Message createMessage(String data) {
+ String msgWithMetaData = fillMetaData(data);
+ AuditUtils.add(AuditUtils.AUDIT_ID_AGENT_READ_SUCCESS, inlongGroupId,
inlongStreamId,
+ System.currentTimeMillis(), 1, msgWithMetaData.length());
+ readerMetric.pluginReadSuccessCount.incrementAndGet();
+ readerMetric.pluginReadCount.incrementAndGet();
+ String proxyPartitionKey = jobConf.get(PROXY_SEND_PARTITION_KEY,
DigestUtils.md5Hex(inlongGroupId));
+ Map<String, String> header = new HashMap<>();
+ header.put(PROXY_KEY_DATA, proxyPartitionKey);
+ Message finalMsg = new
DefaultMessage(msgWithMetaData.getBytes(StandardCharsets.UTF_8), header);
+ // if the message size is greater than max pack size,should drop it.
+ if (finalMsg.getBody().length > maxPackSize) {
+ LOGGER.warn("message size is {}, greater than max pack size {},
drop it!",
+ finalMsg.getBody().length, maxPackSize);
+ return null;
+ }
+ return finalMsg;
+ }
+
+ public void keepMonitorActive() {
+ if (!isMonitorActive()) {
+ LOGGER.error("monitor not active, create a new one");
+ MonitorTextFile.getInstance().monitor(this);
+ }
+ }
+
+ private boolean isMonitorActive() {
+ long currentTime = System.currentTimeMillis();
+ if (currentTime - monitorUpdateTime > monitorActiveInterval) {
+ return false;
+ }
+ return true;
}
@Override
@@ -207,13 +245,6 @@ public class FileReaderOperator extends AbstractReader {
return true;
}
- public void addPatternValidator(String pattern) {
- if (pattern.isEmpty()) {
- return;
- }
- validators.add(new PatternValidator(pattern));
- }
-
@Override
public void init(JobProfile jobConf) {
try {
@@ -228,22 +259,23 @@ public class FileReaderOperator extends AbstractReader {
LOGGER.warn("md5 is differ from origin, origin: {}, new {}",
this.md5, md5);
}
LOGGER.info("file name for task is {}, md5 is {}", file, md5);
-
+ monitorUpdateTime = System.currentTimeMillis();
MonitorTextFile.getInstance().monitor(this);
if (!jobConf.get(JOB_FILE_MONITOR_STATUS,
JOB_FILE_MONITOR_DEFAULT_STATUS)
.equals(JOB_FILE_MONITOR_DEFAULT_STATUS)) {
readEndpoint = Files.lines(file.toPath()).count();
}
try {
- position =
TaskPositionManager.getInstance().getPosition(getReadSource(), instanceId);
+ position =
PositionManager.getInstance().getPosition(getReadSource(), instanceId);
} catch (Exception ex) {
position = 0;
LOGGER.error("get position from position manager error, only
occur in ut: {}", ex.getMessage());
}
this.bytePosition = getStartBytePosition(position);
- LOGGER.info("FileReaderOperator init file {} instanceId {} history
position {}", getReadSource(),
+ LOGGER.info("FileReaderOperator init file {} instanceId {} history
position {} readEndpoint {}",
+ getReadSource(),
instanceId,
- position);
+ position, readEndpoint);
if (isIncrement(jobConf)) {
LOGGER.info("FileReaderOperator DataCollectType INCREMENT:
start bytePosition {},{}",
file.length(), file.getAbsolutePath());
@@ -251,8 +283,8 @@ public class FileReaderOperator extends AbstractReader {
try (LineNumberReader lineNumberReader = new
LineNumberReader(new FileReader(file.getPath()))) {
lineNumberReader.skip(Long.MAX_VALUE);
position = lineNumberReader.getLineNumber();
- TaskPositionManager.getInstance().updateSinkPosition(
- getJobInstanceId(), getReadSource(), position);
+ PositionManager.getInstance().updateSinkPosition(
+ getJobInstanceId(), getReadSource(), position,
true);
LOGGER.info("for increment update {}, position to {}",
file.getAbsolutePath(), position);
} catch (IOException ex) {
@@ -260,7 +292,7 @@ public class FileReaderOperator extends AbstractReader {
}
}
try {
- resiterMeta(jobConf);
+ registerMeta(jobConf);
} catch (Exception ex) {
LOGGER.error("init metadata error", ex);
}
@@ -278,8 +310,8 @@ public class FileReaderOperator extends AbstractReader {
input = new RandomAccessFile(file, "r");
while (readCount < lineNum) {
List<String> lines = new ArrayList<>();
- pos = readLines(input, pos, lines,
- Math.min((int) (lineNum - readCount),
FileReaderOperator.BATCH_READ_SIZE));
+ pos = readLines(input, pos, lines, Math.min((int) (lineNum -
readCount), BATCH_READ_LINE_COUNT),
+ BATCH_READ_LINE_TOTAL_LEN, true);
readCount += lines.size();
if (lines.size() == 0) {
LOGGER.error("getStartBytePosition LineNum {} larger than
the real file");
@@ -293,7 +325,7 @@ public class FileReaderOperator extends AbstractReader {
input.close();
}
}
- LOGGER.info("getStartBytePosition LineNum {} position {}", lineNum,
pos);
+ LOGGER.info("getStartBytePosition {} LineNum {} position {}",
getReadSource(), lineNum, pos);
return pos;
}
@@ -319,23 +351,26 @@ public class FileReaderOperator extends AbstractReader {
@Override
public void destroy() {
+ LOGGER.info("destroy read source name {}", getReadSource());
finished = true;
+ while (!queue.isEmpty()) {
+ String data = null;
+ try {
+ data = queue.poll(DEFAULT_JOB_READ_WAIT_TIMEOUT,
TimeUnit.SECONDS);
+ } catch (InterruptedException e) {
+ LOGGER.warn("poll {} data get interrupted.", file.getPath(),
e);
+ }
+ if (data != null) {
+
MemoryManager.getInstance().release(AGENT_GLOBAL_READER_QUEUE_PERMIT,
data.length());
+ }
+ }
queue.clear();
+ LOGGER.info("destroy read source name {} end", getReadSource());
LOGGER.info("destroy reader with read {} num {}",
metricName, readerMetric == null ? 0 :
readerMetric.pluginReadCount.get());
}
- public boolean filterMessage(String message) {
- if (StringUtils.isBlank(message)) {
- return false;
- }
- if (validators.isEmpty()) {
- return true;
- }
- return validators.stream().allMatch(v -> v.validate(message));
- }
-
- public String metadataMessage(String message) {
+ public String fillMetaData(String message) {
long timestamp = System.currentTimeMillis();
boolean isJson = FileDataUtils.isJSON(message);
Map<String, String> mergeData = new HashMap<>(metadata);
@@ -348,7 +383,7 @@ public class FileReaderOperator extends AbstractReader {
return !queue.isEmpty();
}
- public void resiterMeta(JobProfile jobConf) {
+ public void registerMeta(JobProfile jobConf) {
if (!jobConf.hasKey(JOB_FILE_META_ENV_LIST)) {
return;
}
@@ -365,10 +400,16 @@ public class FileReaderOperator extends AbstractReader {
}
public void fetchData() throws IOException {
- // todo: TaskPositionManager stored position should be changed to byte
position.Now it store msg sent, so here
- // every line (include empty line) should be sent, otherwise the read
position will be offset when
- // restarting and recovering. In the same time, Regex end line
spiltted line also has this problem, because
- // recovering is based on line position.
+ boolean readFromPosPermit = false;
+ while (readFromPosPermit == false) {
+ readFromPosPermit = MemoryManager.getInstance()
+ .tryAcquire(AGENT_GLOBAL_READER_SOURCE_PERMIT,
BATCH_READ_LINE_TOTAL_LEN);
+ if (readFromPosPermit == false) {
+ LOGGER.warn("fetchData tryAcquire failed");
+
MemoryManager.getInstance().printDetail(AGENT_GLOBAL_READER_SOURCE_PERMIT);
+ AgentUtils.silenceSleepInSeconds(1);
+ }
+ }
List<String> lines = readFromPos(bytePosition);
if (!lines.isEmpty()) {
LOGGER.info("path is {}, line is {}, byte position is {}, reads
data lines {}",
@@ -376,8 +417,17 @@ public class FileReaderOperator extends AbstractReader {
}
List<String> resultLines = lines;
resultLines.forEach(line -> {
+ boolean offerPermit = false;
+ while (offerPermit != true) {
+ offerPermit =
MemoryManager.getInstance().tryAcquire(AGENT_GLOBAL_READER_QUEUE_PERMIT,
line.length());
+ if (offerPermit != true) {
+ LOGGER.warn("offerPermit tryAcquire failed");
+
MemoryManager.getInstance().printDetail(AGENT_GLOBAL_READER_QUEUE_PERMIT);
+ AgentUtils.silenceSleepInSeconds(1);
+ }
+ }
try {
- boolean offerSuc = queue.offer(line, 1, TimeUnit.SECONDS);
+ boolean offerSuc = false;
while (offerSuc != true) {
offerSuc = queue.offer(line, 1, TimeUnit.SECONDS);
}
@@ -386,7 +436,9 @@ public class FileReaderOperator extends AbstractReader {
LOGGER.error("fetchData offer failed {}", e.getMessage());
}
});
+ MemoryManager.getInstance().release(AGENT_GLOBAL_READER_SOURCE_PERMIT,
BATCH_READ_LINE_TOTAL_LEN);
if (position >= readEndpoint) {
+ LOGGER.info("read to the end, set finished position {}
readEndpoint {}", position, readEndpoint);
finished = true;
}
}
@@ -396,9 +448,8 @@ public class FileReaderOperator extends AbstractReader {
RandomAccessFile input = null;
try {
input = new RandomAccessFile(file, "r");
- bytePosition = readLines(input, pos, lines,
FileReaderOperator.BATCH_READ_SIZE);
+ bytePosition = readLines(input, pos, lines, BATCH_READ_LINE_COUNT,
BATCH_READ_LINE_TOTAL_LEN, false);
position += lines.size();
-
TaskPositionManager.getInstance().updateSinkPosition(getJobInstanceId(),
getReadSource(), position);
} catch (Exception e) {
LOGGER.error("readFromPos error {}", e.getMessage());
} finally {
@@ -416,7 +467,8 @@ public class FileReaderOperator extends AbstractReader {
* @return The new position after the lines have been read
* @throws java.io.IOException if an I/O error occurs.
*/
- private static long readLines(RandomAccessFile reader, long pos,
List<String> lines, int maxLineCount)
+ private long readLines(RandomAccessFile reader, long pos, List<String>
lines, int maxLineCount, int maxLineTotalLen,
+ boolean isCounting)
throws IOException {
if (maxLineCount == 0) {
return pos;
@@ -425,6 +477,7 @@ public class FileReaderOperator extends AbstractReader {
reader.seek(pos);
long rePos = pos; // position to re-read
int num;
+ int lineTotalLen = 0;
LOGGER.debug("readLines from {}", pos);
boolean overLen = false;
while ((num = reader.read(inBuf)) != -1) {
@@ -433,7 +486,13 @@ public class FileReaderOperator extends AbstractReader {
byte ch = inBuf[i];
switch (ch) {
case '\n':
- lines.add(new String(baos.toByteArray()));
+ if (isCounting) {
+ lines.add(new String(""));
+ } else {
+ String temp = new String(baos.toByteArray(),
StandardCharsets.UTF_8);
+ lines.add(temp);
+ lineTotalLen += temp.length();
+ }
rePos = pos + i + 1;
if (overLen) {
LOGGER.warn("readLines over len finally string len
{}",
@@ -451,11 +510,11 @@ public class FileReaderOperator extends AbstractReader {
overLen = true;
}
}
- if (lines.size() >= maxLineCount) {
+ if (lines.size() >= maxLineCount || lineTotalLen >=
maxLineTotalLen) {
break;
}
}
- if (lines.size() >= maxLineCount) {
+ if (lines.size() >= maxLineCount || lineTotalLen >=
maxLineTotalLen) {
break;
}
if (i == num) {
@@ -477,7 +536,7 @@ public class FileReaderOperator extends AbstractReader {
isFirst = false;
}
}
- LOGGER.info("is first store job {}, {}", file.getAbsolutePath(),
isFirst);
+ LOGGER.info("isFirst {}, {}", file.getAbsolutePath(), isFirst);
return isFirst;
}
}
diff --git
a/inlong-agent/agent-plugins/src/main/java/org/apache/inlong/agent/plugin/sources/reader/file/MonitorTextFile.java
b/inlong-agent/agent-plugins/src/main/java/org/apache/inlong/agent/plugin/sources/reader/file/MonitorTextFile.java
index 531b5d0821..33207f5bd2 100644
---
a/inlong-agent/agent-plugins/src/main/java/org/apache/inlong/agent/plugin/sources/reader/file/MonitorTextFile.java
+++
b/inlong-agent/agent-plugins/src/main/java/org/apache/inlong/agent/plugin/sources/reader/file/MonitorTextFile.java
@@ -18,7 +18,7 @@
package org.apache.inlong.agent.plugin.sources.reader.file;
import org.apache.inlong.agent.common.AgentThreadFactory;
-import org.apache.inlong.agent.core.task.TaskPositionManager;
+import org.apache.inlong.agent.core.task.PositionManager;
import com.google.common.annotations.VisibleForTesting;
import org.slf4j.Logger;
@@ -43,7 +43,9 @@ import static
org.apache.inlong.agent.constant.JobConstants.JOB_FILE_MONITOR_INT
public final class MonitorTextFile {
private static final Logger LOGGER =
LoggerFactory.getLogger(MonitorTextFile.class);
- // monitor thread pool
+ /**
+ * monitor thread pool
+ */
private static final ThreadPoolExecutor EXECUTOR_SERVICE = new
ThreadPoolExecutor(
0, Integer.MAX_VALUE,
60L, TimeUnit.SECONDS,
@@ -114,13 +116,17 @@ public final class MonitorTextFile {
long currentTime = System.currentTimeMillis();
if (expireTime !=
Long.parseLong(JOB_FILE_MONITOR_DEFAULT_EXPIRE)
&& currentTime - this.startTime > expireTime) {
+ LOGGER.info("monitor expire in {}", expireTime);
break;
}
if (fileReaderOperator.inited) {
listen();
}
+ fileReaderOperator.monitorUpdateTime = currentTime;
TimeUnit.MILLISECONDS.sleep(interval);
}
+ LOGGER.info("Job {} stop monitor {}",
+ fileReaderOperator.instanceId,
fileReaderOperator.file.getAbsolutePath());
} catch (Exception e) {
LOGGER.error(String.format("monitor %s error",
fileReaderOperator.file.getName()), e);
}
@@ -164,7 +170,7 @@ public final class MonitorTextFile {
}
/**
- * reset the position and bytePosition
+ * Reset the position and bytePosition
*/
private void resetPosition() {
LOGGER.info("reset position {}", fileReaderOperator.file.toPath());
@@ -173,26 +179,23 @@ public final class MonitorTextFile {
String jobInstanceId = fileReaderOperator.getJobInstanceId();
if (jobInstanceId != null) {
- TaskPositionManager.getInstance().updateSinkPosition(
- jobInstanceId, fileReaderOperator.getReadSource(), 0);
+ PositionManager.getInstance().updateSinkPosition(
+ jobInstanceId, fileReaderOperator.getReadSource(), 0,
true);
}
}
/**
* Determine whether the inode has changed
*
- * @param currentFileKey
- * @return
+ * @param currentFileKey current file key
+ * @return true if the inode changed, otherwise false
*/
private boolean isInodeChanged(String currentFileKey) {
if (fileReaderOperator.fileKey == null || currentFileKey == null) {
return false;
}
- if (fileReaderOperator.fileKey.equals(currentFileKey)) {
- return false;
- }
- return true;
+ return !fileReaderOperator.fileKey.equals(currentFileKey);
}
}
}
diff --git a/inlong-agent/agent-plugins/src/test/java/org/apache/inlong/agent/plugin/MiniAgent.java b/inlong-agent/agent-plugins/src/test/java/org/apache/inlong/agent/plugin/MiniAgent.java
index ce02d1a21c..113ea65465 100755
--- a/inlong-agent/agent-plugins/src/test/java/org/apache/inlong/agent/plugin/MiniAgent.java
+++ b/inlong-agent/agent-plugins/src/test/java/org/apache/inlong/agent/plugin/MiniAgent.java
@@ -23,7 +23,7 @@ import org.apache.inlong.agent.conf.ProfileFetcher;
import org.apache.inlong.agent.conf.TriggerProfile;
import org.apache.inlong.agent.core.AgentManager;
import org.apache.inlong.agent.core.HeartbeatManager;
-import org.apache.inlong.agent.core.task.TaskPositionManager;
+import org.apache.inlong.agent.core.task.PositionManager;
import org.powermock.api.mockito.PowerMockito;
import org.powermock.api.support.membermodification.MemberModifier;
@@ -55,16 +55,16 @@ public class MiniAgent {
}
private void init() throws Exception {
- TaskPositionManager taskPositionManager =
PowerMockito.mock(TaskPositionManager.class);
+ PositionManager positionManager =
PowerMockito.mock(PositionManager.class);
HeartbeatManager heartbeatManager =
PowerMockito.mock(HeartbeatManager.class);
ProfileFetcher profileFetcher =
PowerMockito.mock(ProfileFetcher.class);
- PowerMockito.doNothing().when(taskPositionManager, "start");
- PowerMockito.doNothing().when(taskPositionManager, "stop");
+ PowerMockito.doNothing().when(positionManager, "start");
+ PowerMockito.doNothing().when(positionManager, "stop");
PowerMockito.doNothing().when(heartbeatManager, "start");
PowerMockito.doNothing().when(heartbeatManager, "stop");
PowerMockito.doNothing().when(profileFetcher, "start");
PowerMockito.doNothing().when(profileFetcher, "stop");
- MemberModifier.field(AgentManager.class,
"taskPositionManager").set(manager, taskPositionManager);
+ MemberModifier.field(AgentManager.class,
"positionManager").set(manager, positionManager);
MemberModifier.field(AgentManager.class,
"heartbeatManager").set(manager, heartbeatManager);
MemberModifier.field(AgentManager.class, "fetcher").set(manager,
profileFetcher);
}
@@ -105,8 +105,8 @@ public class MiniAgent {
public void cleanupTriggers() {
triggerProfileCache
- .forEach(triggerProfile ->
manager.getTriggerManager().deleteTrigger(triggerProfile.getTriggerId(),
- false));
+ .forEach(triggerProfile -> manager.getTriggerManager()
+ .deleteTrigger(triggerProfile.getTriggerId(), false));
triggerProfileCache.clear();
}
}
diff --git a/inlong-agent/agent-plugins/src/test/java/org/apache/inlong/agent/plugin/TestFileAgent.java b/inlong-agent/agent-plugins/src/test/java/org/apache/inlong/agent/plugin/TestFileAgent.java
index 547c132f00..13df91dc97 100755
--- a/inlong-agent/agent-plugins/src/test/java/org/apache/inlong/agent/plugin/TestFileAgent.java
+++ b/inlong-agent/agent-plugins/src/test/java/org/apache/inlong/agent/plugin/TestFileAgent.java
@@ -75,7 +75,7 @@ public class TestFileAgent {
agent.start();
testRootDir = helper.getTestRootDir();
} catch (Exception e) {
- LOGGER.error("setup failure");
+ LOGGER.error("setup failure", e);
}
}
diff --git a/inlong-agent/agent-plugins/src/test/java/org/apache/inlong/agent/plugin/trigger/TestTriggerManager.java b/inlong-agent/agent-plugins/src/test/java/org/apache/inlong/agent/plugin/trigger/TestTriggerManager.java
index 8ea308ec69..1e3a19f077 100644
--- a/inlong-agent/agent-plugins/src/test/java/org/apache/inlong/agent/plugin/trigger/TestTriggerManager.java
+++ b/inlong-agent/agent-plugins/src/test/java/org/apache/inlong/agent/plugin/trigger/TestTriggerManager.java
@@ -92,6 +92,7 @@ public class TestTriggerManager {
@Test
public void testRestartTriggerJobRestore() throws Exception {
+
TriggerProfile triggerProfile1 =
TriggerProfile.parseJsonStr(FILE_JOB_TEMPLATE);
triggerProfile1.set(JobConstants.JOB_ID, "1");
triggerProfile1.set(JobConstants.JOB_DIR_FILTER_PATTERNS,
@@ -100,16 +101,15 @@ public class TestTriggerManager {
WATCH_FOLDER.newFolder("tmp");
TestUtils.createHugeFiles("1.log",
WATCH_FOLDER.getRoot().getAbsolutePath(), "asdqwdqd");
- System.out.println(" task size " +
agent.getManager().getTaskManager().getTaskSize());
+ LOGGER.info("testRestartTriggerJobRestore 1 task size " +
agent.getManager().getTaskManager().getTaskSize());
await().atMost(10, TimeUnit.SECONDS).until(() ->
agent.getManager().getTaskManager().getTaskSize() == 1);
-
+ LOGGER.info("testRestartTriggerJobRestore 2 task size " +
agent.getManager().getTaskManager().getTaskSize());
agent.restart();
- await().atMost(10, TimeUnit.SECONDS).until(() ->
agent.getManager().getTaskManager().getTaskSize() == 1);
-
+ LOGGER.info("testRestartTriggerJobRestore 3 task size " +
agent.getManager().getTaskManager().getTaskSize());
+ await().atMost(30, TimeUnit.SECONDS).until(() ->
agent.getManager().getTaskManager().getTaskSize() == 1);
+ LOGGER.info("testRestartTriggerJobRestore 4 task size " +
agent.getManager().getTaskManager().getTaskSize());
// cleanup
TestUtils.deleteFile(WATCH_FOLDER.getRoot().getAbsolutePath() +
"/1.log");
- TestUtils.deleteFile(WATCH_FOLDER.getRoot().getAbsolutePath() +
"/2.log");
- TestUtils.deleteFile(WATCH_FOLDER.getRoot().getAbsolutePath() +
"/tmp/3.log");
}
@Test