This is an automated email from the ASF dual-hosted git repository.

dockerzhang pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/inlong.git


The following commit(s) were added to refs/heads/master by this push:
     new bfc7750ba8 [INLONG-9481][Agent] Add unit test of reading with offset (#9482)
bfc7750ba8 is described below

commit bfc7750ba830bb53e71a2a319bc85b96c41ace69
Author: justinwwhuang <[email protected]>
AuthorDate: Fri Dec 15 13:08:09 2023 +0800

    [INLONG-9481][Agent] Add unit test of reading with offset (#9482)
---
 .../agent/plugin/sources/TestLogFileSource.java    | 45 +++++++++++++++++-----
 1 file changed, 36 insertions(+), 9 deletions(-)

diff --git a/inlong-agent/agent-plugins/src/test/java/org/apache/inlong/agent/plugin/sources/TestLogFileSource.java b/inlong-agent/agent-plugins/src/test/java/org/apache/inlong/agent/plugin/sources/TestLogFileSource.java
index be69fbcfea..3a002064fb 100644
--- a/inlong-agent/agent-plugins/src/test/java/org/apache/inlong/agent/plugin/sources/TestLogFileSource.java
+++ b/inlong-agent/agent-plugins/src/test/java/org/apache/inlong/agent/plugin/sources/TestLogFileSource.java
@@ -18,6 +18,7 @@
 package org.apache.inlong.agent.plugin.sources;
 
 import org.apache.inlong.agent.conf.InstanceProfile;
+import org.apache.inlong.agent.conf.OffsetProfile;
 import org.apache.inlong.agent.conf.TaskProfile;
 import org.apache.inlong.agent.constant.AgentConstants;
 import org.apache.inlong.agent.constant.TaskConstants;
@@ -45,6 +46,7 @@ import java.util.concurrent.TimeUnit;
 
 import static 
org.apache.inlong.agent.constant.FetcherConstants.AGENT_GLOBAL_READER_QUEUE_PERMIT;
 import static 
org.apache.inlong.agent.constant.FetcherConstants.DEFAULT_AGENT_GLOBAL_READER_QUEUE_PERMIT;
+import static org.apache.inlong.agent.constant.TaskConstants.INODE_INFO;
 import static org.awaitility.Awaitility.await;
 
 public class TestLogFileSource {
@@ -55,7 +57,6 @@ public class TestLogFileSource {
     private static final Gson GSON = new Gson();
     private static final String[] check = {"hello line-end-symbol aa", "world 
line-end-symbol",
             "agent line-end-symbol"};
-    private static InstanceProfile instanceProfile;
     // instance basic db
     private static Db instanceBasicDb;
     // offset basic db
@@ -64,20 +65,20 @@ public class TestLogFileSource {
     @BeforeClass
     public static void setup() {
 
-        String fileName = LOADER.getResource("test/20230928_1.txt").getPath();
         helper = new 
AgentBaseTestsHelper(TestLogFileSource.class.getName()).setupAgentHome();
-        String pattern = helper.getTestRootDir() + "/YYYYMMDD.log_[0-9]+";
-        TaskProfile taskProfile = helper.getTaskProfile(1, pattern, false, 0L, 
0L, TaskStateEnum.RUNNING);
-        instanceProfile = taskProfile.createInstanceProfile("",
-                fileName, taskProfile.getCycleUnit(), "20230928", 
AgentUtils.getCurrentTime());
         instanceBasicDb = 
TaskManager.initDb(AgentConstants.AGENT_LOCAL_DB_PATH_INSTANCE);
         offsetBasicDb =
                 TaskManager.initDb(AgentConstants.AGENT_LOCAL_DB_PATH_OFFSET);
         OffsetManager.init(offsetBasicDb, instanceBasicDb);
     }
 
-    private LogFileSource getSource() {
+    private LogFileSource getSource(int taskId, long offset) {
         try {
+            String pattern = helper.getTestRootDir() + "/YYYYMMDD.log_[0-9]+";
+            TaskProfile taskProfile = helper.getTaskProfile(taskId, pattern, 
false, 0L, 0L, TaskStateEnum.RUNNING);
+            String fileName = 
LOADER.getResource("test/20230928_1.txt").getPath();
+            InstanceProfile instanceProfile = 
taskProfile.createInstanceProfile("",
+                    fileName, taskProfile.getCycleUnit(), "20230928", 
AgentUtils.getCurrentTime());
             instanceProfile.set(TaskConstants.INODE_INFO, 
FileDataUtils.getInodeInfo(instanceProfile.getInstanceId()));
             LogFileSource source = new LogFileSource();
             Whitebox.setInternalState(source, "BATCH_READ_LINE_COUNT", 1);
@@ -86,6 +87,12 @@ public class TestLogFileSource {
             Whitebox.setInternalState(source, "SIZE_OF_BUFFER_TO_READ_FILE", 
2);
             Whitebox.setInternalState(source, "EMPTY_CHECK_COUNT_AT_LEAST", 3);
             Whitebox.setInternalState(source, "READ_WAIT_TIMEOUT_MS", 10);
+            if (offset > 0) {
+                OffsetProfile offsetProfile = new 
OffsetProfile(instanceProfile.getTaskId(),
+                        instanceProfile.getInstanceId(),
+                        offset, instanceProfile.get(INODE_INFO));
+                OffsetManager.getInstance().setOffset(offsetProfile);
+            }
             source.init(instanceProfile);
             return source;
         } catch (Exception e) {
@@ -104,6 +111,7 @@ public class TestLogFileSource {
     public void testLogFileSource() {
         testFullRead();
         testCleanQueue();
+        testReadWithOffset();
     }
 
     private void testFullRead() {
@@ -111,7 +119,7 @@ public class TestLogFileSource {
         for (int i = 0; i < check.length; i++) {
             srcLen += check[i].getBytes(StandardCharsets.UTF_8).length;
         }
-        LogFileSource source = getSource();
+        LogFileSource source = getSource(1, 0);
         int cnt = 0;
         Message msg = source.read();
         int readLen = 0;
@@ -131,7 +139,7 @@ public class TestLogFileSource {
     }
 
     private void testCleanQueue() {
-        LogFileSource source = getSource();
+        LogFileSource source = getSource(2, 0);
         for (int i = 0; i < 2; i++) {
             source.read();
         }
@@ -140,4 +148,23 @@ public class TestLogFileSource {
         int leftAfterRead = 
MemoryManager.getInstance().getLeft(AGENT_GLOBAL_READER_QUEUE_PERMIT);
         Assert.assertTrue(leftAfterRead == 
DEFAULT_AGENT_GLOBAL_READER_QUEUE_PERMIT);
     }
+
+    private void testReadWithOffset() {
+        LogFileSource source = getSource(3, 1);
+        for (int i = 0; i < 2; i++) {
+            Message msg = source.read();
+            Assert.assertTrue(msg != null);
+        }
+        Message msg = source.read();
+        Assert.assertTrue(msg == null);
+        source.destroy();
+
+        source = getSource(4, 3);
+        msg = source.read();
+        Assert.assertTrue(msg == null);
+        source.destroy();
+
+        int leftAfterRead = 
MemoryManager.getInstance().getLeft(AGENT_GLOBAL_READER_QUEUE_PERMIT);
+        Assert.assertTrue(leftAfterRead == 
DEFAULT_AGENT_GLOBAL_READER_QUEUE_PERMIT);
+    }
 }
\ No newline at end of file

Reply via email to