This is an automated email from the ASF dual-hosted git repository.

healchow pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/incubator-inlong.git


The following commit(s) were added to refs/heads/master by this push:
     new c3fcff231 [INLONG-4397][Agent] Supports collect full data for File (#4404)
c3fcff231 is described below

commit c3fcff2311100aae4f6df828653ef58889303af3
Author: Greedyu <[email protected]>
AuthorDate: Tue Jun 7 21:50:44 2022 +0800

    [INLONG-4397][Agent] Supports collect full data for File (#4404)
---
 .../inlong/agent/constant/FileCollectType.java     | 28 ++++++++++++++++++
 .../apache/inlong/agent/constant/JobConstants.java |  1 +
 .../java/org/apache/inlong/agent/pojo/FileJob.java |  3 ++
 .../apache/inlong/agent/pojo/JobProfileDto.java    |  1 +
 .../apache/inlong/agent/core/job/JobWrapper.java   |  4 +++
 .../inlong/agent/core/trigger/TriggerManager.java  | 13 +++++++++
 .../agent/plugin/sources/TextFileSource.java       | 27 +++++++++---------
 .../agent/plugin/trigger/DirectoryTrigger.java     |  1 +
 .../apache/inlong/agent/plugin/TestFileAgent.java  | 33 ++++++++++++++++++++++
 .../inlong/agent/plugin/utils/TestUtils.java       | 23 +++++++++++++++
 10 files changed, 121 insertions(+), 13 deletions(-)

diff --git 
a/inlong-agent/agent-common/src/main/java/org/apache/inlong/agent/constant/FileCollectType.java
 
b/inlong-agent/agent-common/src/main/java/org/apache/inlong/agent/constant/FileCollectType.java
new file mode 100644
index 000000000..9164032f1
--- /dev/null
+++ 
b/inlong-agent/agent-common/src/main/java/org/apache/inlong/agent/constant/FileCollectType.java
@@ -0,0 +1,28 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.inlong.agent.constant;
+
+/** Collection type of file data (values of the {@code job.fileJob.collectType} profile key). */
+public final class FileCollectType {
+
+    public static final String INCREMENT = "INCREMENT";
+
+    public static final String FULL = "FULL";
+
+    private FileCollectType() {}
+}
diff --git 
a/inlong-agent/agent-common/src/main/java/org/apache/inlong/agent/constant/JobConstants.java
 
b/inlong-agent/agent-common/src/main/java/org/apache/inlong/agent/constant/JobConstants.java
index 034e43f73..3c1a8e46e 100755
--- 
a/inlong-agent/agent-common/src/main/java/org/apache/inlong/agent/constant/JobConstants.java
+++ 
b/inlong-agent/agent-common/src/main/java/org/apache/inlong/agent/constant/JobConstants.java
@@ -48,6 +48,7 @@ public class JobConstants extends CommonConstants {
     public static final String JOB_FILE_TIME_OFFSET = "job.fileJob.timeOffset";
     public static final String JOB_FILE_MAX_WAIT = "job.fileJob.file.max.wait";
     public static final String JOB_CYCLE_UNIT = "job.fileJob.cycleUnit";
+    public static final String JOB_FILE_COLLECT_TYPE = 
"job.fileJob.collectType";
 
     //Binlog job
     public static final String JOB_DATABASE_USER = "job.binlogJob.user";
diff --git 
a/inlong-agent/agent-common/src/main/java/org/apache/inlong/agent/pojo/FileJob.java
 
b/inlong-agent/agent-common/src/main/java/org/apache/inlong/agent/pojo/FileJob.java
index 21c94d34b..6bbe0d97d 100644
--- 
a/inlong-agent/agent-common/src/main/java/org/apache/inlong/agent/pojo/FileJob.java
+++ 
b/inlong-agent/agent-common/src/main/java/org/apache/inlong/agent/pojo/FileJob.java
@@ -29,6 +29,7 @@ public class FileJob  {
     private int id;
     private String timeOffset;
     private String addictiveString;
+    private String collectType;
 
     @Data
     public static class Dir {
@@ -59,6 +60,8 @@ public class FileJob  {
         private String timeOffset;
         //For example: a=b&c=b&e=f
         private String additionalAttr;
+
+        private String collectType;
     }
 
 }
diff --git 
a/inlong-agent/agent-common/src/main/java/org/apache/inlong/agent/pojo/JobProfileDto.java
 
b/inlong-agent/agent-common/src/main/java/org/apache/inlong/agent/pojo/JobProfileDto.java
index 55a5a7815..e850f6f2b 100644
--- 
a/inlong-agent/agent-common/src/main/java/org/apache/inlong/agent/pojo/JobProfileDto.java
+++ 
b/inlong-agent/agent-common/src/main/java/org/apache/inlong/agent/pojo/JobProfileDto.java
@@ -101,6 +101,7 @@ public class JobProfileDto {
         FileJob.Dir dir = new FileJob.Dir();
         dir.setPattern(fileJobTaskConfig.getPattern());
         fileJob.setDir(dir);
+        fileJob.setCollectType(fileJobTaskConfig.getCollectType());
 
         if (fileJobTaskConfig.getTimeOffset() != null) {
             fileJob.setTimeOffset(fileJobTaskConfig.getTimeOffset());
diff --git 
a/inlong-agent/agent-core/src/main/java/org/apache/inlong/agent/core/job/JobWrapper.java
 
b/inlong-agent/agent-core/src/main/java/org/apache/inlong/agent/core/job/JobWrapper.java
index d212a8d20..2bdc3874c 100644
--- 
a/inlong-agent/agent-core/src/main/java/org/apache/inlong/agent/core/job/JobWrapper.java
+++ 
b/inlong-agent/agent-core/src/main/java/org/apache/inlong/agent/core/job/JobWrapper.java
@@ -164,4 +164,8 @@ public class JobWrapper extends AbstractStateWrapper {
             jobManager.markJobAsSuccess(job.getJobInstanceId());
         }));
     }
+
+    public List<Task> getAllTasks() {
+        return this.allTasks; // the same list instance this wrapper holds, not a copy
+    }
 }
diff --git 
a/inlong-agent/agent-core/src/main/java/org/apache/inlong/agent/core/trigger/TriggerManager.java
 
b/inlong-agent/agent-core/src/main/java/org/apache/inlong/agent/core/trigger/TriggerManager.java
index 1e05d58dd..c4ecf512f 100755
--- 
a/inlong-agent/agent-core/src/main/java/org/apache/inlong/agent/core/trigger/TriggerManager.java
+++ 
b/inlong-agent/agent-core/src/main/java/org/apache/inlong/agent/core/trigger/TriggerManager.java
@@ -22,6 +22,7 @@ import org.apache.inlong.agent.conf.AgentConfiguration;
 import org.apache.inlong.agent.conf.JobProfile;
 import org.apache.inlong.agent.conf.TriggerProfile;
 import org.apache.inlong.agent.constant.AgentConstants;
+import org.apache.inlong.agent.constant.FileCollectType;
 import org.apache.inlong.agent.constant.JobConstants;
 import org.apache.inlong.agent.core.AgentManager;
 import org.apache.inlong.agent.core.job.JobWrapper;
@@ -106,11 +107,23 @@ public class TriggerManager extends AbstractDaemon {
                     triggerProfile.toJsonStr(), this.triggerMap.size(), 
maxRunningNum);
             return false;
         }
+        preprocessTrigger(triggerProfile);
         triggerProfileDB.storeTrigger(triggerProfile);
         addTrigger(triggerProfile);
         return true;
     }
 
