This is an automated email from the ASF dual-hosted git repository.
dockerzhang pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/inlong.git
The following commit(s) were added to refs/heads/master by this push:
new bfc7750ba8 [INLONG-9481][Agent] Add unit test of reading with offset
(#9482)
bfc7750ba8 is described below
commit bfc7750ba830bb53e71a2a319bc85b96c41ace69
Author: justinwwhuang <[email protected]>
AuthorDate: Fri Dec 15 13:08:09 2023 +0800
[INLONG-9481][Agent] Add unit test of reading with offset (#9482)
---
.../agent/plugin/sources/TestLogFileSource.java | 45 +++++++++++++++++-----
1 file changed, 36 insertions(+), 9 deletions(-)
diff --git
a/inlong-agent/agent-plugins/src/test/java/org/apache/inlong/agent/plugin/sources/TestLogFileSource.java
b/inlong-agent/agent-plugins/src/test/java/org/apache/inlong/agent/plugin/sources/TestLogFileSource.java
index be69fbcfea..3a002064fb 100644
---
a/inlong-agent/agent-plugins/src/test/java/org/apache/inlong/agent/plugin/sources/TestLogFileSource.java
+++
b/inlong-agent/agent-plugins/src/test/java/org/apache/inlong/agent/plugin/sources/TestLogFileSource.java
@@ -18,6 +18,7 @@
package org.apache.inlong.agent.plugin.sources;
import org.apache.inlong.agent.conf.InstanceProfile;
+import org.apache.inlong.agent.conf.OffsetProfile;
import org.apache.inlong.agent.conf.TaskProfile;
import org.apache.inlong.agent.constant.AgentConstants;
import org.apache.inlong.agent.constant.TaskConstants;
@@ -45,6 +46,7 @@ import java.util.concurrent.TimeUnit;
import static
org.apache.inlong.agent.constant.FetcherConstants.AGENT_GLOBAL_READER_QUEUE_PERMIT;
import static
org.apache.inlong.agent.constant.FetcherConstants.DEFAULT_AGENT_GLOBAL_READER_QUEUE_PERMIT;
+import static org.apache.inlong.agent.constant.TaskConstants.INODE_INFO;
import static org.awaitility.Awaitility.await;
public class TestLogFileSource {
@@ -55,7 +57,6 @@ public class TestLogFileSource {
private static final Gson GSON = new Gson();
private static final String[] check = {"hello line-end-symbol aa", "world
line-end-symbol",
"agent line-end-symbol"};
- private static InstanceProfile instanceProfile;
// instance basic db
private static Db instanceBasicDb;
// offset basic db
@@ -64,20 +65,20 @@ public class TestLogFileSource {
@BeforeClass
public static void setup() {
- String fileName = LOADER.getResource("test/20230928_1.txt").getPath();
helper = new
AgentBaseTestsHelper(TestLogFileSource.class.getName()).setupAgentHome();
- String pattern = helper.getTestRootDir() + "/YYYYMMDD.log_[0-9]+";
- TaskProfile taskProfile = helper.getTaskProfile(1, pattern, false, 0L,
0L, TaskStateEnum.RUNNING);
- instanceProfile = taskProfile.createInstanceProfile("",
- fileName, taskProfile.getCycleUnit(), "20230928",
AgentUtils.getCurrentTime());
instanceBasicDb =
TaskManager.initDb(AgentConstants.AGENT_LOCAL_DB_PATH_INSTANCE);
offsetBasicDb =
TaskManager.initDb(AgentConstants.AGENT_LOCAL_DB_PATH_OFFSET);
OffsetManager.init(offsetBasicDb, instanceBasicDb);
}
- private LogFileSource getSource() {
+ private LogFileSource getSource(int taskId, long offset) {
try {
+ String pattern = helper.getTestRootDir() + "/YYYYMMDD.log_[0-9]+";
+ TaskProfile taskProfile = helper.getTaskProfile(taskId, pattern,
false, 0L, 0L, TaskStateEnum.RUNNING);
+ String fileName =
LOADER.getResource("test/20230928_1.txt").getPath();
+ InstanceProfile instanceProfile =
taskProfile.createInstanceProfile("",
+ fileName, taskProfile.getCycleUnit(), "20230928",
AgentUtils.getCurrentTime());
instanceProfile.set(TaskConstants.INODE_INFO,
FileDataUtils.getInodeInfo(instanceProfile.getInstanceId()));
LogFileSource source = new LogFileSource();
Whitebox.setInternalState(source, "BATCH_READ_LINE_COUNT", 1);
@@ -86,6 +87,12 @@ public class TestLogFileSource {
Whitebox.setInternalState(source, "SIZE_OF_BUFFER_TO_READ_FILE",
2);
Whitebox.setInternalState(source, "EMPTY_CHECK_COUNT_AT_LEAST", 3);
Whitebox.setInternalState(source, "READ_WAIT_TIMEOUT_MS", 10);
+ if (offset > 0) {
+ OffsetProfile offsetProfile = new
OffsetProfile(instanceProfile.getTaskId(),
+ instanceProfile.getInstanceId(),
+ offset, instanceProfile.get(INODE_INFO));
+ OffsetManager.getInstance().setOffset(offsetProfile);
+ }
source.init(instanceProfile);
return source;
} catch (Exception e) {
@@ -104,6 +111,7 @@ public class TestLogFileSource {
public void testLogFileSource() {
testFullRead();
testCleanQueue();
+ testReadWithOffset();
}
private void testFullRead() {
@@ -111,7 +119,7 @@ public class TestLogFileSource {
for (int i = 0; i < check.length; i++) {
srcLen += check[i].getBytes(StandardCharsets.UTF_8).length;
}
- LogFileSource source = getSource();
+ LogFileSource source = getSource(1, 0);
int cnt = 0;
Message msg = source.read();
int readLen = 0;
@@ -131,7 +139,7 @@ public class TestLogFileSource {
}
private void testCleanQueue() {
- LogFileSource source = getSource();
+ LogFileSource source = getSource(2, 0);
for (int i = 0; i < 2; i++) {
source.read();
}
@@ -140,4 +148,23 @@ public class TestLogFileSource {
int leftAfterRead =
MemoryManager.getInstance().getLeft(AGENT_GLOBAL_READER_QUEUE_PERMIT);
Assert.assertTrue(leftAfterRead ==
DEFAULT_AGENT_GLOBAL_READER_QUEUE_PERMIT);
}
+
+ private void testReadWithOffset() {
+ LogFileSource source = getSource(3, 1);
+ for (int i = 0; i < 2; i++) {
+ Message msg = source.read();
+ Assert.assertTrue(msg != null);
+ }
+ Message msg = source.read();
+ Assert.assertTrue(msg == null);
+ source.destroy();
+
+ source = getSource(4, 3);
+ msg = source.read();
+ Assert.assertTrue(msg == null);
+ source.destroy();
+
+ int leftAfterRead =
MemoryManager.getInstance().getLeft(AGENT_GLOBAL_READER_QUEUE_PERMIT);
+ Assert.assertTrue(leftAfterRead ==
DEFAULT_AGENT_GLOBAL_READER_QUEUE_PERMIT);
+ }
}
\ No newline at end of file