This is an automated email from the ASF dual-hosted git repository.
dockerzhang pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/inlong.git
The following commit(s) were added to refs/heads/master by this push:
new 01d6fb472f [INLONG-8352][Agent] Optimize the agent UT of
testRestartTriggerJobRestore (#8353)
01d6fb472f is described below
commit 01d6fb472f5e4c1f62aeb1abcbea264a698b61de
Author: doleyzi <[email protected]>
AuthorDate: Wed Jun 28 05:49:24 2023 -0700
[INLONG-8352][Agent] Optimize the agent UT of testRestartTriggerJobRestore
(#8353)
---
.../agent/plugin/trigger/TestTriggerManager.java | 156 +++++++++++----------
1 file changed, 83 insertions(+), 73 deletions(-)
diff --git
a/inlong-agent/agent-plugins/src/test/java/org/apache/inlong/agent/plugin/trigger/TestTriggerManager.java
b/inlong-agent/agent-plugins/src/test/java/org/apache/inlong/agent/plugin/trigger/TestTriggerManager.java
index ddb70fc2c3..cd448c1603 100644
---
a/inlong-agent/agent-plugins/src/test/java/org/apache/inlong/agent/plugin/trigger/TestTriggerManager.java
+++
b/inlong-agent/agent-plugins/src/test/java/org/apache/inlong/agent/plugin/trigger/TestTriggerManager.java
@@ -92,89 +92,99 @@ public class TestTriggerManager {
@Test
public void testRestartTriggerJobRestore() throws Exception {
- agent.cleanupTriggers();
- agent.cleanupJobs();
-
- TriggerProfile triggerProfile1 =
TriggerProfile.parseJsonStr(FILE_JOB_TEMPLATE);
- triggerProfile1.set(JobConstants.JOB_ID, "1");
- triggerProfile1.set(JobConstants.JOB_DIR_FILTER_PATTERNS,
- WATCH_FOLDER.getRoot() + "/1.log");
- agent.submitTrigger(triggerProfile1);
-
- WATCH_FOLDER.newFolder("tmp");
- TestUtils.createHugeFiles("1.log",
WATCH_FOLDER.getRoot().getAbsolutePath(), "asdqwdqd");
- LOGGER.info("testRestartTriggerJobRestore 1 task size " +
agent.getManager().getTaskManager().getTaskSize());
- await().atMost(10, TimeUnit.SECONDS).until(() ->
agent.getManager().getTaskManager().getTaskSize() == 1);
- LOGGER.info("testRestartTriggerJobRestore 2 task size " +
agent.getManager().getTaskManager().getTaskSize());
- agent.restart();
- LOGGER.info("testRestartTriggerJobRestore 3 task size " +
agent.getManager().getTaskManager().getTaskSize());
- await().atMost(30, TimeUnit.SECONDS).until(() ->
agent.getManager().getTaskManager().getTaskSize() == 1);
- LOGGER.info("testRestartTriggerJobRestore 4 task size " +
agent.getManager().getTaskManager().getTaskSize());
- // cleanup
- TestUtils.deleteFile(WATCH_FOLDER.getRoot().getAbsolutePath() +
"/1.log");
+ synchronized (this) {
+ agent.cleanupTriggers();
+ agent.cleanupJobs();
+
+ TriggerProfile triggerProfile1 =
TriggerProfile.parseJsonStr(FILE_JOB_TEMPLATE);
+ triggerProfile1.set(JobConstants.JOB_ID, "1");
+ triggerProfile1.set(JobConstants.JOB_DIR_FILTER_PATTERNS,
+ WATCH_FOLDER.getRoot() + "/1.log");
+ agent.submitTrigger(triggerProfile1);
+
+ WATCH_FOLDER.newFolder("tmp");
+ TestUtils.createHugeFiles("1.log",
WATCH_FOLDER.getRoot().getAbsolutePath(), "asdqwdqd");
+ LOGGER.info(
+ "testRestartTriggerJobRestore 1 task size " +
agent.getManager().getTaskManager().getTaskSize());
+ await().atMost(10, TimeUnit.SECONDS).until(() ->
agent.getManager().getTaskManager().getTaskSize() == 1);
+ LOGGER.info(
+ "testRestartTriggerJobRestore 2 task size " +
agent.getManager().getTaskManager().getTaskSize());
+ agent.restart();
+ LOGGER.info(
+ "testRestartTriggerJobRestore 3 task size " +
agent.getManager().getTaskManager().getTaskSize());
+ await().atMost(30, TimeUnit.SECONDS).until(() ->
agent.getManager().getTaskManager().getTaskSize() == 1);
+ LOGGER.info(
+ "testRestartTriggerJobRestore 4 task size " +
agent.getManager().getTaskManager().getTaskSize());
+ // cleanup
+ TestUtils.deleteFile(WATCH_FOLDER.getRoot().getAbsolutePath() +
"/1.log");
+ }
}
@Test
public void testMultiTriggerWatchSameDir() throws Exception {
- agent.cleanupTriggers();
- agent.cleanupJobs();
-
- TriggerProfile triggerProfile1 =
TriggerProfile.parseJsonStr(FILE_JOB_TEMPLATE);
- triggerProfile1.set(JobConstants.JOB_ID, "1");
- triggerProfile1.set(JobConstants.JOB_DIR_FILTER_PATTERNS,
- WATCH_FOLDER.getRoot() + "/*.log");
-
- TriggerProfile triggerProfile2 =
TriggerProfile.parseJsonStr(FILE_JOB_TEMPLATE);
- triggerProfile2.set(JobConstants.JOB_ID, "2");
- triggerProfile2.set(JobConstants.JOB_DIR_FILTER_PATTERNS,
- WATCH_FOLDER.getRoot() + "/*.txt");
-
- agent.submitTrigger(triggerProfile1);
- agent.submitTrigger(triggerProfile2);
-
- TestUtils.createHugeFiles("1.log",
WATCH_FOLDER.getRoot().getAbsolutePath(), "asdqwdqd");
- TestUtils.createHugeFiles("1.txt",
WATCH_FOLDER.getRoot().getAbsolutePath(), "asdasdasd");
- await().atMost(10, TimeUnit.SECONDS).until(() ->
agent.getManager().getTaskManager().getTaskSize() == 2);
-
- // cleanup
- TestUtils.deleteFile(WATCH_FOLDER.getRoot().getAbsolutePath() +
"/1.log");
- TestUtils.deleteFile(WATCH_FOLDER.getRoot().getAbsolutePath() +
"/1.txt");
+ synchronized (this) {
+ agent.cleanupTriggers();
+ agent.cleanupJobs();
+
+ TriggerProfile triggerProfile1 =
TriggerProfile.parseJsonStr(FILE_JOB_TEMPLATE);
+ triggerProfile1.set(JobConstants.JOB_ID, "1");
+ triggerProfile1.set(JobConstants.JOB_DIR_FILTER_PATTERNS,
+ WATCH_FOLDER.getRoot() + "/*.log");
+
+ TriggerProfile triggerProfile2 =
TriggerProfile.parseJsonStr(FILE_JOB_TEMPLATE);
+ triggerProfile2.set(JobConstants.JOB_ID, "2");
+ triggerProfile2.set(JobConstants.JOB_DIR_FILTER_PATTERNS,
+ WATCH_FOLDER.getRoot() + "/*.txt");
+
+ agent.submitTrigger(triggerProfile1);
+ agent.submitTrigger(triggerProfile2);
+
+ TestUtils.createHugeFiles("1.log",
WATCH_FOLDER.getRoot().getAbsolutePath(), "asdqwdqd");
+ TestUtils.createHugeFiles("1.txt",
WATCH_FOLDER.getRoot().getAbsolutePath(), "asdasdasd");
+ await().atMost(10, TimeUnit.SECONDS).until(() ->
agent.getManager().getTaskManager().getTaskSize() == 2);
+
+ // cleanup
+ TestUtils.deleteFile(WATCH_FOLDER.getRoot().getAbsolutePath() +
"/1.log");
+ TestUtils.deleteFile(WATCH_FOLDER.getRoot().getAbsolutePath() +
"/1.txt");
+ }
}
@Test
public void testSubmitAndShutdown() throws Exception {
- agent.cleanupTriggers();
- agent.cleanupJobs();
-
- TriggerProfile triggerProfile1 =
TriggerProfile.parseJsonStr(FILE_JOB_TEMPLATE);
- triggerProfile1.set(JobConstants.JOB_ID, "1");
- triggerProfile1.set(JobConstants.JOB_DIR_FILTER_PATTERNS,
- WATCH_FOLDER.getRoot() + "/*.log");
-
- // submit trigger
- agent.submitTrigger(triggerProfile1);
- TestUtils.createHugeFiles("1.log",
WATCH_FOLDER.getRoot().getAbsolutePath(), "asdqwdqd");
- DirectoryTrigger trigger = (DirectoryTrigger) agent.getManager()
-
.getTriggerManager().getTrigger(triggerProfile1.getTriggerId());
- await().atMost(10, TimeUnit.SECONDS).until(() -> {
- if (trigger.getWatchers().size() == 0) {
- return false;
- }
-
- for (Map.Entry<WatchKey, Set<DirectoryTrigger>> entry :
trigger.getWatchers().entrySet()) {
- if (entry.getValue().size() != 1) {
+ synchronized (this) {
+ agent.cleanupTriggers();
+ agent.cleanupJobs();
+
+ TriggerProfile triggerProfile1 =
TriggerProfile.parseJsonStr(FILE_JOB_TEMPLATE);
+ triggerProfile1.set(JobConstants.JOB_ID, "1");
+ triggerProfile1.set(JobConstants.JOB_DIR_FILTER_PATTERNS,
+ WATCH_FOLDER.getRoot() + "/*.log");
+
+ // submit trigger
+ agent.submitTrigger(triggerProfile1);
+ TestUtils.createHugeFiles("1.log",
WATCH_FOLDER.getRoot().getAbsolutePath(), "asdqwdqd");
+ DirectoryTrigger trigger = (DirectoryTrigger) agent.getManager()
+
.getTriggerManager().getTrigger(triggerProfile1.getTriggerId());
+ await().atMost(10, TimeUnit.SECONDS).until(() -> {
+ if (trigger.getWatchers().size() == 0) {
return false;
}
- if (entry.getValue().size() == 1 &&
!entry.getValue().stream().findAny().get().equals(trigger)) {
- return false;
+
+ for (Map.Entry<WatchKey, Set<DirectoryTrigger>> entry :
trigger.getWatchers().entrySet()) {
+ if (entry.getValue().size() != 1) {
+ return false;
+ }
+ if (entry.getValue().size() == 1 &&
!entry.getValue().stream().findAny().get().equals(trigger)) {
+ return false;
+ }
}
- }
- return true;
- });
-
- // shutdown trigger
-
agent.getManager().getTriggerManager().deleteTrigger(triggerProfile1.getTriggerId(),
false);
- await().atMost(10, TimeUnit.SECONDS).until(() ->
trigger.getWatchers().size() == 0);
- TestUtils.deleteFile(WATCH_FOLDER.getRoot().getAbsolutePath() +
"/1.log");
+ return true;
+ });
+
+ // shutdown trigger
+
agent.getManager().getTriggerManager().deleteTrigger(triggerProfile1.getTriggerId(),
false);
+ await().atMost(10, TimeUnit.SECONDS).until(() ->
trigger.getWatchers().size() == 0);
+ TestUtils.deleteFile(WATCH_FOLDER.getRoot().getAbsolutePath() +
"/1.log");
+ }
}
}