+    /** Preprocessing before adding trigger: a FULL collect type submits a file job up front. */
+    public void preprocessTrigger(TriggerProfile profile) {
+        boolean fullCollect = FileCollectType.FULL.equals(
+                profile.get(JobConstants.JOB_FILE_COLLECT_TYPE, ""));
+        if (!fullCollect) {
+            return; // any other (or missing) collect type: nothing to pre-submit
+        }
+        LOGGER.info("Initialize submit full path. trigger {} ", profile.getTriggerId());
+        manager.getJobManager().submitFileJobProfile(profile);
+    }
+
     private Runnable jobFetchThread() {
         return () -> {
             while (isRunnable()) {
diff --git 
a/inlong-agent/agent-plugins/src/main/java/org/apache/inlong/agent/plugin/sources/TextFileSource.java
 
b/inlong-agent/agent-plugins/src/main/java/org/apache/inlong/agent/plugin/sources/TextFileSource.java
index 00b336ea6..9baeb352c 100755
--- 
a/inlong-agent/agent-plugins/src/main/java/org/apache/inlong/agent/plugin/sources/TextFileSource.java
+++ 
b/inlong-agent/agent-plugins/src/main/java/org/apache/inlong/agent/plugin/sources/TextFileSource.java
@@ -17,18 +17,6 @@
 
 package org.apache.inlong.agent.plugin.sources;
 
-import static org.apache.inlong.agent.constant.CommonConstants.POSITION_SUFFIX;
-import static 
org.apache.inlong.agent.constant.JobConstants.DEFAULT_JOB_LINE_FILTER;
-import static 
org.apache.inlong.agent.constant.JobConstants.DEFAULT_JOB_READ_WAIT_TIMEOUT;
-import static 
org.apache.inlong.agent.constant.JobConstants.JOB_LINE_FILTER_PATTERN;
-import static 
org.apache.inlong.agent.constant.JobConstants.JOB_READ_WAIT_TIMEOUT;
-
-import java.io.File;
-import java.util.ArrayList;
-import java.util.Collection;
-import java.util.List;
-import java.util.concurrent.atomic.AtomicLong;
-
 import org.apache.inlong.agent.conf.JobProfile;
 import org.apache.inlong.agent.plugin.Reader;
 import org.apache.inlong.agent.plugin.Source;
@@ -42,6 +30,18 @@ import org.apache.inlong.agent.utils.ConfigUtil;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
+import java.io.File;
+import java.util.ArrayList;
+import java.util.Collection;
+import java.util.List;
+import java.util.concurrent.atomic.AtomicLong;
+
+import static org.apache.inlong.agent.constant.CommonConstants.POSITION_SUFFIX;
+import static 
org.apache.inlong.agent.constant.JobConstants.DEFAULT_JOB_LINE_FILTER;
+import static 
org.apache.inlong.agent.constant.JobConstants.DEFAULT_JOB_READ_WAIT_TIMEOUT;
+import static 
org.apache.inlong.agent.constant.JobConstants.JOB_LINE_FILTER_PATTERN;
+import static 
org.apache.inlong.agent.constant.JobConstants.JOB_READ_WAIT_TIMEOUT;
+
 /**
  * Read text files
  */
@@ -74,7 +74,8 @@ public class TextFileSource implements Source {
         String filterPattern = jobConf.get(JOB_LINE_FILTER_PATTERN, 
DEFAULT_JOB_LINE_FILTER);
         for (File file : allFiles) {
             int seekPosition = jobConf.getInt(file.getAbsolutePath() + 
POSITION_SUFFIX, 0);
-            LOGGER.info("read from history position {} with job profile {}", 
seekPosition, jobConf.getInstanceId());
+            LOGGER.info("read from history position {} with job profile {}, 
file absolute path: {}", seekPosition,
+                    jobConf.getInstanceId(), file.getAbsolutePath());
             String md5 = jobConf.get(file.getAbsolutePath() + MD5_SUFFIX, "");
             TextFileReader textFileReader = new TextFileReader(file, 
seekPosition);
             long waitTimeout = jobConf.getLong(JOB_READ_WAIT_TIMEOUT, 
DEFAULT_JOB_READ_WAIT_TIMEOUT);
diff --git 
a/inlong-agent/agent-plugins/src/main/java/org/apache/inlong/agent/plugin/trigger/DirectoryTrigger.java
 
b/inlong-agent/agent-plugins/src/main/java/org/apache/inlong/agent/plugin/trigger/DirectoryTrigger.java
index 944c68640..743eb0bd9 100644
--- 
a/inlong-agent/agent-plugins/src/main/java/org/apache/inlong/agent/plugin/trigger/DirectoryTrigger.java
+++ 
b/inlong-agent/agent-plugins/src/main/java/org/apache/inlong/agent/plugin/trigger/DirectoryTrigger.java
@@ -270,6 +270,7 @@ public class DirectoryTrigger extends AbstractDaemon 
implements Trigger {
         interval = profile.getInt(
                 AgentConstants.TRIGGER_CHECK_INTERVAL, 
AgentConstants.DEFAULT_TRIGGER_CHECK_INTERVAL);
         this.profile = profile;
+
         if (this.profile.hasKey(JobConstants.JOB_DIR_FILTER_PATTERN)) {
             String pathPattern = 
this.profile.get(JobConstants.JOB_DIR_FILTER_PATTERN);
             String timeOffset = 
this.profile.get(JobConstants.JOB_FILE_TIME_OFFSET, "");
diff --git 
a/inlong-agent/agent-plugins/src/test/java/org/apache/inlong/agent/plugin/TestFileAgent.java
 
b/inlong-agent/agent-plugins/src/test/java/org/apache/inlong/agent/plugin/TestFileAgent.java
index 484c16820..006d89547 100755
--- 
a/inlong-agent/agent-plugins/src/test/java/org/apache/inlong/agent/plugin/TestFileAgent.java
+++ 
b/inlong-agent/agent-plugins/src/test/java/org/apache/inlong/agent/plugin/TestFileAgent.java
@@ -20,6 +20,7 @@ package org.apache.inlong.agent.plugin;
 import org.apache.commons.io.IOUtils;
 import org.apache.inlong.agent.conf.JobProfile;
 import org.apache.inlong.agent.conf.TriggerProfile;
+import org.apache.inlong.agent.constant.FileCollectType;
 import org.apache.inlong.agent.core.job.JobWrapper;
 import org.apache.inlong.agent.core.trigger.TriggerManager;
 import org.apache.inlong.agent.db.StateSearchKey;
@@ -42,12 +43,14 @@ import java.nio.file.Paths;
 import java.util.Map;
 import java.util.concurrent.TimeUnit;
 import java.util.concurrent.atomic.AtomicBoolean;
+import java.util.concurrent.atomic.AtomicLong;
 
 import static 
org.apache.inlong.agent.constant.AgentConstants.AGENT_MESSAGE_FILTER_CLASSNAME;
 import static 
org.apache.inlong.agent.constant.CommonConstants.PROXY_INLONG_GROUP_ID;
 import static 
org.apache.inlong.agent.constant.CommonConstants.PROXY_INLONG_STREAM_ID;
 import static org.apache.inlong.agent.constant.JobConstants.JOB_CYCLE_UNIT;
 import static 
org.apache.inlong.agent.constant.JobConstants.JOB_DIR_FILTER_PATTERN;
+import static 
org.apache.inlong.agent.constant.JobConstants.JOB_FILE_COLLECT_TYPE;
 import static org.apache.inlong.agent.constant.JobConstants.JOB_FILE_MAX_WAIT;
 import static 
org.apache.inlong.agent.constant.JobConstants.JOB_FILE_TIME_OFFSET;
 import static 
org.apache.inlong.agent.constant.JobConstants.JOB_READ_WAIT_TIMEOUT;
@@ -144,6 +147,36 @@ public class TestFileAgent {
         Assert.assertTrue(checkOnlyOneJob());
     }
 
+    /** Task count of the FULL-collect job, or 0 if none (the last one visited wins if several). */
+    private Long checkFullPathReadJob() {
+        AtomicLong fullTaskCount = new AtomicLong();
+        agent.getManager().getJobManager().getJobs().values().forEach(jobWrapper -> {
+            if (FileCollectType.FULL.equals(jobWrapper.getJob().getJobConf().get(JOB_FILE_COLLECT_TYPE, null))) {
+                fullTaskCount.set(jobWrapper.getAllTasks().size());
+            }
+        });
+        return fullTaskCount.get();
+    }
+
+    @Test
+    public void testOneJobFullPath() throws Exception {
+        TriggerProfile triggerProfile = TriggerProfile.parseJsonStr(TestUtils.getTestTriggerProfile());
+        String path = Paths.get(getClass().getClassLoader().getResource("test").toURI()).toString();
+        String fileName = path + "/increment_test.txt";
+        // remove a leftover from an earlier run; presumably it would change the count asserted below
+        TestUtils.deleteFile(fileName);
+        triggerProfile.set(JOB_DIR_FILTER_PATTERN, path);
+        triggerProfile.set(JOB_FILE_MAX_WAIT, "-1");
+        triggerProfile.set(JOB_FILE_COLLECT_TYPE, FileCollectType.FULL);
+        agent.getManager().getTriggerManager().submitTrigger(triggerProfile);
+        TimeUnit.SECONDS.sleep(2); // static sleep of the current thread; give the trigger time to submit
+        Assert.assertEquals(3L, checkFullPathReadJob().longValue());
+        // NOTE(review): nothing below is asserted; the new file only exercises pickup after submission
+        TestUtils.createFile(fileName);
+        TimeUnit.SECONDS.sleep(10);
+        TestUtils.deleteFile(fileName);
+    }
+
     private boolean checkOnlyOneJob() {
         Map<String, JobWrapper> jobs = 
agent.getManager().getJobManager().getJobs();
         AtomicBoolean result = new AtomicBoolean(false);
diff --git 
a/inlong-agent/agent-plugins/src/test/java/org/apache/inlong/agent/plugin/utils/TestUtils.java
 
b/inlong-agent/agent-plugins/src/test/java/org/apache/inlong/agent/plugin/utils/TestUtils.java
index 55bb81161..80404e6ea 100644
--- 
a/inlong-agent/agent-plugins/src/test/java/org/apache/inlong/agent/plugin/utils/TestUtils.java
+++ 
b/inlong-agent/agent-plugins/src/test/java/org/apache/inlong/agent/plugin/utils/TestUtils.java
@@ -17,8 +17,11 @@
 
 package org.apache.inlong.agent.plugin.utils;
 
+import org.apache.commons.io.FileUtils;
 import org.apache.inlong.common.metric.MetricRegister;
 import org.powermock.api.mockito.PowerMockito;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
 
 import java.io.FileWriter;
 import java.nio.file.Files;
@@ -32,6 +35,9 @@ import static org.mockito.ArgumentMatchers.any;
 
 public class TestUtils {
 
+    private static final Logger LOGGER = 
LoggerFactory.getLogger(TestUtils.class);
+    private static final String RECORD = "This is the test line for file\n";
+
     public static String getTestTriggerProfile() {
         return "{\n"
                 + "  \"job\": {\n"
@@ -87,4 +93,21 @@ public class TestUtils {
         PowerMockito.mockStatic(MetricRegister.class);
         PowerMockito.doNothing().when(MetricRegister.class, "register", any());
     }
+
+    /**
+     * Creates (or truncates) {@code fileName} and writes a single test record line to it.
+     */
+    public static void createFile(String fileName) throws Exception {
+        try (FileWriter writer = new FileWriter(fileName)) {
+            writer.write(RECORD);
+        }
+    }
+
+    public static void deleteFile(String fileName) throws Exception {
+        try {
+            FileUtils.delete(Paths.get(fileName).toFile()); // throws if it cannot delete (e.g. missing file)
+        } catch (Exception deleteError) {
+            LOGGER.warn("deleteFile error ", deleteError); // best effort: log and carry on
+        }
+    }
 }

Reply via email to