This is an automated email from the ASF dual-hosted git repository.
dockerzhang pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/inlong.git
The following commit(s) were added to refs/heads/master by this push:
new 4d7838415b [INLONG-11574][Agent] Add COS source unit test (#11575)
4d7838415b is described below
commit 4d7838415b981d37e0ef1c1bceac8fdd4d9b8245
Author: justinwwhuang <[email protected]>
AuthorDate: Fri Dec 6 09:48:06 2024 +0800
[INLONG-11574][Agent] Add COS source unit test (#11575)
---
.../inlong/agent/core/AgentBaseTestsHelper.java | 33 ---
.../inlong/agent/plugin/task/cos/COSTask.java | 2 -
.../inlong/agent/plugin/task/cos/FileScanner.java | 20 +-
.../inlong/agent/plugin/AgentBaseTestsHelper.java | 53 ++++-
.../agent/plugin/instance/TestInstanceManager.java | 5 +-
.../inlong/agent/plugin/sinks/KafkaSinkTest.java | 5 +-
.../inlong/agent/plugin/sinks/PulsarSinkTest.java | 5 +-
.../sinks/filecollect/TestSenderManager.java | 5 +-
.../agent/plugin/sources/TestLogFileSource.java | 2 +-
.../agent/plugin/sources/TestRedisSource.java | 2 +-
.../agent/plugin/sources/TestSQLServerSource.java | 2 +-
.../inlong/agent/plugin/task/TestCOSTask.java | 233 +++++++++++++++++++++
.../inlong/agent/plugin/task/TestLogFileTask.java | 8 +-
.../inlong/agent/plugin/task/TestTaskManager.java | 9 +-
14 files changed, 317 insertions(+), 67 deletions(-)
diff --git
a/inlong-agent/agent-core/src/test/java/org/apache/inlong/agent/core/AgentBaseTestsHelper.java
b/inlong-agent/agent-core/src/test/java/org/apache/inlong/agent/core/AgentBaseTestsHelper.java
index a8dfbdf91a..af0a54f129 100755
---
a/inlong-agent/agent-core/src/test/java/org/apache/inlong/agent/core/AgentBaseTestsHelper.java
+++
b/inlong-agent/agent-core/src/test/java/org/apache/inlong/agent/core/AgentBaseTestsHelper.java
@@ -18,12 +18,8 @@
package org.apache.inlong.agent.core;
import org.apache.inlong.agent.conf.AgentConfiguration;
-import org.apache.inlong.agent.conf.TaskProfile;
import org.apache.inlong.agent.constant.AgentConstants;
import org.apache.inlong.agent.constant.FetcherConstants;
-import org.apache.inlong.agent.pojo.FileTask.FileTaskConfig;
-import org.apache.inlong.common.enums.TaskStateEnum;
-import org.apache.inlong.common.pojo.agent.DataConfig;
import com.google.gson.Gson;
import com.google.gson.GsonBuilder;
@@ -74,33 +70,4 @@ public class AgentBaseTestsHelper {
}
}
}
-
- public TaskProfile getTaskProfile(int taskId, String pattern, boolean
retry, String startTime, String endTime,
- TaskStateEnum state, String timeZone) {
- DataConfig dataConfig = getDataConfig(taskId, pattern, retry,
startTime, endTime, state, timeZone);
- TaskProfile profile = TaskProfile.convertToTaskProfile(dataConfig);
- return profile;
- }
-
- private DataConfig getDataConfig(int taskId, String pattern, boolean
retry, String startTime, String endTime,
- TaskStateEnum state, String timeZone) {
- DataConfig dataConfig = new DataConfig();
- dataConfig.setInlongGroupId("testGroupId");
- dataConfig.setInlongStreamId("testStreamId");
- dataConfig.setDataReportType(1);
- dataConfig.setTaskType(3);
- dataConfig.setTaskId(taskId);
- dataConfig.setTimeZone(timeZone);
- dataConfig.setState(state.ordinal());
- FileTaskConfig fileTaskConfig = new FileTaskConfig();
- fileTaskConfig.setPattern(pattern);
- fileTaskConfig.setTimeOffset("0h");
- fileTaskConfig.setMaxFileCount(100);
- fileTaskConfig.setCycleUnit("h");
- fileTaskConfig.setRetry(retry);
- fileTaskConfig.setDataTimeFrom(startTime);
- fileTaskConfig.setDataTimeTo(endTime);
- dataConfig.setExtParams(GSON.toJson(fileTaskConfig));
- return dataConfig;
- }
}
diff --git
a/inlong-agent/agent-plugins/src/main/java/org/apache/inlong/agent/plugin/task/cos/COSTask.java
b/inlong-agent/agent-plugins/src/main/java/org/apache/inlong/agent/plugin/task/cos/COSTask.java
index eceb38e8be..de2beb7cc0 100644
---
a/inlong-agent/agent-plugins/src/main/java/org/apache/inlong/agent/plugin/task/cos/COSTask.java
+++
b/inlong-agent/agent-plugins/src/main/java/org/apache/inlong/agent/plugin/task/cos/COSTask.java
@@ -196,7 +196,6 @@ public class COSTask extends AbstractTask {
}
private void scanExistingFile() {
- LOGGER.info("test123 qqqq");
List<BasicFileInfo> fileInfos =
FileScanner.scanTaskBetweenTimes(cosClient, bucketName, originPattern,
taskProfile.getCycleUnit(), timeOffset, startTime, endTime,
retry);
LOGGER.info("taskId {} scan {} get file count {}", getTaskId(),
originPattern, fileInfos.size());
@@ -314,7 +313,6 @@ public class COSTask extends AbstractTask {
taskProfile.getTaskId(), dataTime, fileName);
return;
}
- LOGGER.info("test123 {}", cosClient);
ObjectMetadata meta = cosClient.getObjectMetadata(bucketName,
fileName);
Long fileUpdateTime = meta.getLastModified().getTime();
if (!shouldAddAgain(fileName, fileUpdateTime)) {
diff --git
a/inlong-agent/agent-plugins/src/main/java/org/apache/inlong/agent/plugin/task/cos/FileScanner.java
b/inlong-agent/agent-plugins/src/main/java/org/apache/inlong/agent/plugin/task/cos/FileScanner.java
index 4eac0eeff2..1019d34135 100644
---
a/inlong-agent/agent-plugins/src/main/java/org/apache/inlong/agent/plugin/task/cos/FileScanner.java
+++
b/inlong-agent/agent-plugins/src/main/java/org/apache/inlong/agent/plugin/task/cos/FileScanner.java
@@ -97,16 +97,20 @@ public class FileScanner {
}
List<String> commonPrefixes = objectListing.getCommonPrefixes();
int depth;
- Pattern patternByDepth = null;
+ Pattern patternByDepth;
if (!commonPrefixes.isEmpty()) {
depth = countCharacterOccurrences(commonPrefixes.get(0),
PATH_SEP);
- String temp = findNthOccurrenceSubstring(pattern.pattern(),
PATH_SEP, depth);
- patternByDepth = Pattern.compile(temp,
Pattern.CASE_INSENSITIVE | Pattern.DOTALL | Pattern.MULTILINE);
- }
- for (String commonPrefix : commonPrefixes) {
- Matcher matcher = patternByDepth.matcher(commonPrefix);
- if (matcher.matches()) {
- infos.addAll(scanTaskInOneCycle(cosClient, bucketName,
pattern, commonPrefix, dataTime, cycleUnit));
+ String nthOccurrenceSubstring =
findNthOccurrenceSubstring(pattern.pattern(), PATH_SEP, depth);
+ if (nthOccurrenceSubstring != null) {
+ patternByDepth = Pattern.compile(nthOccurrenceSubstring,
+ Pattern.CASE_INSENSITIVE | Pattern.DOTALL |
Pattern.MULTILINE);
+ for (String commonPrefix : commonPrefixes) {
+ Matcher matcher = patternByDepth.matcher(commonPrefix);
+ if (matcher.matches()) {
+ infos.addAll(scanTaskInOneCycle(cosClient,
bucketName, pattern, commonPrefix, dataTime,
+ cycleUnit));
+ }
+ }
}
}
List<COSObjectSummary> cosObjectSummaries =
objectListing.getObjectSummaries();
diff --git
a/inlong-agent/agent-plugins/src/test/java/org/apache/inlong/agent/plugin/AgentBaseTestsHelper.java
b/inlong-agent/agent-plugins/src/test/java/org/apache/inlong/agent/plugin/AgentBaseTestsHelper.java
index 3dc4f8ab15..214a29b24e 100755
---
a/inlong-agent/agent-plugins/src/test/java/org/apache/inlong/agent/plugin/AgentBaseTestsHelper.java
+++
b/inlong-agent/agent-plugins/src/test/java/org/apache/inlong/agent/plugin/AgentBaseTestsHelper.java
@@ -21,8 +21,10 @@ import org.apache.inlong.agent.conf.AgentConfiguration;
import org.apache.inlong.agent.conf.TaskProfile;
import org.apache.inlong.agent.constant.AgentConstants;
import org.apache.inlong.agent.constant.FetcherConstants;
+import org.apache.inlong.agent.pojo.COSTask.COSTaskConfig;
import org.apache.inlong.agent.pojo.FileTask.FileTaskConfig;
import org.apache.inlong.common.enums.TaskStateEnum;
+import org.apache.inlong.common.enums.TaskTypeEnum;
import org.apache.inlong.common.pojo.agent.DataConfig;
import com.google.gson.Gson;
@@ -82,24 +84,24 @@ public class AgentBaseTestsHelper {
}
}
- public TaskProfile getTaskProfile(int taskId, String pattern, String
dataContentStyle, boolean retry,
+ public TaskProfile getFileTaskProfile(int taskId, String pattern, String
dataContentStyle, boolean retry,
String startTime, String endTime,
TaskStateEnum state, String cycleUnit, String timeZone,
List<String> filterStreams) {
- DataConfig dataConfig = getDataConfig(taskId, pattern,
dataContentStyle, retry, startTime, endTime,
+ DataConfig dataConfig = getFileDataConfig(taskId, pattern,
dataContentStyle, retry, startTime, endTime,
state, cycleUnit, timeZone,
filterStreams);
TaskProfile profile = TaskProfile.convertToTaskProfile(dataConfig);
return profile;
}
- private DataConfig getDataConfig(int taskId, String pattern, String
dataContentStyle, boolean retry,
+ private DataConfig getFileDataConfig(int taskId, String pattern, String
dataContentStyle, boolean retry,
String startTime, String endTime, TaskStateEnum state, String
cycleUnit, String timeZone,
List<String> filterStreams) {
DataConfig dataConfig = new DataConfig();
dataConfig.setInlongGroupId("testGroupId");
dataConfig.setInlongStreamId("testStreamId");
dataConfig.setDataReportType(1);
- dataConfig.setTaskType(3);
+ dataConfig.setTaskType(TaskTypeEnum.FILE.getType());
dataConfig.setTaskId(taskId);
dataConfig.setTimeZone(timeZone);
dataConfig.setState(state.ordinal());
@@ -119,4 +121,47 @@ public class AgentBaseTestsHelper {
dataConfig.setExtParams(GSON.toJson(fileTaskConfig));
return dataConfig;
}
+
+ public TaskProfile getCOSTaskProfile(int taskId, String pattern, String
contentStyle, boolean retry,
+ String startTime, String endTime,
+ TaskStateEnum state, String cycleUnit, String timeZone,
List<String> filterStreams) {
+ DataConfig dataConfig = getCOSDataConfig(taskId, pattern,
contentStyle, retry, startTime, endTime,
+ state, cycleUnit, timeZone,
+ filterStreams);
+ TaskProfile profile = TaskProfile.convertToTaskProfile(dataConfig);
+ return profile;
+ }
+
+ private DataConfig getCOSDataConfig(int taskId, String pattern, String
contentStyle, boolean retry,
+ String startTime, String endTime, TaskStateEnum state, String
cycleUnit, String timeZone,
+ List<String> filterStreams) {
+ DataConfig dataConfig = new DataConfig();
+ dataConfig.setInlongGroupId("testGroupId");
+ dataConfig.setInlongStreamId("testStreamId");
+ dataConfig.setDataReportType(1);
+ dataConfig.setTaskType(TaskTypeEnum.COS.getType());
+ dataConfig.setTaskId(taskId);
+ dataConfig.setTimeZone(timeZone);
+ dataConfig.setState(state.ordinal());
+ COSTaskConfig cosTaskConfig = new COSTaskConfig();
+ cosTaskConfig.setBucketName("testBucket");
+ cosTaskConfig.setCredentialsId("testSecretId");
+ cosTaskConfig.setCredentialsKey("testSecretKey");
+ cosTaskConfig.setRegion("testRegion");
+ cosTaskConfig.setPattern(pattern);
+ cosTaskConfig.setTimeOffset("0d");
+ // GMT-8:00 same with Asia/Shanghai
+ cosTaskConfig.setMaxFileCount(100);
+ cosTaskConfig.setCycleUnit(cycleUnit);
+ cosTaskConfig.setRetry(retry);
+ cosTaskConfig.setDataTimeFrom(startTime);
+ cosTaskConfig.setDataTimeTo(endTime);
+ // mix: login|87601|968|67826|23579 or login|a=b&c=d&x=y&asdf
+ cosTaskConfig.setContentStyle(contentStyle);
+ cosTaskConfig.setDataSeparator("|");
+ cosTaskConfig.setFilterStreams(filterStreams);
+ dataConfig.setExtParams(GSON.toJson(cosTaskConfig));
+ return dataConfig;
+ }
+
}
diff --git
a/inlong-agent/agent-plugins/src/test/java/org/apache/inlong/agent/plugin/instance/TestInstanceManager.java
b/inlong-agent/agent-plugins/src/test/java/org/apache/inlong/agent/plugin/instance/TestInstanceManager.java
index 61c94c4dd1..9901f29023 100755
---
a/inlong-agent/agent-plugins/src/test/java/org/apache/inlong/agent/plugin/instance/TestInstanceManager.java
+++
b/inlong-agent/agent-plugins/src/test/java/org/apache/inlong/agent/plugin/instance/TestInstanceManager.java
@@ -59,8 +59,9 @@ public class TestInstanceManager {
helper = new
AgentBaseTestsHelper(TestInstanceManager.class.getName()).setupAgentHome();
String pattern = helper.getTestRootDir() + "/YYYYMMDDhh_[0-9]+.txt";
Store basicInstanceStore =
TaskManager.initStore(AgentConstants.AGENT_STORE_PATH_INSTANCE);
- taskProfile = helper.getTaskProfile(1, pattern, "csv", false, "", "",
TaskStateEnum.RUNNING, CycleUnitType.HOUR,
- "GMT+6:00", null);
+ taskProfile =
+ helper.getFileTaskProfile(1, pattern, "csv", false, "", "",
TaskStateEnum.RUNNING, CycleUnitType.HOUR,
+ "GMT+6:00", null);
Store taskBasicStore =
TaskManager.initStore(AgentConstants.AGENT_STORE_PATH_TASK);
TaskStore taskStore = new TaskStore(taskBasicStore);
taskStore.storeTask(taskProfile);
diff --git
a/inlong-agent/agent-plugins/src/test/java/org/apache/inlong/agent/plugin/sinks/KafkaSinkTest.java
b/inlong-agent/agent-plugins/src/test/java/org/apache/inlong/agent/plugin/sinks/KafkaSinkTest.java
index 066776d32f..b8703c7056 100644
---
a/inlong-agent/agent-plugins/src/test/java/org/apache/inlong/agent/plugin/sinks/KafkaSinkTest.java
+++
b/inlong-agent/agent-plugins/src/test/java/org/apache/inlong/agent/plugin/sinks/KafkaSinkTest.java
@@ -47,8 +47,9 @@ public class KafkaSinkTest {
String fileName = LOADER.getResource("test/20230928_1.txt").getPath();
helper = new
AgentBaseTestsHelper(TestSenderManager.class.getName()).setupAgentHome();
String pattern = helper.getTestRootDir() + "/YYYYMMDD.log_[0-9]+";
- TaskProfile taskProfile = helper.getTaskProfile(1, pattern, "csv",
false, "", "", TaskStateEnum.RUNNING, "D",
- "GMT+8:00", null);
+ TaskProfile taskProfile =
+ helper.getFileTaskProfile(1, pattern, "csv", false, "", "",
TaskStateEnum.RUNNING, "D",
+ "GMT+8:00", null);
profile = taskProfile.createInstanceProfile("", fileName,
taskProfile.getCycleUnit(), "20230927",
AgentUtils.getCurrentTime());
kafkaSink = new MockSink();
diff --git
a/inlong-agent/agent-plugins/src/test/java/org/apache/inlong/agent/plugin/sinks/PulsarSinkTest.java
b/inlong-agent/agent-plugins/src/test/java/org/apache/inlong/agent/plugin/sinks/PulsarSinkTest.java
index d7733259ab..43e3115dcb 100644
---
a/inlong-agent/agent-plugins/src/test/java/org/apache/inlong/agent/plugin/sinks/PulsarSinkTest.java
+++
b/inlong-agent/agent-plugins/src/test/java/org/apache/inlong/agent/plugin/sinks/PulsarSinkTest.java
@@ -47,8 +47,9 @@ public class PulsarSinkTest {
String fileName = LOADER.getResource("test/20230928_1.txt").getPath();
helper = new
AgentBaseTestsHelper(TestSenderManager.class.getName()).setupAgentHome();
String pattern = helper.getTestRootDir() + "/YYYYMMDD.log_[0-9]+";
- TaskProfile taskProfile = helper.getTaskProfile(1, pattern, "csv",
false, "", "", TaskStateEnum.RUNNING, "D",
- "GMT+8:00", null);
+ TaskProfile taskProfile =
+ helper.getFileTaskProfile(1, pattern, "csv", false, "", "",
TaskStateEnum.RUNNING, "D",
+ "GMT+8:00", null);
profile = taskProfile.createInstanceProfile("", fileName,
taskProfile.getCycleUnit(), "20230927",
AgentUtils.getCurrentTime());
pulsarSink = new MockSink();
diff --git
a/inlong-agent/agent-plugins/src/test/java/org/apache/inlong/agent/plugin/sinks/filecollect/TestSenderManager.java
b/inlong-agent/agent-plugins/src/test/java/org/apache/inlong/agent/plugin/sinks/filecollect/TestSenderManager.java
index 5a1168edef..1c9e623b9b 100644
---
a/inlong-agent/agent-plugins/src/test/java/org/apache/inlong/agent/plugin/sinks/filecollect/TestSenderManager.java
+++
b/inlong-agent/agent-plugins/src/test/java/org/apache/inlong/agent/plugin/sinks/filecollect/TestSenderManager.java
@@ -70,8 +70,9 @@ public class TestSenderManager {
String fileName = LOADER.getResource("test/20230928_1.txt").getPath();
helper = new
AgentBaseTestsHelper(TestSenderManager.class.getName()).setupAgentHome();
String pattern = helper.getTestRootDir() + "/YYYYMMDD.log_[0-9]+";
- TaskProfile taskProfile = helper.getTaskProfile(1, pattern, "csv",
false, "", "", TaskStateEnum.RUNNING, "D",
- "GMT+8:00", null);
+ TaskProfile taskProfile =
+ helper.getFileTaskProfile(1, pattern, "csv", false, "", "",
TaskStateEnum.RUNNING, "D",
+ "GMT+8:00", null);
profile = taskProfile.createInstanceProfile("", fileName,
taskProfile.getCycleUnit(), "20230927",
AgentUtils.getCurrentTime());
}
diff --git
a/inlong-agent/agent-plugins/src/test/java/org/apache/inlong/agent/plugin/sources/TestLogFileSource.java
b/inlong-agent/agent-plugins/src/test/java/org/apache/inlong/agent/plugin/sources/TestLogFileSource.java
index 408b9f1b70..d7a93a0df8 100644
---
a/inlong-agent/agent-plugins/src/test/java/org/apache/inlong/agent/plugin/sources/TestLogFileSource.java
+++
b/inlong-agent/agent-plugins/src/test/java/org/apache/inlong/agent/plugin/sources/TestLogFileSource.java
@@ -84,7 +84,7 @@ public class TestLogFileSource {
fileName = LOADER.getResource("test/20230928_1.txt").getPath();
pattern = helper.getTestRootDir() + "/YYYYMMDD.log_[0-9]+";
retry = false;
- TaskProfile taskProfile = helper.getTaskProfile(taskId, pattern,
dataContentStyle, retry, "", "",
+ TaskProfile taskProfile = helper.getFileTaskProfile(taskId,
pattern, dataContentStyle, retry, "", "",
TaskStateEnum.RUNNING, "D",
"GMT+8:00", Arrays.asList("ok"));
InstanceProfile instanceProfile =
taskProfile.createInstanceProfile("",
diff --git
a/inlong-agent/agent-plugins/src/test/java/org/apache/inlong/agent/plugin/sources/TestRedisSource.java
b/inlong-agent/agent-plugins/src/test/java/org/apache/inlong/agent/plugin/sources/TestRedisSource.java
index 4f2e90870b..14518078f2 100644
---
a/inlong-agent/agent-plugins/src/test/java/org/apache/inlong/agent/plugin/sources/TestRedisSource.java
+++
b/inlong-agent/agent-plugins/src/test/java/org/apache/inlong/agent/plugin/sources/TestRedisSource.java
@@ -121,7 +121,7 @@ public class TestRedisSource {
final String command = "zscore";
final String subOperation = "set,del";
- TaskProfile taskProfile = helper.getTaskProfile(1, "", "csv", false,
"", "", TaskStateEnum.RUNNING, "D",
+ TaskProfile taskProfile = helper.getFileTaskProfile(1, "", "csv",
false, "", "", TaskStateEnum.RUNNING, "D",
"GMT+8:00", null);
profile = taskProfile.createInstanceProfile("",
"", taskProfile.getCycleUnit(), "20240725",
AgentUtils.getCurrentTime());
diff --git
a/inlong-agent/agent-plugins/src/test/java/org/apache/inlong/agent/plugin/sources/TestSQLServerSource.java
b/inlong-agent/agent-plugins/src/test/java/org/apache/inlong/agent/plugin/sources/TestSQLServerSource.java
index 377e5ae913..410acd8e77 100644
---
a/inlong-agent/agent-plugins/src/test/java/org/apache/inlong/agent/plugin/sources/TestSQLServerSource.java
+++
b/inlong-agent/agent-plugins/src/test/java/org/apache/inlong/agent/plugin/sources/TestSQLServerSource.java
@@ -136,7 +136,7 @@ public class TestSQLServerSource {
final String tableName = "test_source";
final String serverName = "server-01";
- TaskProfile taskProfile = helper.getTaskProfile(1, "", "csv", false,
"", "", TaskStateEnum.RUNNING, "D",
+ TaskProfile taskProfile = helper.getFileTaskProfile(1, "", "csv",
false, "", "", TaskStateEnum.RUNNING, "D",
"GMT+8:00", null);
instanceProfile = taskProfile.createInstanceProfile("",
"", taskProfile.getCycleUnit(), "20240725",
AgentUtils.getCurrentTime());
diff --git
a/inlong-agent/agent-plugins/src/test/java/org/apache/inlong/agent/plugin/task/TestCOSTask.java
b/inlong-agent/agent-plugins/src/test/java/org/apache/inlong/agent/plugin/task/TestCOSTask.java
new file mode 100644
index 0000000000..f66e4845b1
--- /dev/null
+++
b/inlong-agent/agent-plugins/src/test/java/org/apache/inlong/agent/plugin/task/TestCOSTask.java
@@ -0,0 +1,233 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.inlong.agent.plugin.task;
+
+import org.apache.inlong.agent.common.AgentThreadFactory;
+import org.apache.inlong.agent.conf.TaskProfile;
+import org.apache.inlong.agent.constant.CycleUnitType;
+import org.apache.inlong.agent.core.task.TaskManager;
+import org.apache.inlong.agent.plugin.AgentBaseTestsHelper;
+import org.apache.inlong.agent.plugin.task.cos.COSTask;
+import org.apache.inlong.agent.plugin.utils.cos.COSUtils;
+import org.apache.inlong.common.enums.TaskStateEnum;
+
+import com.qcloud.cos.COSClient;
+import com.qcloud.cos.model.COSObjectSummary;
+import com.qcloud.cos.model.ListObjectsRequest;
+import com.qcloud.cos.model.ObjectListing;
+import org.junit.AfterClass;
+import org.junit.Assert;
+import org.junit.BeforeClass;
+import org.junit.Test;
+import org.junit.runner.RunWith;
+import org.mockito.Mockito;
+import org.powermock.api.mockito.PowerMockito;
+import org.powermock.core.classloader.annotations.PowerMockIgnore;
+import org.powermock.core.classloader.annotations.PrepareForTest;
+import org.powermock.modules.junit4.PowerMockRunner;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.List;
+import java.util.concurrent.SynchronousQueue;
+import java.util.concurrent.ThreadPoolExecutor;
+import java.util.concurrent.TimeUnit;
+
+import static org.awaitility.Awaitility.await;
+import static org.mockito.Mockito.when;
+
+@RunWith(PowerMockRunner.class)
+@PrepareForTest({COSUtils.class, COSTask.class, COSClient.class,
ObjectListing.class})
+@PowerMockIgnore({"javax.management.*"})
+public class TestCOSTask {
+
+ private static final Logger LOGGER =
LoggerFactory.getLogger(TestCOSTask.class);
+ private static final ClassLoader LOADER =
TestCOSTask.class.getClassLoader();
+ private static AgentBaseTestsHelper helper;
+ private static TaskManager manager;
+ private static final ThreadPoolExecutor EXECUTOR_SERVICE = new
ThreadPoolExecutor(
+ 0, Integer.MAX_VALUE,
+ 1L, TimeUnit.SECONDS,
+ new SynchronousQueue<>(),
+ new AgentThreadFactory("TestCOSTask"));
+ private static COSClient cosClient;
+
+ @BeforeClass
+ public static void setup() throws Exception {
+ helper = new
AgentBaseTestsHelper(TestCOSTask.class.getName()).setupAgentHome();
+ manager = new TaskManager();
+ cosClient = Mockito.mock(COSClient.class);
+ PowerMockito.mockStatic(COSUtils.class);
+ Mockito.when(COSUtils.createCli(Mockito.anyString(),
Mockito.anyString(), Mockito.anyString()))
+ .thenReturn(cosClient);
+ }
+
+ @AfterClass
+ public static void teardown() throws Exception {
+ helper.teardownAgentHome();
+ }
+
+ private void mockDay(COSClient cosClient) {
+ ObjectListing objectListing1_1 = Mockito.mock(ObjectListing.class);
+ when(objectListing1_1.getCommonPrefixes()).thenReturn(
+ Arrays.asList("some/20230928_0/", "some/20230928_1/",
"some/20230928_aaa/"));
+
when(objectListing1_1.getObjectSummaries()).thenReturn(getSummaries(Arrays.asList("some/20230928_test_0.txt")));
+
+ ObjectListing objectListing1_2 = Mockito.mock(ObjectListing.class);
+ when(objectListing1_2.getCommonPrefixes()).thenReturn(
+ Arrays.asList("some/20230929_aaa/", "some/20230929_1/",
"some/20230929_2/"));
+ when(objectListing1_2.getObjectSummaries()).thenReturn(
+ getSummaries(Arrays.asList("some/20230929_0_test_0.txt")));
+
+ ObjectListing objectListing2_1 = Mockito.mock(ObjectListing.class);
+ when(objectListing2_1.getCommonPrefixes()).thenReturn(
+ Arrays.asList("some/20230928_0/where/",
"some/20230928_0/test_1/"));
+ when(objectListing2_1.getObjectSummaries()).thenReturn(getSummaries(
+ Arrays.asList("some/20230928_0/test_0.txt",
"some/20230928_0/test_1.txt",
+ "some/20230928_0/test_o.txt")));
+
+ ObjectListing objectListing2_2 = Mockito.mock(ObjectListing.class);
+ when(objectListing2_2.getCommonPrefixes()).thenReturn(
+ Arrays.asList("some/20230929_1/where/",
"some/20230929_1/test_1/"));
+ when(objectListing2_2.getObjectSummaries()).thenReturn(getSummaries(
+ Arrays.asList("some/20230929_1/test_0.txt",
"some/20230929_1/test_1.txt",
+ "some/20230929_1/test_o.txt")));
+
+
when(cosClient.listObjects(Mockito.any(ListObjectsRequest.class))).thenAnswer(mock
-> {
+ ListObjectsRequest req = mock.getArgument(0);
+ if (req.getPrefix().equals("some/20230928_")) {
+ return objectListing1_1;
+ } else if (req.getPrefix().equals("some/20230929_")) {
+ return objectListing1_2;
+ } else if (req.getPrefix().equals("some/20230928_0/")) {
+ return objectListing2_1;
+ } else if (req.getPrefix().equals("some/20230929_1/")) {
+ return objectListing2_2;
+ } else {
+ return new ObjectListing();
+ }
+ });
+ }
+
+ private void mockHour(COSClient cosClient) {
+ ObjectListing objectListing1_1 = Mockito.mock(ObjectListing.class);
+ when(objectListing1_1.getCommonPrefixes()).thenReturn(
+ Arrays.asList("some/2023092800_0/", "some/2023092800_1/",
"some/2023092800_aaa/"));
+ when(objectListing1_1.getObjectSummaries()).thenReturn(
+ getSummaries(Arrays.asList("some/2023092800_test_0.txt")));
+
+ ObjectListing objectListing1_2 = Mockito.mock(ObjectListing.class);
+ when(objectListing1_2.getCommonPrefixes()).thenReturn(
+ Arrays.asList("some/2023092901_aaa/", "some/2023092901_1/",
"some/2023092901_2/"));
+ when(objectListing1_2.getObjectSummaries()).thenReturn(
+ getSummaries(Arrays.asList("some/2023092901_0_test_0.txt")));
+
+ ObjectListing objectListing2_1 = Mockito.mock(ObjectListing.class);
+ when(objectListing2_1.getCommonPrefixes()).thenReturn(
+ Arrays.asList("some/2023092800_0/where/",
"some/2023092800_0/test_1/"));
+ when(objectListing2_1.getObjectSummaries()).thenReturn(getSummaries(
+ Arrays.asList("some/2023092800_0/test_0.txt",
"some/2023092800_0/test_1.txt",
+ "some/2023092800_0/test_o.txt")));
+
+ ObjectListing objectListing2_2 = Mockito.mock(ObjectListing.class);
+ when(objectListing2_2.getCommonPrefixes()).thenReturn(
+ Arrays.asList("some/2023092901_1/where/",
"some/2023092901_1/test_1/"));
+ when(objectListing2_2.getObjectSummaries()).thenReturn(getSummaries(
+ Arrays.asList("some/2023092901_1/test_0.txt",
"some/2023092901_1/test_1.txt",
+ "some/2023092901_1/test_o.txt")));
+
+
when(cosClient.listObjects(Mockito.any(ListObjectsRequest.class))).thenAnswer(mock
-> {
+ ListObjectsRequest req = mock.getArgument(0);
+ if (req.getPrefix().equals("some/2023092800_")) {
+ return objectListing1_1;
+ } else if (req.getPrefix().equals("some/2023092901_")) {
+ return objectListing1_2;
+ } else if (req.getPrefix().equals("some/2023092800_0/")) {
+ return objectListing2_1;
+ } else if (req.getPrefix().equals("some/2023092901_1/")) {
+ return objectListing2_2;
+ } else {
+ return new ObjectListing();
+ }
+ });
+ }
+
+ private List<COSObjectSummary> getSummaries(List<String> keys) {
+ List<COSObjectSummary> summaries = new ArrayList<>();
+ for (int i = 0; i < keys.size(); i++) {
+ COSObjectSummary summary = new COSObjectSummary();
+ summary.setKey(keys.get(i));
+ summary.setSize(100);
+ summary.setStorageClass("what");
+ summaries.add(summary);
+ }
+ return summaries;
+ }
+
+ @Test
+ public void testScan() {
+ mockDay(cosClient);
+ doTest(1, "some/YYYYMMDD_[0-9]+/test_[0-9]+.txt", CycleUnitType.DAY,
+ Arrays.asList("some/20230928_0/test_0.txt",
"some/20230928_0/test_1.txt", "some/20230929_1/test_0.txt",
+ "some/20230929_1/test_1.txt"),
+ Arrays.asList("20230928", "20230928", "20230929", "20230929"),
+ "20230928",
+ "20230930");
+ mockHour(cosClient);
+ doTest(2, "some/YYYYMMDDhh_[0-9]+/test_[0-9]+.txt", CycleUnitType.HOUR,
+ Arrays.asList("some/2023092800_0/test_0.txt",
"some/2023092800_0/test_1.txt",
+ "some/2023092901_1/test_0.txt",
+ "some/2023092901_1/test_1.txt"),
+ Arrays.asList("2023092800", "2023092800", "2023092901",
"2023092901"), "2023092800",
+ "2023093023");
+ }
+
+ private void doTest(int taskId, String pattern, String cycle, List<String>
srcKeys, List<String> srcDataTimes,
+ String startTime, String endTime) {
+ TaskProfile taskProfile = helper.getCOSTaskProfile(taskId, pattern,
"csv", true, startTime, endTime,
+ TaskStateEnum.RUNNING,
+ cycle, "GMT+8:00", null);
+ COSTask task = null;
+ final List<String> fileName = new ArrayList();
+ final List<String> dataTime = new ArrayList();
+ try {
+ task = PowerMockito.spy(new COSTask());
+ PowerMockito.doAnswer(invocation -> {
+ fileName.add(invocation.getArgument(0));
+ dataTime.add(invocation.getArgument(1));
+ return null;
+ }).when(task, "addToEvenMap", Mockito.anyString(),
Mockito.anyString());
+ Assert.assertTrue(task.isProfileValid(taskProfile));
+ manager.getTaskStore().storeTask(taskProfile);
+ task.init(manager, taskProfile, manager.getInstanceBasicStore());
+ EXECUTOR_SERVICE.submit(task);
+ } catch (Exception e) {
+ LOGGER.error("source init error", e);
+ Assert.assertTrue("source init error", false);
+ }
+ await().atMost(10, TimeUnit.SECONDS)
+ .until(() -> fileName.size() == srcDataTimes.size() &&
dataTime.size() == srcDataTimes.size());
+ for (int i = 0; i < fileName.size(); i++) {
+ Assert.assertEquals(0, fileName.get(i).compareTo(srcKeys.get(i)));
+ Assert.assertEquals(0,
dataTime.get(i).compareTo(srcDataTimes.get(i)));
+ }
+ task.destroy();
+ }
+}
\ No newline at end of file
diff --git
a/inlong-agent/agent-plugins/src/test/java/org/apache/inlong/agent/plugin/task/TestLogFileTask.java
b/inlong-agent/agent-plugins/src/test/java/org/apache/inlong/agent/plugin/task/TestLogFileTask.java
index 440c4a5208..7d1962c314 100644
---
a/inlong-agent/agent-plugins/src/test/java/org/apache/inlong/agent/plugin/task/TestLogFileTask.java
+++
b/inlong-agent/agent-plugins/src/test/java/org/apache/inlong/agent/plugin/task/TestLogFileTask.java
@@ -20,7 +20,6 @@ package org.apache.inlong.agent.plugin.task;
import org.apache.inlong.agent.common.AgentThreadFactory;
import org.apache.inlong.agent.conf.TaskProfile;
import org.apache.inlong.agent.constant.CycleUnitType;
-import org.apache.inlong.agent.constant.TaskConstants;
import org.apache.inlong.agent.core.task.TaskManager;
import org.apache.inlong.agent.plugin.AgentBaseTestsHelper;
import org.apache.inlong.agent.plugin.task.file.LogFileTask;
@@ -101,14 +100,13 @@ public class TestLogFileTask {
for (int i = 0; i < resources.size(); i++) {
resourceName.add(LOADER.getResource(resources.get(i)).getPath());
}
- TaskProfile taskProfile = helper.getTaskProfile(taskId, pattern,
"csv", true, "", "", TaskStateEnum.RUNNING,
- cycle, "GMT+8:00", null);
+ TaskProfile taskProfile =
+ helper.getFileTaskProfile(taskId, pattern, "csv", true,
startTime, endTime, TaskStateEnum.RUNNING,
+ cycle, "GMT+8:00", null);
LogFileTask dayTask = null;
final List<String> fileName = new ArrayList();
final List<String> dataTime = new ArrayList();
try {
- taskProfile.set(TaskConstants.FILE_TASK_TIME_FROM, startTime);
- taskProfile.set(TaskConstants.FILE_TASK_TIME_TO, endTime);
dayTask = PowerMockito.spy(new LogFileTask());
PowerMockito.doAnswer(invocation -> {
fileName.add(invocation.getArgument(0));
diff --git
a/inlong-agent/agent-plugins/src/test/java/org/apache/inlong/agent/plugin/task/TestTaskManager.java
b/inlong-agent/agent-plugins/src/test/java/org/apache/inlong/agent/plugin/task/TestTaskManager.java
index 014c5ce4e2..4cab9fa8f0 100755
---
a/inlong-agent/agent-plugins/src/test/java/org/apache/inlong/agent/plugin/task/TestTaskManager.java
+++
b/inlong-agent/agent-plugins/src/test/java/org/apache/inlong/agent/plugin/task/TestTaskManager.java
@@ -58,8 +58,9 @@ public class TestTaskManager {
manager = new TaskManager();
TaskStore taskStore = manager.getTaskStore();
for (int i = 1; i <= 10; i++) {
- TaskProfile taskProfile = helper.getTaskProfile(i, pattern,
"csv", false, "", "", TaskStateEnum.RUNNING,
- "D", "GMT+8:00", null);
+ TaskProfile taskProfile =
+ helper.getFileTaskProfile(i, pattern, "csv", false,
"", "", TaskStateEnum.RUNNING,
+ "D", "GMT+8:00", null);
taskProfile.setTaskClass(MockTask.class.getCanonicalName());
taskStore.storeTask(taskProfile);
}
@@ -74,7 +75,7 @@ public class TestTaskManager {
Assert.assertTrue("manager start error", false);
}
- TaskProfile taskProfile1 = helper.getTaskProfile(100, pattern, "csv",
false, "", "", TaskStateEnum.RUNNING,
+ TaskProfile taskProfile1 = helper.getFileTaskProfile(100, pattern,
"csv", false, "", "", TaskStateEnum.RUNNING,
"D", "GMT+8:00", null);
String taskId1 = taskProfile1.getTaskId();
taskProfile1.setTaskClass(MockTask.class.getCanonicalName());
@@ -99,7 +100,7 @@ public class TestTaskManager {
Assert.assertTrue(manager.getTaskProfile(taskId1).getState() ==
TaskStateEnum.RUNNING);
// test delete
- TaskProfile taskProfile2 = helper.getTaskProfile(200, pattern, "csv",
false, "", "", TaskStateEnum.RUNNING,
+ TaskProfile taskProfile2 = helper.getFileTaskProfile(200, pattern,
"csv", false, "", "", TaskStateEnum.RUNNING,
"D", "GMT+8:00", null);
taskProfile2.setTaskClass(MockTask.class.getCanonicalName());
List<TaskProfile> taskProfiles2 = new ArrayList<>();