This is an automated email from the ASF dual-hosted git repository.
healchow pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/incubator-inlong.git
The following commit(s) were added to refs/heads/master by this push:
new c3fcff231 [INLONG-4397][Agent] Supports collect full data for File
(#4404)
c3fcff231 is described below
commit c3fcff2311100aae4f6df828653ef58889303af3
Author: Greedyu <[email protected]>
AuthorDate: Tue Jun 7 21:50:44 2022 +0800
[INLONG-4397][Agent] Supports collect full data for File (#4404)
---
.../inlong/agent/constant/FileCollectType.java | 28 ++++++++++++++++++
.../apache/inlong/agent/constant/JobConstants.java | 1 +
.../java/org/apache/inlong/agent/pojo/FileJob.java | 3 ++
.../apache/inlong/agent/pojo/JobProfileDto.java | 1 +
.../apache/inlong/agent/core/job/JobWrapper.java | 4 +++
.../inlong/agent/core/trigger/TriggerManager.java | 13 +++++++++
.../agent/plugin/sources/TextFileSource.java | 27 +++++++++---------
.../agent/plugin/trigger/DirectoryTrigger.java | 1 +
.../apache/inlong/agent/plugin/TestFileAgent.java | 33 ++++++++++++++++++++++
.../inlong/agent/plugin/utils/TestUtils.java | 23 +++++++++++++++
10 files changed, 121 insertions(+), 13 deletions(-)
diff --git
a/inlong-agent/agent-common/src/main/java/org/apache/inlong/agent/constant/FileCollectType.java
b/inlong-agent/agent-common/src/main/java/org/apache/inlong/agent/constant/FileCollectType.java
new file mode 100644
index 000000000..9164032f1
--- /dev/null
+++
b/inlong-agent/agent-common/src/main/java/org/apache/inlong/agent/constant/FileCollectType.java
@@ -0,0 +1,28 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.inlong.agent.constant;
+
+/**
+ * String constants naming how a file job collects data.
+ * {@link #FULL} makes the trigger manager submit the file job profile as soon as the trigger is
+ * submitted; {@link #INCREMENT} is presumably the collect-only-new-data mode (not referenced in
+ * this change - confirm against the trigger implementation).
+ */
+public final class FileCollectType {
+
+    public static final String INCREMENT = "INCREMENT";
+
+    public static final String FULL = "FULL";
+
+    // constants holder: never instantiated
+    private FileCollectType() {
+    }
+}
diff --git
a/inlong-agent/agent-common/src/main/java/org/apache/inlong/agent/constant/JobConstants.java
b/inlong-agent/agent-common/src/main/java/org/apache/inlong/agent/constant/JobConstants.java
index 034e43f73..3c1a8e46e 100755
---
a/inlong-agent/agent-common/src/main/java/org/apache/inlong/agent/constant/JobConstants.java
+++
b/inlong-agent/agent-common/src/main/java/org/apache/inlong/agent/constant/JobConstants.java
@@ -48,6 +48,7 @@ public class JobConstants extends CommonConstants {
public static final String JOB_FILE_TIME_OFFSET = "job.fileJob.timeOffset";
public static final String JOB_FILE_MAX_WAIT = "job.fileJob.file.max.wait";
public static final String JOB_CYCLE_UNIT = "job.fileJob.cycleUnit";
+ public static final String JOB_FILE_COLLECT_TYPE =
"job.fileJob.collectType";
//Binlog job
public static final String JOB_DATABASE_USER = "job.binlogJob.user";
diff --git
a/inlong-agent/agent-common/src/main/java/org/apache/inlong/agent/pojo/FileJob.java
b/inlong-agent/agent-common/src/main/java/org/apache/inlong/agent/pojo/FileJob.java
index 21c94d34b..6bbe0d97d 100644
---
a/inlong-agent/agent-common/src/main/java/org/apache/inlong/agent/pojo/FileJob.java
+++
b/inlong-agent/agent-common/src/main/java/org/apache/inlong/agent/pojo/FileJob.java
@@ -29,6 +29,7 @@ public class FileJob {
private int id;
private String timeOffset;
private String addictiveString;
+ private String collectType;
@Data
public static class Dir {
@@ -59,6 +60,8 @@ public class FileJob {
private String timeOffset;
//For example: a=b&c=b&e=f
private String additionalAttr;
+
+ private String collectType;
}
}
diff --git
a/inlong-agent/agent-common/src/main/java/org/apache/inlong/agent/pojo/JobProfileDto.java
b/inlong-agent/agent-common/src/main/java/org/apache/inlong/agent/pojo/JobProfileDto.java
index 55a5a7815..e850f6f2b 100644
---
a/inlong-agent/agent-common/src/main/java/org/apache/inlong/agent/pojo/JobProfileDto.java
+++
b/inlong-agent/agent-common/src/main/java/org/apache/inlong/agent/pojo/JobProfileDto.java
@@ -101,6 +101,7 @@ public class JobProfileDto {
FileJob.Dir dir = new FileJob.Dir();
dir.setPattern(fileJobTaskConfig.getPattern());
fileJob.setDir(dir);
+ fileJob.setCollectType(fileJobTaskConfig.getCollectType());
if (fileJobTaskConfig.getTimeOffset() != null) {
fileJob.setTimeOffset(fileJobTaskConfig.getTimeOffset());
diff --git
a/inlong-agent/agent-core/src/main/java/org/apache/inlong/agent/core/job/JobWrapper.java
b/inlong-agent/agent-core/src/main/java/org/apache/inlong/agent/core/job/JobWrapper.java
index d212a8d20..2bdc3874c 100644
---
a/inlong-agent/agent-core/src/main/java/org/apache/inlong/agent/core/job/JobWrapper.java
+++
b/inlong-agent/agent-core/src/main/java/org/apache/inlong/agent/core/job/JobWrapper.java
@@ -164,4 +164,8 @@ public class JobWrapper extends AbstractStateWrapper {
jobManager.markJobAsSuccess(job.getJobInstanceId());
}));
}
+
+    /**
+     * Gets the tasks tracked by this wrapper.
+     *
+     * @return the {@code allTasks} list itself, not a copy
+     */
+    public List<Task> getAllTasks() {
+        return this.allTasks;
+    }
}
diff --git
a/inlong-agent/agent-core/src/main/java/org/apache/inlong/agent/core/trigger/TriggerManager.java
b/inlong-agent/agent-core/src/main/java/org/apache/inlong/agent/core/trigger/TriggerManager.java
index 1e05d58dd..c4ecf512f 100755
---
a/inlong-agent/agent-core/src/main/java/org/apache/inlong/agent/core/trigger/TriggerManager.java
+++
b/inlong-agent/agent-core/src/main/java/org/apache/inlong/agent/core/trigger/TriggerManager.java
@@ -22,6 +22,7 @@ import org.apache.inlong.agent.conf.AgentConfiguration;
import org.apache.inlong.agent.conf.JobProfile;
import org.apache.inlong.agent.conf.TriggerProfile;
import org.apache.inlong.agent.constant.AgentConstants;
+import org.apache.inlong.agent.constant.FileCollectType;
import org.apache.inlong.agent.constant.JobConstants;
import org.apache.inlong.agent.core.AgentManager;
import org.apache.inlong.agent.core.job.JobWrapper;
@@ -106,11 +107,23 @@ public class TriggerManager extends AbstractDaemon {
triggerProfile.toJsonStr(), this.triggerMap.size(),
maxRunningNum);
return false;
}
+ preprocessTrigger(triggerProfile);
triggerProfileDB.storeTrigger(triggerProfile);
addTrigger(triggerProfile);
return true;
}
+    /**
+     * Preprocessing step executed before a trigger is stored and added.
+     * When the profile's collect type is {@link FileCollectType#FULL}, the profile is handed to the
+     * job manager right away through {@code submitFileJobProfile}; any other value is a no-op.
+     *
+     * @param profile trigger profile that is about to be added
+     */
+    public void preprocessTrigger(TriggerProfile profile) {
+        String collectType = profile.get(JobConstants.JOB_FILE_COLLECT_TYPE, "");
+        if (!FileCollectType.FULL.equals(collectType)) {
+            return;
+        }
+        LOGGER.info("Initialize submit full path. trigger {} ", profile.getTriggerId());
+        manager.getJobManager().submitFileJobProfile(profile);
+    }
+
private Runnable jobFetchThread() {
return () -> {
while (isRunnable()) {
diff --git
a/inlong-agent/agent-plugins/src/main/java/org/apache/inlong/agent/plugin/sources/TextFileSource.java
b/inlong-agent/agent-plugins/src/main/java/org/apache/inlong/agent/plugin/sources/TextFileSource.java
index 00b336ea6..9baeb352c 100755
---
a/inlong-agent/agent-plugins/src/main/java/org/apache/inlong/agent/plugin/sources/TextFileSource.java
+++
b/inlong-agent/agent-plugins/src/main/java/org/apache/inlong/agent/plugin/sources/TextFileSource.java
@@ -17,18 +17,6 @@
package org.apache.inlong.agent.plugin.sources;
-import static org.apache.inlong.agent.constant.CommonConstants.POSITION_SUFFIX;
-import static
org.apache.inlong.agent.constant.JobConstants.DEFAULT_JOB_LINE_FILTER;
-import static
org.apache.inlong.agent.constant.JobConstants.DEFAULT_JOB_READ_WAIT_TIMEOUT;
-import static
org.apache.inlong.agent.constant.JobConstants.JOB_LINE_FILTER_PATTERN;
-import static
org.apache.inlong.agent.constant.JobConstants.JOB_READ_WAIT_TIMEOUT;
-
-import java.io.File;
-import java.util.ArrayList;
-import java.util.Collection;
-import java.util.List;
-import java.util.concurrent.atomic.AtomicLong;
-
import org.apache.inlong.agent.conf.JobProfile;
import org.apache.inlong.agent.plugin.Reader;
import org.apache.inlong.agent.plugin.Source;
@@ -42,6 +30,18 @@ import org.apache.inlong.agent.utils.ConfigUtil;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
+import java.io.File;
+import java.util.ArrayList;
+import java.util.Collection;
+import java.util.List;
+import java.util.concurrent.atomic.AtomicLong;
+
+import static org.apache.inlong.agent.constant.CommonConstants.POSITION_SUFFIX;
+import static
org.apache.inlong.agent.constant.JobConstants.DEFAULT_JOB_LINE_FILTER;
+import static
org.apache.inlong.agent.constant.JobConstants.DEFAULT_JOB_READ_WAIT_TIMEOUT;
+import static
org.apache.inlong.agent.constant.JobConstants.JOB_LINE_FILTER_PATTERN;
+import static
org.apache.inlong.agent.constant.JobConstants.JOB_READ_WAIT_TIMEOUT;
+
/**
* Read text files
*/
@@ -74,7 +74,8 @@ public class TextFileSource implements Source {
String filterPattern = jobConf.get(JOB_LINE_FILTER_PATTERN,
DEFAULT_JOB_LINE_FILTER);
for (File file : allFiles) {
int seekPosition = jobConf.getInt(file.getAbsolutePath() +
POSITION_SUFFIX, 0);
- LOGGER.info("read from history position {} with job profile {}",
seekPosition, jobConf.getInstanceId());
+ LOGGER.info("read from history position {} with job profile {},
file absolute path: {}", seekPosition,
+ jobConf.getInstanceId(), file.getAbsolutePath());
String md5 = jobConf.get(file.getAbsolutePath() + MD5_SUFFIX, "");
TextFileReader textFileReader = new TextFileReader(file,
seekPosition);
long waitTimeout = jobConf.getLong(JOB_READ_WAIT_TIMEOUT,
DEFAULT_JOB_READ_WAIT_TIMEOUT);
diff --git
a/inlong-agent/agent-plugins/src/main/java/org/apache/inlong/agent/plugin/trigger/DirectoryTrigger.java
b/inlong-agent/agent-plugins/src/main/java/org/apache/inlong/agent/plugin/trigger/DirectoryTrigger.java
index 944c68640..743eb0bd9 100644
---
a/inlong-agent/agent-plugins/src/main/java/org/apache/inlong/agent/plugin/trigger/DirectoryTrigger.java
+++
b/inlong-agent/agent-plugins/src/main/java/org/apache/inlong/agent/plugin/trigger/DirectoryTrigger.java
@@ -270,6 +270,7 @@ public class DirectoryTrigger extends AbstractDaemon
implements Trigger {
interval = profile.getInt(
AgentConstants.TRIGGER_CHECK_INTERVAL,
AgentConstants.DEFAULT_TRIGGER_CHECK_INTERVAL);
this.profile = profile;
+
if (this.profile.hasKey(JobConstants.JOB_DIR_FILTER_PATTERN)) {
String pathPattern =
this.profile.get(JobConstants.JOB_DIR_FILTER_PATTERN);
String timeOffset =
this.profile.get(JobConstants.JOB_FILE_TIME_OFFSET, "");
diff --git
a/inlong-agent/agent-plugins/src/test/java/org/apache/inlong/agent/plugin/TestFileAgent.java
b/inlong-agent/agent-plugins/src/test/java/org/apache/inlong/agent/plugin/TestFileAgent.java
index 484c16820..006d89547 100755
---
a/inlong-agent/agent-plugins/src/test/java/org/apache/inlong/agent/plugin/TestFileAgent.java
+++
b/inlong-agent/agent-plugins/src/test/java/org/apache/inlong/agent/plugin/TestFileAgent.java
@@ -20,6 +20,7 @@ package org.apache.inlong.agent.plugin;
import org.apache.commons.io.IOUtils;
import org.apache.inlong.agent.conf.JobProfile;
import org.apache.inlong.agent.conf.TriggerProfile;
+import org.apache.inlong.agent.constant.FileCollectType;
import org.apache.inlong.agent.core.job.JobWrapper;
import org.apache.inlong.agent.core.trigger.TriggerManager;
import org.apache.inlong.agent.db.StateSearchKey;
@@ -42,12 +43,14 @@ import java.nio.file.Paths;
import java.util.Map;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicBoolean;
+import java.util.concurrent.atomic.AtomicLong;
import static
org.apache.inlong.agent.constant.AgentConstants.AGENT_MESSAGE_FILTER_CLASSNAME;
import static
org.apache.inlong.agent.constant.CommonConstants.PROXY_INLONG_GROUP_ID;
import static
org.apache.inlong.agent.constant.CommonConstants.PROXY_INLONG_STREAM_ID;
import static org.apache.inlong.agent.constant.JobConstants.JOB_CYCLE_UNIT;
import static
org.apache.inlong.agent.constant.JobConstants.JOB_DIR_FILTER_PATTERN;
+import static
org.apache.inlong.agent.constant.JobConstants.JOB_FILE_COLLECT_TYPE;
import static org.apache.inlong.agent.constant.JobConstants.JOB_FILE_MAX_WAIT;
import static
org.apache.inlong.agent.constant.JobConstants.JOB_FILE_TIME_OFFSET;
import static
org.apache.inlong.agent.constant.JobConstants.JOB_READ_WAIT_TIMEOUT;
@@ -144,6 +147,36 @@ public class TestFileAgent {
Assert.assertTrue(checkOnlyOneJob());
}
+    /**
+     * Looks through the jobs currently known to the job manager and returns the task count of the
+     * job whose collect type is FULL. Returns 0 when no such job exists; if several match, the
+     * one visited last wins.
+     */
+    private Long checkFullPathReadJob() {
+        AtomicLong taskCount = new AtomicLong(0L);
+        agent.getManager().getJobManager().getJobs().values().forEach(wrapper -> {
+            String collectType = wrapper.getJob().getJobConf().get(JOB_FILE_COLLECT_TYPE, null);
+            if (FileCollectType.FULL.equals(collectType)) {
+                taskCount.set(wrapper.getAllTasks().size());
+            }
+        });
+        return taskCount.get();
+    }
+
+    /**
+     * Submits a trigger with collect type FULL over the {@code test} resource directory and checks
+     * that the resulting job holds three tasks, then drops a new file into the directory while the
+     * trigger is running.
+     */
+    @Test
+    public void testOneJobFullPath() throws Exception {
+        TriggerProfile triggerProfile = TriggerProfile.parseJsonStr(TestUtils.getTestTriggerProfile());
+        String path = Paths.get(getClass().getClassLoader().getResource("test").toURI()).toString();
+        String fileName = path + "/increment_test.txt";
+        // remove leftovers of an earlier run so the asserted task count is not skewed
+        TestUtils.deleteFile(fileName);
+        triggerProfile.set(JOB_DIR_FILTER_PATTERN, path);
+        triggerProfile.set(JOB_FILE_MAX_WAIT, "-1");
+        triggerProfile.set(JOB_FILE_COLLECT_TYPE, FileCollectType.FULL);
+        agent.getManager().getTriggerManager().submitTrigger(triggerProfile);
+        // sleep is static: call it as such instead of through Thread.currentThread()
+        TimeUnit.SECONDS.sleep(2);
+        Assert.assertEquals(3L, checkFullPathReadJob().longValue());
+        try {
+            TestUtils.createFile(fileName);
+            TimeUnit.SECONDS.sleep(10);
+        } finally {
+            // always clean up the fixture, even when the wait is interrupted
+            TestUtils.deleteFile(fileName);
+        }
+    }
+
private boolean checkOnlyOneJob() {
Map<String, JobWrapper> jobs =
agent.getManager().getJobManager().getJobs();
AtomicBoolean result = new AtomicBoolean(false);
diff --git
a/inlong-agent/agent-plugins/src/test/java/org/apache/inlong/agent/plugin/utils/TestUtils.java
b/inlong-agent/agent-plugins/src/test/java/org/apache/inlong/agent/plugin/utils/TestUtils.java
index 55bb81161..80404e6ea 100644
---
a/inlong-agent/agent-plugins/src/test/java/org/apache/inlong/agent/plugin/utils/TestUtils.java
+++
b/inlong-agent/agent-plugins/src/test/java/org/apache/inlong/agent/plugin/utils/TestUtils.java
@@ -17,8 +17,11 @@
package org.apache.inlong.agent.plugin.utils;
+import org.apache.commons.io.FileUtils;
import org.apache.inlong.common.metric.MetricRegister;
import org.powermock.api.mockito.PowerMockito;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
import java.io.FileWriter;
import java.nio.file.Files;
@@ -32,6 +35,9 @@ import static org.mockito.ArgumentMatchers.any;
public class TestUtils {
+ private static final Logger LOGGER =
LoggerFactory.getLogger(TestUtils.class);
+ private static final String RECORD = "This is the test line for file\n";
+
public static String getTestTriggerProfile() {
return "{\n"
+ " \"job\": {\n"
@@ -87,4 +93,21 @@ public class TestUtils {
PowerMockito.mockStatic(MetricRegister.class);
PowerMockito.doNothing().when(MetricRegister.class, "register", any());
}
+
+    /**
+     * Creates the file (overwriting an existing one) and writes a single {@code RECORD} line to it.
+     *
+     * @param fileName path of the file to write
+     * @throws Exception if the file cannot be opened or written
+     */
+    public static void createFile(String fileName) throws Exception {
+        // try-with-resources flushes and closes the writer even when write() fails
+        try (FileWriter writer = new FileWriter(fileName)) {
+            writer.write(RECORD);
+        }
+    }
+
+    /**
+     * Best-effort removal of a test file. A file that does not exist is skipped silently;
+     * any other failure is logged and not propagated.
+     *
+     * @param fileName path of the file to delete
+     */
+    public static void deleteFile(String fileName) throws Exception {
+        try {
+            // a missing file is the normal state on a clean run, so it is not worth a warning
+            if (Files.exists(Paths.get(fileName))) {
+                FileUtils.delete(Paths.get(fileName).toFile());
+            }
+        } catch (Exception e) {
+            LOGGER.warn("deleteFile error ", e);
+        }
+    }
}