This is an automated email from the ASF dual-hosted git repository.
dockerzhang pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/inlong.git
The following commit(s) were added to refs/heads/master by this push:
new 9abfce4a92 [INLONG-11687][Agent] Optimize task main thread exception
handling to prevent exception exits (#11688)
9abfce4a92 is described below
commit 9abfce4a92cc5c022b05c79cbad08bd12b91df44
Author: justinwwhuang <[email protected]>
AuthorDate: Mon Jan 20 11:30:08 2025 +0800
[INLONG-11687][Agent] Optimize task main thread exception handling to
prevent exception exits (#11688)
---
.../agent/core/instance/InstanceManager.java | 5 +
.../inlong/agent/plugin/sinks/ProxySink.java | 6 +-
.../inlong/agent/plugin/task/AbstractTask.java | 38 +++---
.../inlong/agent/plugin/task/TestSQLTask.java | 147 +++++++++++++++++++++
4 files changed, 176 insertions(+), 20 deletions(-)
diff --git a/inlong-agent/agent-core/src/main/java/org/apache/inlong/agent/core/instance/InstanceManager.java b/inlong-agent/agent-core/src/main/java/org/apache/inlong/agent/core/instance/InstanceManager.java
index 3396c3a591..3f0a914e90 100644
--- a/inlong-agent/agent-core/src/main/java/org/apache/inlong/agent/core/instance/InstanceManager.java
+++ b/inlong-agent/agent-core/src/main/java/org/apache/inlong/agent/core/instance/InstanceManager.java
@@ -366,6 +366,11 @@ public class InstanceManager extends AbstractDaemon {
private void deleteFromStore(String instanceId) {
InstanceProfile profile = instanceStore.getInstance(taskId,
instanceId);
+ if (profile == null) {
+ LOGGER.error("try to delete instance from store but not found:
taskId {} instanceId {}", taskId,
+ instanceId);
+ return;
+ }
String inlongGroupId = profile.getInlongGroupId();
String inlongStreamId = profile.getInlongStreamId();
instanceStore.deleteInstance(taskId, instanceId);
diff --git a/inlong-agent/agent-plugins/src/main/java/org/apache/inlong/agent/plugin/sinks/ProxySink.java b/inlong-agent/agent-plugins/src/main/java/org/apache/inlong/agent/plugin/sinks/ProxySink.java
index e00ad65cba..069932711c 100755
--- a/inlong-agent/agent-plugins/src/main/java/org/apache/inlong/agent/plugin/sinks/ProxySink.java
+++ b/inlong-agent/agent-plugins/src/main/java/org/apache/inlong/agent/plugin/sinks/ProxySink.java
@@ -140,7 +140,11 @@ public class ProxySink extends AbstractSink {
LOGGER.info("start flush cache {}:{} flush interval {}",
inlongGroupId, sourceName, batchFlushInterval);
running = true;
while (!shutdown) {
- sendMessageFromCache();
+ try {
+ sendMessageFromCache();
+ } catch (Throwable e) {
+ LOGGER.error("send message from cache error: ", e);
+ }
AgentUtils.silenceSleepInMs(batchFlushInterval);
}
LOGGER.info("stop flush cache {}:{}", inlongGroupId, sourceName);
diff --git a/inlong-agent/agent-plugins/src/main/java/org/apache/inlong/agent/plugin/task/AbstractTask.java b/inlong-agent/agent-plugins/src/main/java/org/apache/inlong/agent/plugin/task/AbstractTask.java
index d9ec53ab0b..ef8107c68e 100644
--- a/inlong-agent/agent-plugins/src/main/java/org/apache/inlong/agent/plugin/task/AbstractTask.java
+++ b/inlong-agent/agent-plugins/src/main/java/org/apache/inlong/agent/plugin/task/AbstractTask.java
@@ -109,32 +109,32 @@ public abstract class AbstractTask extends Task {
public void run() {
Thread.currentThread().setName("task-core-" + getTaskId());
running = true;
- try {
- doRun();
- } catch (Throwable e) {
- LOGGER.error("do run error: ", e);
- ThreadUtils.threadThrowableHandler(Thread.currentThread(), e);
+ while (!isFinished()) {
+ try {
+ doRun();
+ } catch (Throwable e) {
+ LOGGER.error("do run error: ", e);
+ ThreadUtils.threadThrowableHandler(Thread.currentThread(), e);
+ }
+ AgentUtils.silenceSleepInMs(CORE_THREAD_SLEEP_TIME);
}
running = false;
}
protected void doRun() {
- while (!isFinished()) {
- taskPrint();
- AgentUtils.silenceSleepInMs(CORE_THREAD_SLEEP_TIME);
- if (!initOK) {
- continue;
- }
- List<InstanceProfile> profileList = getNewInstanceList();
- for (InstanceProfile profile : profileList) {
- InstanceAction action = new InstanceAction(ActionType.ADD,
profile);
- while (!isFinished() && !instanceManager.submitAction(action))
{
- LOGGER.error("instance manager action queue is full:
taskId {}", getTaskId());
- AgentUtils.silenceSleepInMs(CORE_THREAD_SLEEP_TIME);
- }
+ taskPrint();
+ if (!initOK) {
+ return;
+ }
+ List<InstanceProfile> profileList = getNewInstanceList();
+ for (InstanceProfile profile : profileList) {
+ InstanceAction action = new InstanceAction(ActionType.ADD,
profile);
+ while (!isFinished() && !instanceManager.submitAction(action)) {
+ LOGGER.error("instance manager action queue is full: taskId
{}", getTaskId());
+ AgentUtils.silenceSleepInMs(CORE_THREAD_SLEEP_TIME);
}
- taskHeartbeat();
}
+ taskHeartbeat();
}
protected abstract List<InstanceProfile> getNewInstanceList();
diff --git a/inlong-agent/agent-plugins/src/test/java/org/apache/inlong/agent/plugin/task/TestSQLTask.java b/inlong-agent/agent-plugins/src/test/java/org/apache/inlong/agent/plugin/task/TestSQLTask.java
new file mode 100644
index 0000000000..cc0eea8f5d
--- /dev/null
+++ b/inlong-agent/agent-plugins/src/test/java/org/apache/inlong/agent/plugin/task/TestSQLTask.java
@@ -0,0 +1,147 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.inlong.agent.plugin.task;
+
+import org.apache.inlong.agent.common.AgentThreadFactory;
+import org.apache.inlong.agent.conf.TaskProfile;
+import org.apache.inlong.agent.constant.CycleUnitType;
+import org.apache.inlong.agent.core.task.TaskManager;
+import org.apache.inlong.agent.plugin.AgentBaseTestsHelper;
+import org.apache.inlong.agent.plugin.task.logcollection.SQLTask;
+import org.apache.inlong.common.enums.TaskStateEnum;
+
+import org.junit.AfterClass;
+import org.junit.Assert;
+import org.junit.BeforeClass;
+import org.junit.Test;
+import org.junit.runner.RunWith;
+import org.mockito.Mockito;
+import org.powermock.api.mockito.PowerMockito;
+import org.powermock.core.classloader.annotations.PowerMockIgnore;
+import org.powermock.modules.junit4.PowerMockRunner;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.List;
+import java.util.concurrent.SynchronousQueue;
+import java.util.concurrent.ThreadPoolExecutor;
+import java.util.concurrent.TimeUnit;
+
+import static org.awaitility.Awaitility.await;
+
+@RunWith(PowerMockRunner.class)
+@PowerMockIgnore({"javax.management.*"})
+public class TestSQLTask {
+
+ private static final Logger LOGGER =
LoggerFactory.getLogger(TestSQLTask.class);
+ private static final ClassLoader LOADER =
TestSQLTask.class.getClassLoader();
+ private static AgentBaseTestsHelper helper;
+ private static TaskManager manager;
+ private static final ThreadPoolExecutor EXECUTOR_SERVICE = new
ThreadPoolExecutor(
+ 0, Integer.MAX_VALUE,
+ 1L, TimeUnit.SECONDS,
+ new SynchronousQueue<>(),
+ new AgentThreadFactory("TestSQLTask"));
+
+ @BeforeClass
+ public static void setup() throws Exception {
+ helper = new
AgentBaseTestsHelper(TestSQLTask.class.getName()).setupAgentHome();
+ manager = new TaskManager();
+ }
+
+ @AfterClass
+ public static void teardown() throws Exception {
+ helper.teardownAgentHome();
+ }
+
+ @Test
+ public void testScan() {
+ doTest(1, "select * from table where field = YYYYMMDD_[0-9]+;",
CycleUnitType.DAY,
+ Arrays.asList("select * from table where field =
20230928_[0-9]+;",
+ "select * from table where field = 20230929_[0-9]+;",
+ "select * from table where field = 20230930_[0-9]+;"),
+ Arrays.asList("20230928", "20230929", "20230930"),
+ "20230928",
+ "20230930");
+ doTest(2, "select * from table where field = YYYYMMDDHH_[0-9]+;",
CycleUnitType.HOUR,
+ Arrays.asList("select * from table where field =
2023092823_[0-9]+;",
+ "select * from table where field = 2023092900_[0-9]+;",
+ "select * from table where field =
2023092901_[0-9]+;"),
+ Arrays.asList("2023092823", "2023092900", "2023092901"),
"2023092823", "2023092901");
+ doTest(3, "select * from table where field = YYYYMMDDHHmm_[0-9]+;",
CycleUnitType.MINUTE,
+ Arrays.asList("select * from table where field =
202309282359_[0-9]+;",
+ "select * from table where field =
202309290000_[0-9]+;",
+ "select * from table where field =
202309290001_[0-9]+;"),
+ Arrays.asList("202309282359", "202309290000", "202309290001"),
"202309282359", "202309290001");
+ }
+
+ @Test
+ public void testScanLowercase() {
+ doTest(1, "select * from table where field = yyyyMMdd_[0-9]+;",
CycleUnitType.DAY,
+ Arrays.asList("select * from table where field =
20230928_[0-9]+;",
+ "select * from table where field = 20230929_[0-9]+;",
+ "select * from table where field = 20230930_[0-9]+;"),
+ Arrays.asList("20230928", "20230929", "20230930"),
+ "20230928",
+ "20230930");
+ doTest(2, "select * from table where field = yyyyMMddhh_[0-9]+;",
CycleUnitType.HOUR,
+ Arrays.asList("select * from table where field =
2023092823_[0-9]+;",
+ "select * from table where field = 2023092900_[0-9]+;",
+ "select * from table where field =
2023092901_[0-9]+;"),
+ Arrays.asList("2023092823", "2023092900", "2023092901"),
"2023092823", "2023092901");
+ doTest(3, "select * from table where field = yyyyMMddhhmm_[0-9]+;",
CycleUnitType.MINUTE,
+ Arrays.asList("select * from table where field =
202309282359_[0-9]+;",
+ "select * from table where field =
202309290000_[0-9]+;",
+ "select * from table where field =
202309290001_[0-9]+;"),
+ Arrays.asList("202309282359", "202309290000", "202309290001"),
"202309282359", "202309290001");
+ }
+
+ private void doTest(int taskId, String sql, String cycle, List<String>
srcSQLs, List<String> srcDataTimes,
+ String startTime, String endTime) {
+ TaskProfile taskProfile = helper.getSQLTaskProfile(taskId, sql, "csv",
true, startTime, endTime,
+ TaskStateEnum.RUNNING, cycle, "GMT+8:00");
+ SQLTask sqlTask = null;
+ final List<String> fileName = new ArrayList();
+ final List<String> dataTime = new ArrayList();
+ try {
+ sqlTask = PowerMockito.spy(new SQLTask());
+ PowerMockito.doAnswer(invocation -> {
+ fileName.add(invocation.getArgument(0));
+ dataTime.add(invocation.getArgument(1));
+ return null;
+ }).when(sqlTask, "addToEvenMap", Mockito.anyString(),
Mockito.anyString(), Mockito.anyLong(),
+ Mockito.anyString());
+ Assert.assertTrue(sqlTask.isProfileValid(taskProfile));
+ manager.getTaskStore().storeTask(taskProfile);
+ sqlTask.init(manager, taskProfile,
manager.getInstanceBasicStore());
+ EXECUTOR_SERVICE.submit(sqlTask);
+ } catch (Exception e) {
+ LOGGER.error("source init error", e);
+ Assert.assertTrue("source init error", false);
+ }
+ await().atMost(10, TimeUnit.SECONDS)
+ .until(() -> fileName.size() == srcDataTimes.size() &&
dataTime.size() == srcDataTimes.size());
+ for (int i = 0; i < fileName.size(); i++) {
+ Assert.assertEquals(0, fileName.get(i).compareTo(srcSQLs.get(i)));
+ Assert.assertEquals(0,
dataTime.get(i).compareTo(srcDataTimes.get(i)));
+ }
+ sqlTask.destroy();
+ }
+}
\ No newline at end of file