This is an automated email from the ASF dual-hosted git repository.
aloyszhang pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/inlong.git
The following commit(s) were added to refs/heads/master by this push:
new 81fb821718 [INLONG-11506][Agent] Task start and end time using string type (#11507)
81fb821718 is described below
commit 81fb8217183b65c365cdaaf16b36a631c163286b
Author: justinwwhuang <[email protected]>
AuthorDate: Tue Nov 19 18:32:43 2024 +0800
[INLONG-11506][Agent] Task start and end time using string type (#11507)
---
.../apache/inlong/agent/conf/InstanceProfile.java | 4 +-
.../org/apache/inlong/agent/conf/TaskProfile.java | 4 +-
.../inlong/agent/constant/TaskConstants.java | 6 +-
.../org/apache/inlong/agent/pojo/FileTask.java | 8 +-
.../apache/inlong/agent/pojo/TaskProfileDto.java | 4 +-
.../inlong/agent/core/AgentBaseTestsHelper.java | 8 +-
.../agent/plugin/fetcher/ManagerFetcher.java | 27 +--
.../inlong/agent/plugin/instance/FileInstance.java | 2 +-
.../inlong/agent/plugin/sources/LogFileSource.java | 2 +-
.../inlong/agent/plugin/task/file/AgentErrMsg.java | 67 -------
.../plugin/{utils => task}/file/FileDataUtils.java | 2 +-
.../inlong/agent/plugin/task/file/FileScanner.java | 87 ++-------
.../{utils => task}/file/FileTimeComparator.java | 2 +-
.../agent/plugin/{utils => task}/file/Files.java | 2 +-
.../inlong/agent/plugin/task/file/LogFileTask.java | 107 ++++++-----
.../inlong/agent/plugin/task/file/WatchEntity.java | 15 +-
.../plugin/utils/{file => regex}/DateUtils.java | 195 +--------------------
.../plugin/utils/{file => regex}/MatchPoint.java | 2 +-
.../plugin/utils/{file => regex}/NewDateUtils.java | 41 +----
.../{file => regex}/NonRegexPatternPosition.java | 2 +-
.../utils/{file => regex}/PathDateExpression.java | 2 +-
.../FilePathUtil.java => regex/PatternUtil.java} | 17 +-
.../inlong/agent/plugin/utils/regex/Scanner.java | 97 ++++++++++
.../inlong/agent/plugin/AgentBaseTestsHelper.java | 12 +-
.../agent/plugin/instance/TestInstanceManager.java | 2 +-
.../inlong/agent/plugin/sinks/KafkaSinkTest.java | 2 +-
.../inlong/agent/plugin/sinks/PulsarSinkTest.java | 2 +-
.../sinks/filecollect/TestSenderManager.java | 4 +-
.../agent/plugin/sources/TestLogFileSource.java | 4 +-
.../agent/plugin/sources/TestRedisSource.java | 2 +-
.../agent/plugin/sources/TestSQLServerSource.java | 2 +-
.../inlong/agent/plugin/task/TestLogFileTask.java | 28 ++-
.../inlong/agent/plugin/task/TestTaskManager.java | 6 +-
.../inlong/agent/plugin/utils/TestUtils.java | 29 ++-
34 files changed, 266 insertions(+), 530 deletions(-)
diff --git
a/inlong-agent/agent-common/src/main/java/org/apache/inlong/agent/conf/InstanceProfile.java
b/inlong-agent/agent-common/src/main/java/org/apache/inlong/agent/conf/InstanceProfile.java
index 9e85872ff5..c9a3d6a022 100644
---
a/inlong-agent/agent-common/src/main/java/org/apache/inlong/agent/conf/InstanceProfile.java
+++
b/inlong-agent/agent-common/src/main/java/org/apache/inlong/agent/conf/InstanceProfile.java
@@ -36,10 +36,10 @@ import static
org.apache.inlong.agent.constant.CommonConstants.DEFAULT_PROXY_INL
import static
org.apache.inlong.agent.constant.CommonConstants.DEFAULT_PROXY_INLONG_STREAM_ID;
import static
org.apache.inlong.agent.constant.CommonConstants.PROXY_INLONG_GROUP_ID;
import static
org.apache.inlong.agent.constant.CommonConstants.PROXY_INLONG_STREAM_ID;
+import static org.apache.inlong.agent.constant.TaskConstants.FILE_TASK_RETRY;
import static org.apache.inlong.agent.constant.TaskConstants.INSTANCE_STATE;
import static org.apache.inlong.agent.constant.TaskConstants.TASK_MQ_CLUSTERS;
import static org.apache.inlong.agent.constant.TaskConstants.TASK_MQ_TOPIC;
-import static org.apache.inlong.agent.constant.TaskConstants.TASK_RETRY;
/**
* job profile which contains details describing properties of one job.
@@ -200,6 +200,6 @@ public class InstanceProfile extends AbstractConfiguration
implements Comparable
}
public boolean isRetry() {
- return getBoolean(TASK_RETRY, false);
+ return getBoolean(FILE_TASK_RETRY, false);
}
}
diff --git
a/inlong-agent/agent-common/src/main/java/org/apache/inlong/agent/conf/TaskProfile.java
b/inlong-agent/agent-common/src/main/java/org/apache/inlong/agent/conf/TaskProfile.java
index 1f77433c9f..32450735e4 100644
---
a/inlong-agent/agent-common/src/main/java/org/apache/inlong/agent/conf/TaskProfile.java
+++
b/inlong-agent/agent-common/src/main/java/org/apache/inlong/agent/conf/TaskProfile.java
@@ -36,7 +36,7 @@ import static
org.apache.inlong.agent.constant.CommonConstants.DEFAULT_PROXY_INL
import static
org.apache.inlong.agent.constant.CommonConstants.DEFAULT_PROXY_INLONG_STREAM_ID;
import static
org.apache.inlong.agent.constant.CommonConstants.PROXY_INLONG_GROUP_ID;
import static
org.apache.inlong.agent.constant.CommonConstants.PROXY_INLONG_STREAM_ID;
-import static org.apache.inlong.agent.constant.TaskConstants.TASK_RETRY;
+import static org.apache.inlong.agent.constant.TaskConstants.FILE_TASK_RETRY;
import static org.apache.inlong.agent.constant.TaskConstants.TASK_STATE;
/**
@@ -82,7 +82,7 @@ public class TaskProfile extends AbstractConfiguration {
}
public boolean isRetry() {
- return getBoolean(TASK_RETRY, false);
+ return getBoolean(FILE_TASK_RETRY, false);
}
public String getTaskClass() {
diff --git
a/inlong-agent/agent-common/src/main/java/org/apache/inlong/agent/constant/TaskConstants.java
b/inlong-agent/agent-common/src/main/java/org/apache/inlong/agent/constant/TaskConstants.java
index 5dbc353d25..22fb87e6e5 100755
---
a/inlong-agent/agent-common/src/main/java/org/apache/inlong/agent/constant/TaskConstants.java
+++
b/inlong-agent/agent-common/src/main/java/org/apache/inlong/agent/constant/TaskConstants.java
@@ -62,9 +62,9 @@ public class TaskConstants extends CommonConstants {
public static final String SOURCE_DATA_CONTENT_STYLE =
"task.fileTask.dataContentStyle";
public static final String SOURCE_DATA_SEPARATOR =
"task.fileTask.dataSeparator";
public static final String SOURCE_FILTER_STREAMS =
"task.fileTask.filterStreams";
- public static final String TASK_RETRY = "task.fileTask.retry";
- public static final String TASK_START_TIME = "task.fileTask.startTime";
- public static final String TASK_END_TIME = "task.fileTask.endTime";
+ public static final String FILE_TASK_RETRY = "task.fileTask.retry";
+ public static final String FILE_TASK_TIME_FROM =
"task.fileTask.dataTimeFrom";
+ public static final String FILE_TASK_TIME_TO = "task.fileTask.dataTimeTo";
public static final String FILE_MAX_NUM = "task.fileTask.maxFileCount";
public static final String PREDEFINE_FIELDS = "task.predefinedFields";
public static final String TASK_AUDIT_VERSION = "task.auditVersion";
diff --git
a/inlong-agent/agent-common/src/main/java/org/apache/inlong/agent/pojo/FileTask.java
b/inlong-agent/agent-common/src/main/java/org/apache/inlong/agent/pojo/FileTask.java
index 57c294f7d4..54c191ffca 100644
---
a/inlong-agent/agent-common/src/main/java/org/apache/inlong/agent/pojo/FileTask.java
+++
b/inlong-agent/agent-common/src/main/java/org/apache/inlong/agent/pojo/FileTask.java
@@ -29,8 +29,8 @@ public class FileTask {
private Integer id;
private String cycleUnit;
private Boolean retry;
- private Long startTime;
- private Long endTime;
+ private String dataTimeFrom;
+ private String dataTimeTo;
private String timeOffset;
private String timeZone;
private String addictiveString;
@@ -91,9 +91,9 @@ public class FileTask {
private Boolean retry;
- private Long startTime;
+ private String dataTimeFrom;
- private Long endTime;
+ private String dataTimeTo;
private String pattern;
diff --git
a/inlong-agent/agent-common/src/main/java/org/apache/inlong/agent/pojo/TaskProfileDto.java
b/inlong-agent/agent-common/src/main/java/org/apache/inlong/agent/pojo/TaskProfileDto.java
index 85c636c885..2d4a6a32ae 100644
---
a/inlong-agent/agent-common/src/main/java/org/apache/inlong/agent/pojo/TaskProfileDto.java
+++
b/inlong-agent/agent-common/src/main/java/org/apache/inlong/agent/pojo/TaskProfileDto.java
@@ -166,8 +166,8 @@ public class TaskProfileDto {
fileTask.setMaxFileCount(taskConfig.getMaxFileCount());
fileTask.setRetry(taskConfig.getRetry());
fileTask.setCycleUnit(taskConfig.getCycleUnit());
- fileTask.setStartTime(taskConfig.getStartTime());
- fileTask.setEndTime(taskConfig.getEndTime());
+ fileTask.setDataTimeFrom(taskConfig.getDataTimeFrom());
+ fileTask.setDataTimeTo(taskConfig.getDataTimeTo());
if (taskConfig.getFilterStreams() != null) {
fileTask.setFilterStreams(GSON.toJson(taskConfig.getFilterStreams()));
}
diff --git
a/inlong-agent/agent-core/src/test/java/org/apache/inlong/agent/core/AgentBaseTestsHelper.java
b/inlong-agent/agent-core/src/test/java/org/apache/inlong/agent/core/AgentBaseTestsHelper.java
index fa37fa2f1c..a8dfbdf91a 100755
---
a/inlong-agent/agent-core/src/test/java/org/apache/inlong/agent/core/AgentBaseTestsHelper.java
+++
b/inlong-agent/agent-core/src/test/java/org/apache/inlong/agent/core/AgentBaseTestsHelper.java
@@ -75,14 +75,14 @@ public class AgentBaseTestsHelper {
}
}
- public TaskProfile getTaskProfile(int taskId, String pattern, boolean
retry, Long startTime, Long endTime,
+ public TaskProfile getTaskProfile(int taskId, String pattern, boolean
retry, String startTime, String endTime,
TaskStateEnum state, String timeZone) {
DataConfig dataConfig = getDataConfig(taskId, pattern, retry,
startTime, endTime, state, timeZone);
TaskProfile profile = TaskProfile.convertToTaskProfile(dataConfig);
return profile;
}
- private DataConfig getDataConfig(int taskId, String pattern, boolean
retry, Long startTime, Long endTime,
+ private DataConfig getDataConfig(int taskId, String pattern, boolean
retry, String startTime, String endTime,
TaskStateEnum state, String timeZone) {
DataConfig dataConfig = new DataConfig();
dataConfig.setInlongGroupId("testGroupId");
@@ -98,8 +98,8 @@ public class AgentBaseTestsHelper {
fileTaskConfig.setMaxFileCount(100);
fileTaskConfig.setCycleUnit("h");
fileTaskConfig.setRetry(retry);
- fileTaskConfig.setStartTime(startTime);
- fileTaskConfig.setEndTime(endTime);
+ fileTaskConfig.setDataTimeFrom(startTime);
+ fileTaskConfig.setDataTimeTo(endTime);
dataConfig.setExtParams(GSON.toJson(fileTaskConfig));
return dataConfig;
}
diff --git
a/inlong-agent/agent-plugins/src/main/java/org/apache/inlong/agent/plugin/fetcher/ManagerFetcher.java
b/inlong-agent/agent-plugins/src/main/java/org/apache/inlong/agent/plugin/fetcher/ManagerFetcher.java
index e0125751c3..01d7128f8b 100644
---
a/inlong-agent/agent-plugins/src/main/java/org/apache/inlong/agent/plugin/fetcher/ManagerFetcher.java
+++
b/inlong-agent/agent-plugins/src/main/java/org/apache/inlong/agent/plugin/fetcher/ManagerFetcher.java
@@ -42,10 +42,7 @@ import com.google.gson.JsonObject;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
-import java.text.ParseException;
-import java.text.SimpleDateFormat;
import java.util.ArrayList;
-import java.util.Date;
import java.util.List;
import static
org.apache.inlong.agent.constant.AgentConstants.AGENT_CLUSTER_NAME;
@@ -225,26 +222,16 @@ public class ManagerFetcher extends AbstractDaemon
implements ProfileFetcher {
private TaskResult getTestConfig(String testDir, int normalTaskId, int
retryTaskId, int state) {
List<DataConfig> configs = new ArrayList<>();
- String startStr = "2023-07-10 00:00:00";
- String endStr = "2023-07-22 00:00:00";
- Long start = 0L;
- Long end = 0L;
String normalPattern = testDir + "YYYY/YYYYMMDDhhmm_2.log_[0-9]+";
String retryPattern = testDir + "YYYY/YYYYMMDD_1.log_[0-9]+";
- try {
- Date parse = new SimpleDateFormat("yyyy-MM-dd
HH:mm:ss").parse(startStr);
- start = parse.getTime();
- parse = new SimpleDateFormat("yyyy-MM-dd HH:mm:ss").parse(endStr);
- end = parse.getTime();
- } catch (ParseException e) {
- e.printStackTrace();
- }
- configs.add(getTestDataConfig(normalTaskId, normalPattern, false,
start, end, CycleUnitType.MINUTE, state));
- configs.add(getTestDataConfig(retryTaskId, retryPattern, true, start,
end, CycleUnitType.DAY, state));
+ configs.add(getTestDataConfig(normalTaskId, normalPattern, false,
"202307100000", "202307220000",
+ CycleUnitType.MINUTE, state));
+ configs.add(
+ getTestDataConfig(retryTaskId, retryPattern, true, "20230710",
"20230722", CycleUnitType.DAY, state));
return TaskResult.builder().dataConfigs(configs).build();
}
- private DataConfig getTestDataConfig(int taskId, String pattern, boolean
retry, Long startTime, Long endTime,
+ private DataConfig getTestDataConfig(int taskId, String pattern, boolean
retry, String startTime, String endTime,
String cycleUnit, int state) {
DataConfig dataConfig = new DataConfig();
dataConfig.setInlongGroupId("devcloud_group_id");
@@ -260,8 +247,8 @@ public class ManagerFetcher extends AbstractDaemon
implements ProfileFetcher {
fileTaskConfig.setMaxFileCount(100);
fileTaskConfig.setCycleUnit(cycleUnit);
fileTaskConfig.setRetry(retry);
- fileTaskConfig.setStartTime(startTime);
- fileTaskConfig.setEndTime(endTime);
+ fileTaskConfig.setDataTimeFrom(startTime);
+ fileTaskConfig.setDataTimeTo(endTime);
fileTaskConfig.setDataContentStyle("CSV");
fileTaskConfig.setDataSeparator("|");
dataConfig.setExtParams(GSON.toJson(fileTaskConfig));
diff --git
a/inlong-agent/agent-plugins/src/main/java/org/apache/inlong/agent/plugin/instance/FileInstance.java
b/inlong-agent/agent-plugins/src/main/java/org/apache/inlong/agent/plugin/instance/FileInstance.java
index 3c86a4c33f..38cb969376 100644
---
a/inlong-agent/agent-plugins/src/main/java/org/apache/inlong/agent/plugin/instance/FileInstance.java
+++
b/inlong-agent/agent-plugins/src/main/java/org/apache/inlong/agent/plugin/instance/FileInstance.java
@@ -19,7 +19,7 @@ package org.apache.inlong.agent.plugin.instance;
import org.apache.inlong.agent.conf.InstanceProfile;
import org.apache.inlong.agent.constant.TaskConstants;
-import org.apache.inlong.agent.plugin.utils.file.FileDataUtils;
+import org.apache.inlong.agent.plugin.task.file.FileDataUtils;
import java.io.IOException;
diff --git
a/inlong-agent/agent-plugins/src/main/java/org/apache/inlong/agent/plugin/sources/LogFileSource.java
b/inlong-agent/agent-plugins/src/main/java/org/apache/inlong/agent/plugin/sources/LogFileSource.java
index 9ce20f6daa..5aebbc7d86 100755
---
a/inlong-agent/agent-plugins/src/main/java/org/apache/inlong/agent/plugin/sources/LogFileSource.java
+++
b/inlong-agent/agent-plugins/src/main/java/org/apache/inlong/agent/plugin/sources/LogFileSource.java
@@ -28,7 +28,7 @@ import org.apache.inlong.agent.except.FileException;
import org.apache.inlong.agent.metrics.audit.AuditUtils;
import org.apache.inlong.agent.plugin.sources.file.AbstractSource;
import
org.apache.inlong.agent.plugin.sources.file.extend.DefaultExtendedHandler;
-import org.apache.inlong.agent.plugin.utils.file.FileDataUtils;
+import org.apache.inlong.agent.plugin.task.file.FileDataUtils;
import org.apache.inlong.agent.utils.AgentUtils;
import org.slf4j.Logger;
diff --git
a/inlong-agent/agent-plugins/src/main/java/org/apache/inlong/agent/plugin/task/file/AgentErrMsg.java
b/inlong-agent/agent-plugins/src/main/java/org/apache/inlong/agent/plugin/task/file/AgentErrMsg.java
deleted file mode 100644
index aa7e5c734f..0000000000
---
a/inlong-agent/agent-plugins/src/main/java/org/apache/inlong/agent/plugin/task/file/AgentErrMsg.java
+++ /dev/null
@@ -1,67 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.inlong.agent.plugin.task.file;
-
-public class AgentErrMsg {
-
- public static final String CONFIG_SUCCESS = "SUCCESS";
-
- // data source config error */
- public static final String DATA_SOURCE_CONFIG_ERROR =
"ERROR-0-INLONG_AGENT|10001|ERROR"
- + "|ERROR_DATA_SOURCE_CONFIG|";
-
- // directory not found error */
- public static final String DIRECTORY_NOT_FOUND_ERROR =
"ERROR-0-INLONG_AGENT|11001|WARN"
- + "|WARN_DIRECTORY_NOT_EXIST|";
-
- // watch directory error */
- public static final String WATCH_DIR_ERROR =
"ERROR-0-INLONG_AGENT|11002|ERROR"
- + "|ERROR_WATCH_DIR_ERROR|";
-
- // file error(not found,rotate)
- public static final String FILE_ERROR =
"ERROR-0-INLONG_AGENT|10002|ERROR|ERROR_SOURCE_FILE|";
-
- // read file error
- public static final String FILE_OP_ERROR =
"ERROR-1-INLONG_AGENT|30002|ERROR|ERROR_OPERATE_FILE|";
-
- // disk full
- public static final String DISK_FULL =
"ERROR-1-INLONG_AGENT|30001|FATAL|FATAL_DISK_FULL|";
-
- // out of memory
- public static final String OOM_ERROR =
"ERROR-1-INLONG_AGENT|30001|FATAL|FATAL_OOM_ERROR|";
-
- // watcher error
- public static final String WATCHER_INVALID =
"ERROR-1-INLONG_AGENT|40001|WARN|WARN_INVALID_WATCHER|";
-
- // could not connect to manager
- public static final String CONNECT_MANAGER_ERROR =
"ERROR-1-INLONG_AGENT|30002|ERROR"
- + "|ERROR_CANNOT_CONNECT_TO_MANAGER|";
-
- // send data to dataProxy failed
- public static final String SEND_TO_BUS_ERROR =
"ERROR-1-INLONG_AGENT|30003|ERROR|ERROR_SEND_TO_BUS|";
-
- // operate bdb error
- public static final String BDB_ERROR =
"ERROR-1-INLONG_AGENT|30003|ERROR|BDB_OPERATION_ERROR|";
-
- // buffer full
- public static final String MSG_BUFFER_FULL =
"ERROR-1-INLONG_AGENT|40002|WARN|WARN_MSG_BUFFER_FULL|";
-
- // found event invalid(task has been delete)
- public static final String FOUND_EVENT_INVALID =
"ERROR-1-INLONG_AGENT|30003|ERROR"
- + "|FOUND_EVENT_INVALID|";
-}
diff --git
a/inlong-agent/agent-plugins/src/main/java/org/apache/inlong/agent/plugin/utils/file/FileDataUtils.java
b/inlong-agent/agent-plugins/src/main/java/org/apache/inlong/agent/plugin/task/file/FileDataUtils.java
similarity index 96%
rename from
inlong-agent/agent-plugins/src/main/java/org/apache/inlong/agent/plugin/utils/file/FileDataUtils.java
rename to
inlong-agent/agent-plugins/src/main/java/org/apache/inlong/agent/plugin/task/file/FileDataUtils.java
index 57b4702848..d65960ab17 100644
---
a/inlong-agent/agent-plugins/src/main/java/org/apache/inlong/agent/plugin/utils/file/FileDataUtils.java
+++
b/inlong-agent/agent-plugins/src/main/java/org/apache/inlong/agent/plugin/task/file/FileDataUtils.java
@@ -15,7 +15,7 @@
* limitations under the License.
*/
-package org.apache.inlong.agent.plugin.utils.file;
+package org.apache.inlong.agent.plugin.task.file;
import java.io.IOException;
import java.nio.file.Files;
diff --git
a/inlong-agent/agent-plugins/src/main/java/org/apache/inlong/agent/plugin/task/file/FileScanner.java
b/inlong-agent/agent-plugins/src/main/java/org/apache/inlong/agent/plugin/task/file/FileScanner.java
index e37b6deb89..7fa759a9d2 100644
---
a/inlong-agent/agent-plugins/src/main/java/org/apache/inlong/agent/plugin/task/file/FileScanner.java
+++
b/inlong-agent/agent-plugins/src/main/java/org/apache/inlong/agent/plugin/task/file/FileScanner.java
@@ -17,10 +17,9 @@
package org.apache.inlong.agent.plugin.task.file;
-import org.apache.inlong.agent.plugin.utils.file.FilePathUtil;
-import org.apache.inlong.agent.plugin.utils.file.FileTimeComparator;
-import org.apache.inlong.agent.plugin.utils.file.Files;
-import org.apache.inlong.agent.plugin.utils.file.NewDateUtils;
+import org.apache.inlong.agent.plugin.utils.regex.PatternUtil;
+import org.apache.inlong.agent.plugin.utils.regex.Scanner;
+import org.apache.inlong.agent.plugin.utils.regex.Scanner.FinalPatternInfo;
import org.apache.inlong.agent.utils.DateTransUtils;
import org.slf4j.Logger;
@@ -28,11 +27,8 @@ import org.slf4j.LoggerFactory;
import java.io.File;
import java.util.ArrayList;
-import java.util.Calendar;
import java.util.Collections;
import java.util.List;
-import java.util.regex.Matcher;
-import java.util.regex.Pattern;
import static
org.apache.inlong.agent.constant.CommonConstants.DEFAULT_FILE_MAX_NUM;
@@ -52,57 +48,24 @@ public class FileScanner {
this.fileName = fileName;
this.dataTime = dataTime;
}
-
}
private static final Logger logger =
LoggerFactory.getLogger(FileScanner.class);
- public static List<String> getDataTimeList(long startTime, long endTime,
String cycleUnit, String timeOffset,
- boolean isRetry) {
- if (!isRetry) {
- startTime += DateTransUtils.calcOffset(timeOffset);
- endTime += DateTransUtils.calcOffset(timeOffset);
- }
- List<String> dataTimeList = new ArrayList<>();
- List<Long> dateRegion = NewDateUtils.getDateRegion(startTime, endTime,
cycleUnit);
- for (Long time : dateRegion) {
- String dataTime = DateTransUtils.millSecConvertToTimeStr(time,
cycleUnit);
- dataTimeList.add(dataTime);
- }
- return dataTimeList;
- }
-
public static List<BasicFileInfo> scanTaskBetweenTimes(String
originPattern, String cycleUnit, String timeOffset,
long startTime, long endTime, boolean isRetry) {
- if (!isRetry) {
- startTime += DateTransUtils.calcOffset(timeOffset);
- endTime += DateTransUtils.calcOffset(timeOffset);
- }
- String strStartTime =
DateTransUtils.millSecConvertToTimeStr(startTime, cycleUnit);
- String strEndTime = DateTransUtils.millSecConvertToTimeStr(endTime,
cycleUnit);
- logger.info("{} scan time is between {} and {}",
- new Object[]{originPattern, strStartTime, strEndTime});
-
- return scanTaskBetweenTimes(cycleUnit, originPattern, startTime,
endTime);
- }
-
- /* Scan log files and create tasks between two times. */
- public static List<BasicFileInfo> scanTaskBetweenTimes(String cycleUnit,
String originPattern, long startTime,
- long endTime) {
- List<Long> dateRegion = NewDateUtils.getDateRegion(startTime, endTime,
cycleUnit);
- List<BasicFileInfo> infos = new ArrayList<BasicFileInfo>();
- for (Long time : dateRegion) {
- Calendar calendar = Calendar.getInstance();
- calendar.setTimeInMillis(time);
- String fileName = NewDateUtils.replaceDateExpression(calendar,
originPattern);
- ArrayList<String> allPaths =
FilePathUtil.cutDirectoryByWildcard(fileName);
+ List<BasicFileInfo> infos = new ArrayList<>();
+ List<FinalPatternInfo> finalPatternInfos =
Scanner.getFinalPatternInfos(originPattern, cycleUnit, timeOffset,
+ startTime, endTime, isRetry);
+ for (FinalPatternInfo finalPatternInfo : finalPatternInfos) {
+ ArrayList<String> allPaths =
PatternUtil.cutDirectoryByWildcard(finalPatternInfo.finalPattern);
String firstDir = allPaths.get(0);
String secondDir = allPaths.get(0) + File.separator +
allPaths.get(1);
- ArrayList<String> fileList = getUpdatedOrNewFiles(firstDir,
secondDir, fileName, 3,
+ ArrayList<String> fileList = getUpdatedOrNewFiles(firstDir,
secondDir, finalPatternInfo.finalPattern, 3,
DEFAULT_FILE_MAX_NUM);
for (String file : fileList) {
// TODO the time is not YYYYMMDDHH
- String dataTime = DateTransUtils.millSecConvertToTimeStr(time,
cycleUnit);
+ String dataTime =
DateTransUtils.millSecConvertToTimeStr(finalPatternInfo.dataTime, cycleUnit);
BasicFileInfo info = new BasicFileInfo(file, dataTime);
logger.info("scan new task fileName {} ,dataTime {}", file,
dataTime);
infos.add(info);
@@ -134,34 +97,4 @@ public class FileScanner {
}
return ret;
}
-
- @SuppressWarnings("unused")
- private static ArrayList<String> getUpdatedOrNewFiles(String logFileName,
- int maxFileNum) {
- ArrayList<String> ret = new ArrayList<String>();
- ArrayList<String> directories = FilePathUtil
- .cutDirectoryByWildcardAndDateExpression(logFileName);
- String parentDir = directories.get(0) + File.separator
- + directories.get(1);
-
- Pattern pattern = Pattern.compile(directories.get(2),
- Pattern.CASE_INSENSITIVE);
- for (File file : new File(parentDir).listFiles()) {
- Matcher matcher = pattern.matcher(file.getName());
- if (matcher.matches() && ret.size() < maxFileNum) {
- ret.add(file.getAbsolutePath());
- }
- }
- return ret;
- }
-
- public static void main(String[] args) {
-
- ArrayList<String> fileList = FileScanner.getUpdatedOrNewFiles(
- "f:\\\\abc", "f:\\\\abc\\\\", "f:\\\\abc\\\\1.txt", 3, 100);
- // fileList = FileScanner.getUpdatedOrNewFiles("F:\\abc\\1.txt", 100);
- for (String fileName : fileList) {
- System.out.println(fileName);
- }
- }
}
diff --git
a/inlong-agent/agent-plugins/src/main/java/org/apache/inlong/agent/plugin/utils/file/FileTimeComparator.java
b/inlong-agent/agent-plugins/src/main/java/org/apache/inlong/agent/plugin/task/file/FileTimeComparator.java
similarity index 95%
rename from
inlong-agent/agent-plugins/src/main/java/org/apache/inlong/agent/plugin/utils/file/FileTimeComparator.java
rename to
inlong-agent/agent-plugins/src/main/java/org/apache/inlong/agent/plugin/task/file/FileTimeComparator.java
index 949044d864..1fbbde3b56 100644
---
a/inlong-agent/agent-plugins/src/main/java/org/apache/inlong/agent/plugin/utils/file/FileTimeComparator.java
+++
b/inlong-agent/agent-plugins/src/main/java/org/apache/inlong/agent/plugin/task/file/FileTimeComparator.java
@@ -15,7 +15,7 @@
* limitations under the License.
*/
-package org.apache.inlong.agent.plugin.utils.file;
+package org.apache.inlong.agent.plugin.task.file;
import java.io.File;
import java.util.Comparator;
diff --git
a/inlong-agent/agent-plugins/src/main/java/org/apache/inlong/agent/plugin/utils/file/Files.java
b/inlong-agent/agent-plugins/src/main/java/org/apache/inlong/agent/plugin/task/file/Files.java
similarity index 97%
rename from
inlong-agent/agent-plugins/src/main/java/org/apache/inlong/agent/plugin/utils/file/Files.java
rename to
inlong-agent/agent-plugins/src/main/java/org/apache/inlong/agent/plugin/task/file/Files.java
index b4ddcfac53..3c3bca9d59 100644
---
a/inlong-agent/agent-plugins/src/main/java/org/apache/inlong/agent/plugin/utils/file/Files.java
+++
b/inlong-agent/agent-plugins/src/main/java/org/apache/inlong/agent/plugin/task/file/Files.java
@@ -15,7 +15,7 @@
* limitations under the License.
*/
-package org.apache.inlong.agent.plugin.utils.file;
+package org.apache.inlong.agent.plugin.task.file;
import org.apache.inlong.agent.utils.file.FileFinder;
diff --git
a/inlong-agent/agent-plugins/src/main/java/org/apache/inlong/agent/plugin/task/file/LogFileTask.java
b/inlong-agent/agent-plugins/src/main/java/org/apache/inlong/agent/plugin/task/file/LogFileTask.java
index 4f49cfdd7d..0c104956d7 100644
---
a/inlong-agent/agent-plugins/src/main/java/org/apache/inlong/agent/plugin/task/file/LogFileTask.java
+++
b/inlong-agent/agent-plugins/src/main/java/org/apache/inlong/agent/plugin/task/file/LogFileTask.java
@@ -24,9 +24,10 @@ import org.apache.inlong.agent.constant.TaskConstants;
import org.apache.inlong.agent.core.task.TaskAction;
import org.apache.inlong.agent.plugin.task.AbstractTask;
import org.apache.inlong.agent.plugin.task.file.FileScanner.BasicFileInfo;
-import org.apache.inlong.agent.plugin.utils.file.FilePathUtil;
-import org.apache.inlong.agent.plugin.utils.file.NewDateUtils;
-import org.apache.inlong.agent.plugin.utils.file.PathDateExpression;
+import org.apache.inlong.agent.plugin.utils.regex.NewDateUtils;
+import org.apache.inlong.agent.plugin.utils.regex.PathDateExpression;
+import org.apache.inlong.agent.plugin.utils.regex.PatternUtil;
+import org.apache.inlong.agent.plugin.utils.regex.Scanner;
import org.apache.inlong.agent.state.State;
import org.apache.inlong.agent.utils.AgentUtils;
import org.apache.inlong.agent.utils.DateTransUtils;
@@ -45,8 +46,10 @@ import java.nio.file.WatchEvent;
import java.nio.file.WatchEvent.Kind;
import java.nio.file.WatchKey;
import java.nio.file.WatchService;
+import java.text.ParseException;
import java.text.SimpleDateFormat;
import java.util.ArrayList;
+import java.util.Comparator;
import java.util.Date;
import java.util.HashSet;
import java.util.List;
@@ -62,13 +65,12 @@ import java.util.stream.Collectors;
import java.util.stream.Stream;
/**
- * Watch directory, if new valid files are created, create jobs
correspondingly.
+ * Watch directory, if new valid files are created, create instance
correspondingly.
*/
public class LogFileTask extends AbstractTask {
private static final Logger LOGGER =
LoggerFactory.getLogger(LogFileTask.class);
public static final String DEFAULT_FILE_INSTANCE =
"org.apache.inlong.agent.plugin.instance.FileInstance";
- public static final String SCAN_CYCLE_RANCE = "-2";
private static final int INSTANCE_QUEUE_CAPACITY = 10;
private final Map<String, WatchEntity> watchers = new
ConcurrentHashMap<>();
private final Set<String> watchFailedDirs = new HashSet<>();
@@ -77,8 +79,8 @@ public class LogFileTask extends AbstractTask {
public static final long DAY_TIMEOUT_INTERVAL = 2 * 24 * 3600 * 1000;
public static final int CORE_THREAD_MAX_GAP_TIME_MS = 60 * 1000;
private boolean retry;
- private long startTime;
- private long endTime;
+ private volatile long startTime;
+ private volatile long endTime;
private boolean realTime = false;
private Set<String> originPatterns;
private long lastScanTime = 0;
@@ -95,19 +97,32 @@ public class LogFileTask extends AbstractTask {
@Override
protected void initTask() {
instanceQueue = new LinkedBlockingQueue<>(INSTANCE_QUEUE_CAPACITY);
- retry = taskProfile.getBoolean(TaskConstants.TASK_RETRY, false);
+ retry = taskProfile.getBoolean(TaskConstants.FILE_TASK_RETRY, false);
originPatterns =
Stream.of(taskProfile.get(TaskConstants.FILE_DIR_FILTER_PATTERNS).split(","))
.collect(Collectors.toSet());
if
(taskProfile.getCycleUnit().compareToIgnoreCase(CycleUnitType.REAL_TIME) == 0) {
realTime = true;
}
if (retry) {
- retryInit();
+ initRetryTask(taskProfile);
} else {
watchInit();
}
}
+ private boolean initRetryTask(TaskProfile profile) {
+ String dataTimeFrom = profile.get(TaskConstants.FILE_TASK_TIME_FROM,
"");
+ String dataTimeTo = profile.get(TaskConstants.FILE_TASK_TIME_TO, "");
+ try {
+ startTime = DateTransUtils.timeStrConvertToMillSec(dataTimeFrom,
profile.getCycleUnit());
+ endTime = DateTransUtils.timeStrConvertToMillSec(dataTimeTo,
profile.getCycleUnit());
+ } catch (ParseException e) {
+ LOGGER.error("retry task time error start {} end {}",
dataTimeFrom, dataTimeTo, e);
+ return false;
+ }
+ return true;
+ }
+
@Override
protected List<InstanceProfile> getNewInstanceList() {
if (retry) {
@@ -119,6 +134,7 @@ public class LogFileTask extends AbstractTask {
while (list.size() < INSTANCE_QUEUE_CAPACITY &&
!instanceQueue.isEmpty()) {
InstanceProfile profile = instanceQueue.poll();
if (profile != null) {
+ LOGGER.info("test123 2 taskid {} {}", getTaskId(),
profile.getInstanceId());
list.add(profile);
}
}
@@ -159,22 +175,14 @@ public class LogFileTask extends AbstractTask {
LOGGER.error("task profile needs time offset");
return false;
}
- if (profile.getBoolean(TaskConstants.TASK_RETRY, false)) {
- long startTime = profile.getLong(TaskConstants.TASK_START_TIME, 0);
- long endTime = profile.getLong(TaskConstants.TASK_END_TIME, 0);
- if (startTime == 0 || endTime == 0) {
- LOGGER.error("retry task time error start {} end {}",
startTime, endTime);
+ if (profile.getBoolean(TaskConstants.FILE_TASK_RETRY, false)) {
+ if (!initRetryTask(profile)) {
return false;
}
}
return true;
}
- private void retryInit() {
- startTime = taskProfile.getLong(TaskConstants.TASK_START_TIME, 0);
- endTime = taskProfile.getLong(TaskConstants.TASK_END_TIME, 0);
- }
-
private void watchInit() {
originPatterns.forEach((pathPattern) -> {
addPathPattern(pathPattern);
@@ -182,12 +190,12 @@ public class LogFileTask extends AbstractTask {
}
public void addPathPattern(String originPattern) {
- ArrayList<String> directories =
FilePathUtil.cutDirectoryByWildcardAndDateExpression(originPattern);
+ ArrayList<String> directories =
PatternUtil.cutDirectoryByWildcardAndDateExpression(originPattern);
String basicStaticPath = directories.get(0);
LOGGER.info("dataName {} watchPath {}", new Object[]{originPattern,
basicStaticPath});
/* Remember the failed watcher creations. */
if (!new File(basicStaticPath).exists()) {
- LOGGER.warn(AgentErrMsg.DIRECTORY_NOT_FOUND_ERROR +
basicStaticPath);
+ LOGGER.warn("DIRECTORY_NOT_FOUND_ERROR" + basicStaticPath);
watchFailedDirs.add(originPattern);
return;
}
@@ -204,9 +212,9 @@ public class LogFileTask extends AbstractTask {
watchFailedDirs.remove(originPattern);
} catch (IOException e) {
if (e.toString().contains("Too many open files") ||
e.toString().contains("打开的文件过多")) {
- LOGGER.error(AgentErrMsg.WATCH_DIR_ERROR + e.toString());
+ LOGGER.error("WATCH_DIR_ERROR", e);
} else {
- LOGGER.error(AgentErrMsg.WATCH_DIR_ERROR + e.toString(), e);
+ LOGGER.error("WATCH_DIR_ERROR", e);
}
} catch (Exception e) {
LOGGER.error("addPathPattern:", e);
@@ -283,21 +291,12 @@ public class LogFileTask extends AbstractTask {
}
private List<BasicFileInfo> scanExistingFileByPattern(String
originPattern) {
- long startScanTime = startTime;
- long endScanTime = endTime;
- if (!retry) {
- long currentTime = System.currentTimeMillis();
- // only scan two cycle, like two hours or two days
- long offset = DateTransUtils.calcOffset(SCAN_CYCLE_RANCE +
taskProfile.getCycleUnit());
- startScanTime = currentTime + offset;
- endScanTime = currentTime;
- }
if (realTime) {
return FileScanner.scanTaskBetweenTimes(originPattern,
CycleUnitType.HOUR, taskProfile.getTimeOffset(),
- startScanTime, endScanTime, retry);
+ startTime, endTime, retry);
} else {
return FileScanner.scanTaskBetweenTimes(originPattern,
taskProfile.getCycleUnit(),
- taskProfile.getTimeOffset(), startScanTime, endScanTime,
retry);
+ taskProfile.getTimeOffset(), startTime, endTime, retry);
}
}
@@ -328,14 +327,7 @@ public class LogFileTask extends AbstractTask {
private void dealWithEventMapWithCycle() {
long startScanTime = startTime;
long endScanTime = endTime;
- if (!retry) {
- long currentTime = System.currentTimeMillis();
- // only scan two cycle, like two hours or two days
- long offset = DateTransUtils.calcOffset(SCAN_CYCLE_RANCE +
taskProfile.getCycleUnit());
- startScanTime = currentTime + offset;
- endScanTime = currentTime;
- }
- List<String> dataTimeList = FileScanner.getDataTimeList(startScanTime,
endScanTime, taskProfile.getCycleUnit(),
+ List<String> dataTimeList = Scanner.getDataTimeList(startScanTime,
endScanTime, taskProfile.getCycleUnit(),
taskProfile.getTimeOffset(), retry);
if (dataTimeList.isEmpty()) {
LOGGER.error("getDataTimeList get empty list");
@@ -345,12 +337,16 @@ public class LogFileTask extends AbstractTask {
// normal task first handle current data time
if (!retry) {
String current = dataTimeList.remove(dataTimeList.size() - 1);
- dealEventMapByDataTime(current, true);
dealtDataTime.add(current);
+ if (!dealEventMapByDataTime(current, true)) {
+ return;
+ }
}
dataTimeList.forEach(dataTime -> {
dealtDataTime.add(dataTime);
- dealEventMapByDataTime(dataTime, false);
+ if (!dealEventMapByDataTime(dataTime, false)) {
+ return;
+ }
});
for (String dataTime : eventMap.keySet()) {
if (!dealtDataTime.contains(dataTime)) {
@@ -365,27 +361,27 @@ public class LogFileTask extends AbstractTask {
}
}
- private void dealEventMapByDataTime(String dataTime, boolean
isCurrentDataTime) {
+ private boolean dealEventMapByDataTime(String dataTime, boolean
isCurrentDataTime) {
Map<String, InstanceProfile> sameDataTimeEvents =
eventMap.get(dataTime);
if (sameDataTimeEvents == null || sameDataTimeEvents.isEmpty()) {
- return;
+ return true;
}
if (realTime || shouldStartNow(dataTime)) {
- /* These codes will sort the FileCreationEvents by create time. */
- Set<InstanceProfile> sortedEvents = new
TreeSet<>(sameDataTimeEvents.values());
- /* Check the file end with event creation time in asc order. */
+ Set<InstanceProfile> sortedEvents = new
TreeSet<>(Comparator.comparing(InstanceProfile::getInstanceId));
+ sortedEvents.addAll(sameDataTimeEvents.values());
for (InstanceProfile sortEvent : sortedEvents) {
String fileName = sortEvent.getInstanceId();
InstanceProfile profile = sameDataTimeEvents.get(fileName);
if (!isCurrentDataTime && isFull()) {
- return;
+ return false;
}
if (!instanceQueue.offer(profile)) {
- return;
+ return false;
}
sameDataTimeEvents.remove(fileName);
}
}
+ return true;
}
/*
@@ -484,14 +480,14 @@ public class LogFileTask extends AbstractTask {
private void handleFilePath(Path filePath, WatchEntity entity) {
String newFileName = filePath.toFile().getAbsolutePath();
- LOGGER.info("new file {} {}", newFileName, entity.getOriginPattern());
+ LOGGER.info("new file {} {}", newFileName, entity.getPattern());
Matcher matcher = entity.getPattern().matcher(newFileName);
if (matcher.matches() || matcher.lookingAt()) {
- LOGGER.info("matched file {} {}", newFileName,
entity.getOriginPattern());
+ LOGGER.info("matched file {} {}", newFileName,
entity.getPattern());
String dataTime = getDataTimeFromFileName(newFileName,
entity.getOriginPattern(),
entity.getDateExpression());
if (!checkFileNameForTime(newFileName, entity)) {
- LOGGER.error(AgentErrMsg.FILE_ERROR + "File Timeout {} {}",
newFileName, dataTime);
+ LOGGER.error("File Timeout {} {}", newFileName, dataTime);
return;
}
addToEvenMap(newFileName, dataTime);
@@ -566,8 +562,7 @@ public class LogFileTask extends AbstractTask {
* Register a new watch service on the path if the old watcher is
invalid.
*/
if (!key.isValid()) {
- LOGGER.warn(AgentErrMsg.WATCHER_INVALID + "Invalid Watcher {}",
- contextPath.getFileName());
+ LOGGER.warn("Invalid Watcher {}", contextPath.getFileName());
try {
WatchService oldService = entity.getWatchService();
oldService.close();
diff --git
a/inlong-agent/agent-plugins/src/main/java/org/apache/inlong/agent/plugin/task/file/WatchEntity.java
b/inlong-agent/agent-plugins/src/main/java/org/apache/inlong/agent/plugin/task/file/WatchEntity.java
index af6d018a1d..b33eaff818 100644
---
a/inlong-agent/agent-plugins/src/main/java/org/apache/inlong/agent/plugin/task/file/WatchEntity.java
+++
b/inlong-agent/agent-plugins/src/main/java/org/apache/inlong/agent/plugin/task/file/WatchEntity.java
@@ -17,11 +17,11 @@
package org.apache.inlong.agent.plugin.task.file;
-import org.apache.inlong.agent.plugin.utils.file.DateUtils;
-import org.apache.inlong.agent.plugin.utils.file.FilePathUtil;
-import org.apache.inlong.agent.plugin.utils.file.NewDateUtils;
-import org.apache.inlong.agent.plugin.utils.file.NonRegexPatternPosition;
-import org.apache.inlong.agent.plugin.utils.file.PathDateExpression;
+import org.apache.inlong.agent.plugin.utils.regex.DateUtils;
+import org.apache.inlong.agent.plugin.utils.regex.NewDateUtils;
+import org.apache.inlong.agent.plugin.utils.regex.NonRegexPatternPosition;
+import org.apache.inlong.agent.plugin.utils.regex.PathDateExpression;
+import org.apache.inlong.agent.plugin.utils.regex.PatternUtil;
import org.apache.inlong.agent.utils.AgentUtils;
import org.slf4j.Logger;
@@ -69,11 +69,11 @@ public class WatchEntity {
String cycleUnit) {
this.watchService = watchService;
this.originPattern = originPattern;
- ArrayList<String> directoryLayers =
FilePathUtil.cutDirectoryByWildcardAndDateExpression(originPattern);
+ ArrayList<String> directoryLayers =
PatternUtil.cutDirectoryByWildcardAndDateExpression(originPattern);
this.basicStaticPath = directoryLayers.get(0);
this.regexPattern =
NewDateUtils.replaceDateExpressionWithRegex(originPattern);
pattern = Pattern.compile(regexPattern, Pattern.CASE_INSENSITIVE |
Pattern.DOTALL | Pattern.MULTILINE);
- ArrayList<String> directories =
FilePathUtil.cutDirectoryByWildcard(originPattern);
+ ArrayList<String> directories =
PatternUtil.cutDirectoryByWildcard(originPattern);
this.originPatternWithoutFileName = directories.get(0);
this.patternWithoutFileName = Pattern
.compile(NewDateUtils.replaceDateExpressionWithRegex(originPatternWithoutFileName),
@@ -91,6 +91,7 @@ public class WatchEntity {
@Override
public String toString() {
return "WatchEntity [parentPathName=" + basicStaticPath
+ + ", pattern=" + pattern
+ ", readFilePattern=" + regexPattern
+ ", dateExpression=" + dateExpression + ",
originPatternWithoutFileName="
+ originPatternWithoutFileName + ", containRegexPattern="
diff --git
a/inlong-agent/agent-plugins/src/main/java/org/apache/inlong/agent/plugin/utils/file/DateUtils.java
b/inlong-agent/agent-plugins/src/main/java/org/apache/inlong/agent/plugin/utils/regex/DateUtils.java
similarity index 58%
rename from
inlong-agent/agent-plugins/src/main/java/org/apache/inlong/agent/plugin/utils/file/DateUtils.java
rename to
inlong-agent/agent-plugins/src/main/java/org/apache/inlong/agent/plugin/utils/regex/DateUtils.java
index ae57acbca8..2366dbae06 100644
---
a/inlong-agent/agent-plugins/src/main/java/org/apache/inlong/agent/plugin/utils/file/DateUtils.java
+++
b/inlong-agent/agent-plugins/src/main/java/org/apache/inlong/agent/plugin/utils/regex/DateUtils.java
@@ -15,16 +15,13 @@
* limitations under the License.
*/
-package org.apache.inlong.agent.plugin.utils.file;
+package org.apache.inlong.agent.plugin.utils.regex;
import hirondelle.date4j.DateTime;
import org.apache.commons.lang.StringUtils;
import org.apache.logging.log4j.LogManager;
import org.apache.logging.log4j.Logger;
-import java.text.ParseException;
-import java.text.SimpleDateFormat;
-import java.util.Date;
import java.util.Objects;
import java.util.TimeZone;
import java.util.regex.Matcher;
@@ -42,16 +39,6 @@ public class DateUtils {
Pattern.CASE_INSENSITIVE | Pattern.DOTALL | Pattern.MULTILINE);
private String dateFormat = "YYYYMMDDhhmmss";
- public DateUtils() {
-
- }
-
- public DateUtils(String timeFormat) {
- if (timeFormat != null && !timeFormat.isEmpty()) {
- dateFormat = timeFormat;
- }
- }
-
public static String getSubTimeFormat(String format, int length) {
// format may be "YYYYMMDDhhmmss" | "YYYY_MM_DD_hh_mm_ss"
int formatLen = format.length();
@@ -138,35 +125,6 @@ public class DateUtils {
: new PathDateExpression(longestPattern, position));
}
- public static String formatTime(long time) {
- SimpleDateFormat df = new SimpleDateFormat("yyyyMMddHHmm");
- df.setTimeZone(TimeZone.getTimeZone("GMT+8:00"));
- return df.format(new Date(time));
- }
-
- public static boolean compare(String time, int offset)
- throws ParseException {
- long value = 1000 * 60 * 60 * 24;
- SimpleDateFormat df = new SimpleDateFormat("yyyyMMdd");
- long to = System.currentTimeMillis();
- long from = df.parse(time.substring(0, 8)).getTime();
- if ((to - from) / value > offset) {
- return true;
- } else {
- return false;
- }
- }
-
- public static boolean compare(long time, int offset) {
- long value = 1000 * 60 * 60 * 24;
- long to = System.currentTimeMillis();
- if ((to - time) / value > offset) {
- return true;
- } else {
- return false;
- }
- }
-
public void init(String timeFormat) {
if (timeFormat != null && !timeFormat.isEmpty()) {
dateFormat = timeFormat;
@@ -221,38 +179,6 @@ public class DateUtils {
return sb.toString();
}
- public String getFormatSpecifiedTime(String specifiedTime) {
- if (specifiedTime == null || specifiedTime.length() == 0) {
- return specifiedTime;
- }
-
- int formatLen = dateFormat.length();
-
- if (specifiedTime.length() == formatLen
- && !specifiedTime.matches(DIGIT_STR)) {
- return specifiedTime;
- }
-
- StringBuilder retSb = new StringBuilder();
- int specifiedInx = 0;
- for (int i = 0; i < formatLen; i++) {
- char tmpChar = dateFormat.charAt(i);
-
- if (tmpChar != 'Y' && tmpChar != 'M' && tmpChar != 'D'
- && tmpChar != 'h' && tmpChar != 'm') {
- retSb.append(tmpChar);
- } else {
- retSb.append(specifiedTime.charAt(specifiedInx++));
- }
- }
-
- logger.info(
- "TimeRegex {} <> specifiedTime {} not match, format
specifiedTime {}",
- new Object[]{dateFormat, specifiedTime, retSb.toString()});
-
- return retSb.toString();
- }
-
public String getDate(String src, String limit) {
if (src == null || src.trim().isEmpty()) {
return "";
@@ -333,123 +259,4 @@ public class DateUtils {
}
return dt.format(outputFormat);
}
-
- public String getAttrPunit(String attrs) {
- String punit = null;
- if (attrs != null && attrs.contains("&p=")) {
- for (String attr : attrs.split("&")) {
- if (attr.startsWith("p=") && attr.split("=").length == 2) {
- punit = attr.split("=")[1];
- break;
- }
- }
- }
-
- return punit;
- }
-
- public String getSpecifiedDate(String src, String limit, String punit) {
- String ret = getDate(src, limit);
- return formatCurrPeriod(ret, punit);
- }
-
- public String normalizeTimeRegex(String src) {
- return getSubTimeFormat(dateFormat, src.length());
- }
-
- public String getCurrentDir(String src, String timeOffset) {
- Matcher m = pattern.matcher(src);
- StringBuffer sb = new StringBuffer();
- while (m.find()) {
- String oneMatch = m.group(0);
- String currTimeStr = getDate(oneMatch, timeOffset);
- m.appendReplacement(sb, currTimeStr);
- }
- m.appendTail(sb);
- return sb.toString();
- }
-
- public String getCurrentDirByPunit(String src, String timeOffset,
- String punit) {
- Matcher m = pattern.matcher(src);
- StringBuffer sb = new StringBuffer();
- while (m.find()) {
- String oneMatch = m.group(0);
- String currTimeStr = getSpecifiedDate(oneMatch, timeOffset, punit);
- m.appendReplacement(sb, currTimeStr);
- }
- m.appendTail(sb);
-
- return sb.toString();
- }
-
- public String getSpecifiedDir(String src, String specifiedDate) {
- Matcher m = pattern.matcher(src);
- StringBuffer sb = new StringBuffer();
-
- while (m.find()) {
- String oneMatch = m.group(0);
- StringBuilder tmpSb = new StringBuilder();
- int specifiedDateIdx = 0;
-
- for (int i = 0; i < oneMatch.length(); i++) {
- char matchChar = oneMatch.charAt(i);
- if (matchChar != 'Y' && matchChar != 'M' && matchChar != 'D'
- && matchChar != 'h' && matchChar != 'm') {
- tmpSb.append(matchChar);
- } else {
- char dateChar = specifiedDate.charAt(specifiedDateIdx);
- while (String.valueOf(dateChar).matches("\\D")) {
- dateChar = specifiedDate.charAt(++specifiedDateIdx);
- }
- tmpSb.append(dateChar);
- specifiedDateIdx++;
- }
- }
- m.appendReplacement(sb, tmpSb.toString());
- }
- m.appendTail(sb);
- return sb.toString();
- }
-
- // format current period starting less-than-hour task
- // * for example: ten-minute task:
- // * currPeriodDataTime is 201303271905
- // * formated value is 201303271900
- public String formatCurrPeriod(String src, String punit) {
- if (src == null || punit == null || src.length() != 12) {
- return src;
- }
-
- String prefixMinuteStr = src.substring(0, src.length() - 2);
- String minuteStr = src.substring(src.length() - 2, src.length());
-
- if ("n".equals(punit)) {
- if (minuteStr.compareTo("30") < 0) {
- minuteStr = "00";
- } else {
- minuteStr = "30";
- }
- } else if ("q".equals(punit)) {
- if (minuteStr.compareTo("15") < 0) {
- minuteStr = "00";
- } else if (minuteStr.compareTo("30") < 0) {
- minuteStr = "15";
- } else if (minuteStr.compareTo("45") < 0) {
- minuteStr = "30";
- } else {
- minuteStr = "45";
- }
- } else if ("t".equals(punit)) {
- minuteStr = minuteStr.charAt(0) + "0";
- } else if ("f".equals(punit)) {
- if (minuteStr.substring(1).compareTo("5") < 0) {
- minuteStr = minuteStr.charAt(0) + "0";
- } else {
- minuteStr = minuteStr.charAt(0) + "5";
- }
- }
-
- return prefixMinuteStr + minuteStr;
- }
}
diff --git
a/inlong-agent/agent-plugins/src/main/java/org/apache/inlong/agent/plugin/utils/file/MatchPoint.java
b/inlong-agent/agent-plugins/src/main/java/org/apache/inlong/agent/plugin/utils/regex/MatchPoint.java
similarity index 96%
rename from
inlong-agent/agent-plugins/src/main/java/org/apache/inlong/agent/plugin/utils/file/MatchPoint.java
rename to
inlong-agent/agent-plugins/src/main/java/org/apache/inlong/agent/plugin/utils/regex/MatchPoint.java
index b7222a191a..ec1167d06e 100644
---
a/inlong-agent/agent-plugins/src/main/java/org/apache/inlong/agent/plugin/utils/file/MatchPoint.java
+++
b/inlong-agent/agent-plugins/src/main/java/org/apache/inlong/agent/plugin/utils/regex/MatchPoint.java
@@ -15,7 +15,7 @@
* limitations under the License.
*/
-package org.apache.inlong.agent.plugin.utils.file;
+package org.apache.inlong.agent.plugin.utils.regex;
public class MatchPoint {
diff --git
a/inlong-agent/agent-plugins/src/main/java/org/apache/inlong/agent/plugin/utils/file/NewDateUtils.java
b/inlong-agent/agent-plugins/src/main/java/org/apache/inlong/agent/plugin/utils/regex/NewDateUtils.java
similarity index 92%
rename from
inlong-agent/agent-plugins/src/main/java/org/apache/inlong/agent/plugin/utils/file/NewDateUtils.java
rename to
inlong-agent/agent-plugins/src/main/java/org/apache/inlong/agent/plugin/utils/regex/NewDateUtils.java
index 563a168885..a80f088e0d 100644
---
a/inlong-agent/agent-plugins/src/main/java/org/apache/inlong/agent/plugin/utils/file/NewDateUtils.java
+++
b/inlong-agent/agent-plugins/src/main/java/org/apache/inlong/agent/plugin/utils/regex/NewDateUtils.java
@@ -15,7 +15,7 @@
* limitations under the License.
*/
-package org.apache.inlong.agent.plugin.utils.file;
+package org.apache.inlong.agent.plugin.utils.regex;
import org.apache.inlong.agent.constant.CycleUnitType;
import org.apache.inlong.agent.utils.DateTransUtils;
@@ -109,45 +109,6 @@ public class NewDateUtils {
.format(new Date(getDateTime(calendar, cycleUnit,
offset).getTimeInMillis()));
}
- private static Calendar getCurDate(String cycleUnit, String offset) {
- if (cycleUnit == null || cycleUnit.length() == 0) {
- return null;
- }
-
- Calendar calendar = Calendar.getInstance();
- calendar.setTimeInMillis(System.currentTimeMillis());
-
- return getDateTime(calendar, cycleUnit, offset);
- }
-
- public static String getDateTime(String dataTime, String cycleUnit, String
offset) {
- String retTime = DateTransUtils.millSecConvertToTimeStr(
- System.currentTimeMillis(), cycleUnit);
- try {
- long time = DateTransUtils.timeStrConvertToMillSec(dataTime,
cycleUnit);
-
- Calendar calendar = Calendar.getInstance();
- calendar.setTimeInMillis(time);
- Calendar retCalendar = getDateTime(calendar, cycleUnit, offset);
- if (retCalendar == null) {
- return dataTime;
- }
-
- retTime =
DateTransUtils.millSecConvertToTimeStr(retCalendar.getTime().getTime(),
- cycleUnit);
- } catch (Exception e) {
- logger.error("getDateTime error: ", e);
- }
- return retTime;
- }
-
- public static String getDateTime(long time, String cycleUnit, String
offset) {
- Calendar calendar = Calendar.getInstance();
- calendar.setTimeInMillis(time);
- Calendar retCalendar = getDateTime(calendar, cycleUnit, offset);
- return
DateTransUtils.millSecConvertToTimeStr(retCalendar.getTime().getTime(),
cycleUnit);
- }
-
private static Calendar getDateTime(Calendar calendar, String cycleUnit,
String offset) {
int cycleNumber = (cycleUnit.length() <= 1
? 1
diff --git
a/inlong-agent/agent-plugins/src/main/java/org/apache/inlong/agent/plugin/utils/file/NonRegexPatternPosition.java
b/inlong-agent/agent-plugins/src/main/java/org/apache/inlong/agent/plugin/utils/regex/NonRegexPatternPosition.java
similarity index 97%
rename from
inlong-agent/agent-plugins/src/main/java/org/apache/inlong/agent/plugin/utils/file/NonRegexPatternPosition.java
rename to
inlong-agent/agent-plugins/src/main/java/org/apache/inlong/agent/plugin/utils/regex/NonRegexPatternPosition.java
index 0732d184e0..a12140a314 100644
---
a/inlong-agent/agent-plugins/src/main/java/org/apache/inlong/agent/plugin/utils/file/NonRegexPatternPosition.java
+++
b/inlong-agent/agent-plugins/src/main/java/org/apache/inlong/agent/plugin/utils/regex/NonRegexPatternPosition.java
@@ -15,7 +15,7 @@
* limitations under the License.
*/
-package org.apache.inlong.agent.plugin.utils.file;
+package org.apache.inlong.agent.plugin.utils.regex;
/*
* Describe the nearest character around the date time expression. For
example, for date source name
diff --git
a/inlong-agent/agent-plugins/src/main/java/org/apache/inlong/agent/plugin/utils/file/PathDateExpression.java
b/inlong-agent/agent-plugins/src/main/java/org/apache/inlong/agent/plugin/utils/regex/PathDateExpression.java
similarity index 96%
rename from
inlong-agent/agent-plugins/src/main/java/org/apache/inlong/agent/plugin/utils/file/PathDateExpression.java
rename to
inlong-agent/agent-plugins/src/main/java/org/apache/inlong/agent/plugin/utils/regex/PathDateExpression.java
index c75e0398b8..d73329424f 100644
---
a/inlong-agent/agent-plugins/src/main/java/org/apache/inlong/agent/plugin/utils/file/PathDateExpression.java
+++
b/inlong-agent/agent-plugins/src/main/java/org/apache/inlong/agent/plugin/utils/regex/PathDateExpression.java
@@ -15,7 +15,7 @@
* limitations under the License.
*/
-package org.apache.inlong.agent.plugin.utils.file;
+package org.apache.inlong.agent.plugin.utils.regex;
/* The date expression in the file path. */
public class PathDateExpression {
diff --git
a/inlong-agent/agent-plugins/src/main/java/org/apache/inlong/agent/plugin/utils/file/FilePathUtil.java
b/inlong-agent/agent-plugins/src/main/java/org/apache/inlong/agent/plugin/utils/regex/PatternUtil.java
similarity index 91%
rename from
inlong-agent/agent-plugins/src/main/java/org/apache/inlong/agent/plugin/utils/file/FilePathUtil.java
rename to
inlong-agent/agent-plugins/src/main/java/org/apache/inlong/agent/plugin/utils/regex/PatternUtil.java
index 912b035642..dc27addb07 100644
---
a/inlong-agent/agent-plugins/src/main/java/org/apache/inlong/agent/plugin/utils/file/FilePathUtil.java
+++
b/inlong-agent/agent-plugins/src/main/java/org/apache/inlong/agent/plugin/utils/regex/PatternUtil.java
@@ -15,14 +15,14 @@
* limitations under the License.
*/
-package org.apache.inlong.agent.plugin.utils.file;
+package org.apache.inlong.agent.plugin.utils.regex;
import org.apache.commons.lang.StringUtils;
import java.io.File;
import java.util.ArrayList;
-public class FilePathUtil {
+public class PatternUtil {
private static final String YEAR = "YYYY";
private static final String MONTH = "MM";
@@ -158,9 +158,18 @@ public class FilePathUtil {
}
public static boolean isSameDir(String fileName1, String fileName2) {
- ArrayList<String> ret1 =
FilePathUtil.cutDirectoryByWildcard(fileName1);
- ArrayList<String> ret2 =
FilePathUtil.cutDirectoryByWildcard(fileName2);
+ ArrayList<String> ret1 = PatternUtil.cutDirectoryByWildcard(fileName1);
+ ArrayList<String> ret2 = PatternUtil.cutDirectoryByWildcard(fileName2);
return ret1.get(0).equals(ret2.get(0));
}
+ public static String getBeforeFirstWildcard(String input) {
+ String sign = "\\^$*+?{(|[.";
+ int firstWildcardIndex = StringUtils.indexOfAny(input, sign);
+ if (firstWildcardIndex != -1) {
+ return input.substring(0, firstWildcardIndex);
+ } else {
+ return "";
+ }
+ }
}
diff --git
a/inlong-agent/agent-plugins/src/main/java/org/apache/inlong/agent/plugin/utils/regex/Scanner.java
b/inlong-agent/agent-plugins/src/main/java/org/apache/inlong/agent/plugin/utils/regex/Scanner.java
new file mode 100644
index 0000000000..b3153e81fe
--- /dev/null
+++
b/inlong-agent/agent-plugins/src/main/java/org/apache/inlong/agent/plugin/utils/regex/Scanner.java
@@ -0,0 +1,97 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.inlong.agent.plugin.utils.regex;
+
+import org.apache.inlong.agent.utils.DateTransUtils;
+
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.util.ArrayList;
+import java.util.Calendar;
+import java.util.List;
+
+public class Scanner {
+
+ private static final Logger LOGGER =
LoggerFactory.getLogger(Scanner.class);
+ public static final String SCAN_CYCLE_RANCE = "-2";
+
+ public static class TimeRange {
+
+ public Long startTime;
+ public Long endTime;
+
+ public TimeRange(Long startTime, Long endTime) {
+ this.startTime = startTime;
+ this.endTime = endTime;
+ }
+ }
+
+ public static class FinalPatternInfo {
+
+ public String finalPattern;
+ public Long dataTime;
+
+ public FinalPatternInfo(String finalPattern, Long dataTime) {
+ this.finalPattern = finalPattern;
+ this.dataTime = dataTime;
+ }
+ }
+
+ public static List<FinalPatternInfo> getFinalPatternInfos(String
originPattern, String cycleUnit, String timeOffset,
+ long startTime, long endTime, boolean isRetry) {
+ TimeRange range = Scanner.getTimeRange(startTime, endTime, cycleUnit,
timeOffset, isRetry);
+ String strStartTime =
DateTransUtils.millSecConvertToTimeStr(range.startTime, cycleUnit);
+ String strEndTime =
DateTransUtils.millSecConvertToTimeStr(range.endTime, cycleUnit);
+ LOGGER.info("{} scan time is between {} and {}", originPattern,
strStartTime, strEndTime);
+ List<Long> dateRegion = NewDateUtils.getDateRegion(range.startTime,
range.endTime, cycleUnit);
+ List<FinalPatternInfo> finalPatternList = new ArrayList<>();
+ for (Long time : dateRegion) {
+ Calendar calendar = Calendar.getInstance();
+ calendar.setTimeInMillis(time);
+ FinalPatternInfo finalPatternInfo = new FinalPatternInfo(
+ NewDateUtils.replaceDateExpression(calendar,
originPattern), time);
+ finalPatternList.add(finalPatternInfo);
+ }
+ return finalPatternList;
+ }
+
+ public static List<String> getDataTimeList(long startTime, long endTime,
String cycleUnit, String timeOffset,
+ boolean isRetry) {
+ TimeRange range = getTimeRange(startTime, endTime, cycleUnit,
timeOffset, isRetry);
+ List<String> dataTimeList = new ArrayList<>();
+ List<Long> dateRegion = NewDateUtils.getDateRegion(range.startTime,
range.endTime, cycleUnit);
+ for (Long time : dateRegion) {
+ String dataTime = DateTransUtils.millSecConvertToTimeStr(time,
cycleUnit);
+ dataTimeList.add(dataTime);
+ }
+ return dataTimeList;
+ }
+
+ public static TimeRange getTimeRange(long startTime, long endTime, String
cycleUnit, String timeOffset,
+ boolean isRetry) {
+ if (!isRetry) {
+ long currentTime = System.currentTimeMillis();
+ // only scan two cycle, like two hours or two days
+ long offset = DateTransUtils.calcOffset(SCAN_CYCLE_RANCE +
cycleUnit);
+ startTime = currentTime + offset +
DateTransUtils.calcOffset(timeOffset);
+ endTime = currentTime + DateTransUtils.calcOffset(timeOffset);
+ }
+ return new TimeRange(startTime, endTime);
+ }
+}
diff --git
a/inlong-agent/agent-plugins/src/test/java/org/apache/inlong/agent/plugin/AgentBaseTestsHelper.java
b/inlong-agent/agent-plugins/src/test/java/org/apache/inlong/agent/plugin/AgentBaseTestsHelper.java
index 7730bb29c8..3dc4f8ab15 100755
---
a/inlong-agent/agent-plugins/src/test/java/org/apache/inlong/agent/plugin/AgentBaseTestsHelper.java
+++
b/inlong-agent/agent-plugins/src/test/java/org/apache/inlong/agent/plugin/AgentBaseTestsHelper.java
@@ -83,7 +83,7 @@ public class AgentBaseTestsHelper {
}
public TaskProfile getTaskProfile(int taskId, String pattern, String
dataContentStyle, boolean retry,
- Long startTime, Long endTime,
+ String startTime, String endTime,
TaskStateEnum state, String cycleUnit, String timeZone,
List<String> filterStreams) {
DataConfig dataConfig = getDataConfig(taskId, pattern,
dataContentStyle, retry, startTime, endTime,
state, cycleUnit, timeZone,
@@ -92,9 +92,9 @@ public class AgentBaseTestsHelper {
return profile;
}
- private DataConfig getDataConfig(int taskId, String pattern, String
dataContentStyle, boolean retry, Long startTime,
- Long endTime,
- TaskStateEnum state, String cycleUnit, String timeZone,
List<String> filterStreams) {
+ private DataConfig getDataConfig(int taskId, String pattern, String
dataContentStyle, boolean retry,
+ String startTime, String endTime, TaskStateEnum state, String
cycleUnit, String timeZone,
+ List<String> filterStreams) {
DataConfig dataConfig = new DataConfig();
dataConfig.setInlongGroupId("testGroupId");
dataConfig.setInlongStreamId("testStreamId");
@@ -110,8 +110,8 @@ public class AgentBaseTestsHelper {
fileTaskConfig.setMaxFileCount(100);
fileTaskConfig.setCycleUnit(cycleUnit);
fileTaskConfig.setRetry(retry);
- fileTaskConfig.setStartTime(startTime);
- fileTaskConfig.setEndTime(endTime);
+ fileTaskConfig.setDataTimeFrom(startTime);
+ fileTaskConfig.setDataTimeTo(endTime);
// mix: login|87601|968|67826|23579 or login|a=b&c=d&x=y&asdf
fileTaskConfig.setDataContentStyle(dataContentStyle);
fileTaskConfig.setDataSeparator("|");
diff --git
a/inlong-agent/agent-plugins/src/test/java/org/apache/inlong/agent/plugin/instance/TestInstanceManager.java
b/inlong-agent/agent-plugins/src/test/java/org/apache/inlong/agent/plugin/instance/TestInstanceManager.java
index f2c5f25ee3..61c94c4dd1 100755
---
a/inlong-agent/agent-plugins/src/test/java/org/apache/inlong/agent/plugin/instance/TestInstanceManager.java
+++
b/inlong-agent/agent-plugins/src/test/java/org/apache/inlong/agent/plugin/instance/TestInstanceManager.java
@@ -59,7 +59,7 @@ public class TestInstanceManager {
helper = new
AgentBaseTestsHelper(TestInstanceManager.class.getName()).setupAgentHome();
String pattern = helper.getTestRootDir() + "/YYYYMMDDhh_[0-9]+.txt";
Store basicInstanceStore =
TaskManager.initStore(AgentConstants.AGENT_STORE_PATH_INSTANCE);
- taskProfile = helper.getTaskProfile(1, pattern, "csv", false, 0L, 0L,
TaskStateEnum.RUNNING, CycleUnitType.HOUR,
+ taskProfile = helper.getTaskProfile(1, pattern, "csv", false, "", "",
TaskStateEnum.RUNNING, CycleUnitType.HOUR,
"GMT+6:00", null);
Store taskBasicStore =
TaskManager.initStore(AgentConstants.AGENT_STORE_PATH_TASK);
TaskStore taskStore = new TaskStore(taskBasicStore);
diff --git
a/inlong-agent/agent-plugins/src/test/java/org/apache/inlong/agent/plugin/sinks/KafkaSinkTest.java
b/inlong-agent/agent-plugins/src/test/java/org/apache/inlong/agent/plugin/sinks/KafkaSinkTest.java
index 5524c5e96e..066776d32f 100644
---
a/inlong-agent/agent-plugins/src/test/java/org/apache/inlong/agent/plugin/sinks/KafkaSinkTest.java
+++
b/inlong-agent/agent-plugins/src/test/java/org/apache/inlong/agent/plugin/sinks/KafkaSinkTest.java
@@ -47,7 +47,7 @@ public class KafkaSinkTest {
String fileName = LOADER.getResource("test/20230928_1.txt").getPath();
helper = new
AgentBaseTestsHelper(TestSenderManager.class.getName()).setupAgentHome();
String pattern = helper.getTestRootDir() + "/YYYYMMDD.log_[0-9]+";
- TaskProfile taskProfile = helper.getTaskProfile(1, pattern, "csv",
false, 0L, 0L, TaskStateEnum.RUNNING, "D",
+ TaskProfile taskProfile = helper.getTaskProfile(1, pattern, "csv",
false, "", "", TaskStateEnum.RUNNING, "D",
"GMT+8:00", null);
profile = taskProfile.createInstanceProfile("", fileName,
taskProfile.getCycleUnit(), "20230927",
AgentUtils.getCurrentTime());
diff --git
a/inlong-agent/agent-plugins/src/test/java/org/apache/inlong/agent/plugin/sinks/PulsarSinkTest.java
b/inlong-agent/agent-plugins/src/test/java/org/apache/inlong/agent/plugin/sinks/PulsarSinkTest.java
index 93702fad16..d7733259ab 100644
---
a/inlong-agent/agent-plugins/src/test/java/org/apache/inlong/agent/plugin/sinks/PulsarSinkTest.java
+++
b/inlong-agent/agent-plugins/src/test/java/org/apache/inlong/agent/plugin/sinks/PulsarSinkTest.java
@@ -47,7 +47,7 @@ public class PulsarSinkTest {
String fileName = LOADER.getResource("test/20230928_1.txt").getPath();
helper = new
AgentBaseTestsHelper(TestSenderManager.class.getName()).setupAgentHome();
String pattern = helper.getTestRootDir() + "/YYYYMMDD.log_[0-9]+";
- TaskProfile taskProfile = helper.getTaskProfile(1, pattern, "csv",
false, 0L, 0L, TaskStateEnum.RUNNING, "D",
+ TaskProfile taskProfile = helper.getTaskProfile(1, pattern, "csv",
false, "", "", TaskStateEnum.RUNNING, "D",
"GMT+8:00", null);
profile = taskProfile.createInstanceProfile("", fileName,
taskProfile.getCycleUnit(), "20230927",
AgentUtils.getCurrentTime());
diff --git
a/inlong-agent/agent-plugins/src/test/java/org/apache/inlong/agent/plugin/sinks/filecollect/TestSenderManager.java
b/inlong-agent/agent-plugins/src/test/java/org/apache/inlong/agent/plugin/sinks/filecollect/TestSenderManager.java
index afeb3565e2..5a1168edef 100644
---
a/inlong-agent/agent-plugins/src/test/java/org/apache/inlong/agent/plugin/sinks/filecollect/TestSenderManager.java
+++
b/inlong-agent/agent-plugins/src/test/java/org/apache/inlong/agent/plugin/sinks/filecollect/TestSenderManager.java
@@ -24,7 +24,7 @@ import org.apache.inlong.agent.constant.TaskConstants;
import org.apache.inlong.agent.message.file.OffsetAckInfo;
import org.apache.inlong.agent.message.file.SenderMessage;
import org.apache.inlong.agent.plugin.AgentBaseTestsHelper;
-import org.apache.inlong.agent.plugin.utils.file.FileDataUtils;
+import org.apache.inlong.agent.plugin.task.file.FileDataUtils;
import org.apache.inlong.agent.utils.AgentUtils;
import org.apache.inlong.common.enums.TaskStateEnum;
import org.apache.inlong.sdk.dataproxy.common.SendMessageCallback;
@@ -70,7 +70,7 @@ public class TestSenderManager {
String fileName = LOADER.getResource("test/20230928_1.txt").getPath();
helper = new
AgentBaseTestsHelper(TestSenderManager.class.getName()).setupAgentHome();
String pattern = helper.getTestRootDir() + "/YYYYMMDD.log_[0-9]+";
- TaskProfile taskProfile = helper.getTaskProfile(1, pattern, "csv",
false, 0L, 0L, TaskStateEnum.RUNNING, "D",
+ TaskProfile taskProfile = helper.getTaskProfile(1, pattern, "csv",
false, "", "", TaskStateEnum.RUNNING, "D",
"GMT+8:00", null);
profile = taskProfile.createInstanceProfile("", fileName,
taskProfile.getCycleUnit(), "20230927",
AgentUtils.getCurrentTime());
diff --git
a/inlong-agent/agent-plugins/src/test/java/org/apache/inlong/agent/plugin/sources/TestLogFileSource.java
b/inlong-agent/agent-plugins/src/test/java/org/apache/inlong/agent/plugin/sources/TestLogFileSource.java
index df70039459..6ee892c914 100644
---
a/inlong-agent/agent-plugins/src/test/java/org/apache/inlong/agent/plugin/sources/TestLogFileSource.java
+++
b/inlong-agent/agent-plugins/src/test/java/org/apache/inlong/agent/plugin/sources/TestLogFileSource.java
@@ -27,7 +27,7 @@ import org.apache.inlong.agent.core.task.OffsetManager;
import org.apache.inlong.agent.core.task.TaskManager;
import org.apache.inlong.agent.plugin.AgentBaseTestsHelper;
import org.apache.inlong.agent.plugin.Message;
-import org.apache.inlong.agent.plugin.utils.file.FileDataUtils;
+import org.apache.inlong.agent.plugin.task.file.FileDataUtils;
import org.apache.inlong.agent.store.Store;
import org.apache.inlong.agent.utils.AgentUtils;
import org.apache.inlong.common.enums.TaskStateEnum;
@@ -77,7 +77,7 @@ public class TestLogFileSource {
private LogFileSource getSource(int taskId, long offset) {
try {
String pattern = helper.getTestRootDir() + "/YYYYMMDD.log_[0-9]+";
- TaskProfile taskProfile = helper.getTaskProfile(taskId, pattern,
"csv", false, 0L, 0L,
+ TaskProfile taskProfile = helper.getTaskProfile(taskId, pattern,
"csv", false, "", "",
TaskStateEnum.RUNNING, "D",
"GMT+8:00", null);
String fileName =
LOADER.getResource("test/20230928_1.txt").getPath();
diff --git
a/inlong-agent/agent-plugins/src/test/java/org/apache/inlong/agent/plugin/sources/TestRedisSource.java
b/inlong-agent/agent-plugins/src/test/java/org/apache/inlong/agent/plugin/sources/TestRedisSource.java
index 2680c01e06..4f2e90870b 100644
---
a/inlong-agent/agent-plugins/src/test/java/org/apache/inlong/agent/plugin/sources/TestRedisSource.java
+++
b/inlong-agent/agent-plugins/src/test/java/org/apache/inlong/agent/plugin/sources/TestRedisSource.java
@@ -121,7 +121,7 @@ public class TestRedisSource {
final String command = "zscore";
final String subOperation = "set,del";
- TaskProfile taskProfile = helper.getTaskProfile(1, "", "csv", false,
0L, 0L, TaskStateEnum.RUNNING, "D",
+ TaskProfile taskProfile = helper.getTaskProfile(1, "", "csv", false,
"", "", TaskStateEnum.RUNNING, "D",
"GMT+8:00", null);
profile = taskProfile.createInstanceProfile("",
"", taskProfile.getCycleUnit(), "20240725",
AgentUtils.getCurrentTime());
diff --git
a/inlong-agent/agent-plugins/src/test/java/org/apache/inlong/agent/plugin/sources/TestSQLServerSource.java
b/inlong-agent/agent-plugins/src/test/java/org/apache/inlong/agent/plugin/sources/TestSQLServerSource.java
index 2a90bdc37a..377e5ae913 100644
---
a/inlong-agent/agent-plugins/src/test/java/org/apache/inlong/agent/plugin/sources/TestSQLServerSource.java
+++
b/inlong-agent/agent-plugins/src/test/java/org/apache/inlong/agent/plugin/sources/TestSQLServerSource.java
@@ -136,7 +136,7 @@ public class TestSQLServerSource {
final String tableName = "test_source";
final String serverName = "server-01";
- TaskProfile taskProfile = helper.getTaskProfile(1, "", "csv", false,
0L, 0L, TaskStateEnum.RUNNING, "D",
+ TaskProfile taskProfile = helper.getTaskProfile(1, "", "csv", false,
"", "", TaskStateEnum.RUNNING, "D",
"GMT+8:00", null);
instanceProfile = taskProfile.createInstanceProfile("",
"", taskProfile.getCycleUnit(), "20240725",
AgentUtils.getCurrentTime());
diff --git
a/inlong-agent/agent-plugins/src/test/java/org/apache/inlong/agent/plugin/task/TestLogFileTask.java
b/inlong-agent/agent-plugins/src/test/java/org/apache/inlong/agent/plugin/task/TestLogFileTask.java
index 1ef3b5db1e..440c4a5208 100644
---
a/inlong-agent/agent-plugins/src/test/java/org/apache/inlong/agent/plugin/task/TestLogFileTask.java
+++
b/inlong-agent/agent-plugins/src/test/java/org/apache/inlong/agent/plugin/task/TestLogFileTask.java
@@ -40,10 +40,8 @@ import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import java.io.File;
-import java.text.SimpleDateFormat;
import java.util.ArrayList;
import java.util.Arrays;
-import java.util.Date;
import java.util.List;
import java.util.concurrent.SynchronousQueue;
import java.util.concurrent.ThreadPoolExecutor;
@@ -83,17 +81,17 @@ public class TestLogFileTask {
public void testScan() throws Exception {
doTest(1, Arrays.asList("testScan/20230928_1/test_1.txt"),
resourceParentPath + "/YYYYMMDD_[0-9]+/test_[0-9]+.txt",
CycleUnitType.DAY, Arrays.asList("20230928"),
- "2023-09-28 00:00:00", "2023-09-30 23:00:00");
+ "20230928", "20230930");
doTest(2, Arrays.asList("testScan/2023092810_1/test_1.txt"),
resourceParentPath + "/YYYYMMDDhh_[0-9]+/test_[0-9]+.txt",
- CycleUnitType.HOUR, Arrays.asList("2023092810"), "2023-09-28
00:00:00", "2023-09-30 23:00:00");
+ CycleUnitType.HOUR, Arrays.asList("2023092810"), "2023092800",
"2023093023");
doTest(3, Arrays.asList("testScan/202309281030_1/test_1.txt",
"testScan/202309301059_1/test_1.txt"),
resourceParentPath + "/YYYYMMDDhhmm_[0-9]+/test_[0-9]+.txt",
- CycleUnitType.MINUTE, Arrays.asList("202309281030",
"202309301059"), "2023-09-28 00:00:00",
- "2023-09-30 23:00:00");
+ CycleUnitType.MINUTE, Arrays.asList("202309281030",
"202309301059"), "202309280000",
+ "202309302300");
doTest(4, Arrays.asList("testScan/20241030/23/59.txt"),
resourceParentPath + "/YYYYMMDD/hh/mm.txt",
- CycleUnitType.MINUTE, Arrays.asList("202410302359"),
"2024-10-30 00:00:00", "2024-10-31 00:00:00");
+ CycleUnitType.MINUTE, Arrays.asList("202410302359"),
"202410300000", "202410310000");
}
private void doTest(int taskId, List<String> resources, String pattern,
String cycle, List<String> srcDataTimes,
@@ -103,20 +101,14 @@ public class TestLogFileTask {
for (int i = 0; i < resources.size(); i++) {
resourceName.add(LOADER.getResource(resources.get(i)).getPath());
}
- TaskProfile taskProfile = helper.getTaskProfile(taskId, pattern,
"csv", true, 0L, 0L, TaskStateEnum.RUNNING,
- cycle,
- "GMT+8:00", null);
+ TaskProfile taskProfile = helper.getTaskProfile(taskId, pattern,
"csv", true, "", "", TaskStateEnum.RUNNING,
+ cycle, "GMT+8:00", null);
LogFileTask dayTask = null;
final List<String> fileName = new ArrayList();
final List<String> dataTime = new ArrayList();
try {
-
- Date parse = new SimpleDateFormat("yyyy-MM-dd
HH:mm:ss").parse(startTime);
- long start = parse.getTime();
- parse = new SimpleDateFormat("yyyy-MM-dd HH:mm:ss").parse(endTime);
- long end = parse.getTime();
- taskProfile.setLong(TaskConstants.TASK_START_TIME, start);
- taskProfile.setLong(TaskConstants.TASK_END_TIME, end);
+ taskProfile.set(TaskConstants.FILE_TASK_TIME_FROM, startTime);
+ taskProfile.set(TaskConstants.FILE_TASK_TIME_TO, endTime);
dayTask = PowerMockito.spy(new LogFileTask());
PowerMockito.doAnswer(invocation -> {
fileName.add(invocation.getArgument(0));
@@ -128,7 +120,7 @@ public class TestLogFileTask {
dayTask.init(manager, taskProfile,
manager.getInstanceBasicStore());
EXECUTOR_SERVICE.submit(dayTask);
} catch (Exception e) {
- LOGGER.error("source init error {}", e);
+ LOGGER.error("source init error", e);
Assert.assertTrue("source init error", false);
}
await().atMost(10, TimeUnit.SECONDS)
diff --git
a/inlong-agent/agent-plugins/src/test/java/org/apache/inlong/agent/plugin/task/TestTaskManager.java
b/inlong-agent/agent-plugins/src/test/java/org/apache/inlong/agent/plugin/task/TestTaskManager.java
index 608d3adec6..014c5ce4e2 100755
---
a/inlong-agent/agent-plugins/src/test/java/org/apache/inlong/agent/plugin/task/TestTaskManager.java
+++
b/inlong-agent/agent-plugins/src/test/java/org/apache/inlong/agent/plugin/task/TestTaskManager.java
@@ -58,7 +58,7 @@ public class TestTaskManager {
manager = new TaskManager();
TaskStore taskStore = manager.getTaskStore();
for (int i = 1; i <= 10; i++) {
- TaskProfile taskProfile = helper.getTaskProfile(i, pattern,
"csv", false, 0L, 0L, TaskStateEnum.RUNNING,
+ TaskProfile taskProfile = helper.getTaskProfile(i, pattern,
"csv", false, "", "", TaskStateEnum.RUNNING,
"D", "GMT+8:00", null);
taskProfile.setTaskClass(MockTask.class.getCanonicalName());
taskStore.storeTask(taskProfile);
@@ -74,7 +74,7 @@ public class TestTaskManager {
Assert.assertTrue("manager start error", false);
}
- TaskProfile taskProfile1 = helper.getTaskProfile(100, pattern, "csv",
false, 0L, 0L, TaskStateEnum.RUNNING,
+ TaskProfile taskProfile1 = helper.getTaskProfile(100, pattern, "csv",
false, "", "", TaskStateEnum.RUNNING,
"D", "GMT+8:00", null);
String taskId1 = taskProfile1.getTaskId();
taskProfile1.setTaskClass(MockTask.class.getCanonicalName());
@@ -99,7 +99,7 @@ public class TestTaskManager {
Assert.assertTrue(manager.getTaskProfile(taskId1).getState() ==
TaskStateEnum.RUNNING);
// test delete
- TaskProfile taskProfile2 = helper.getTaskProfile(200, pattern, "csv",
false, 0L, 0L, TaskStateEnum.RUNNING,
+ TaskProfile taskProfile2 = helper.getTaskProfile(200, pattern, "csv",
false, "", "", TaskStateEnum.RUNNING,
"D", "GMT+8:00", null);
taskProfile2.setTaskClass(MockTask.class.getCanonicalName());
List<TaskProfile> taskProfiles2 = new ArrayList<>();
diff --git
a/inlong-agent/agent-plugins/src/test/java/org/apache/inlong/agent/plugin/utils/TestUtils.java
b/inlong-agent/agent-plugins/src/test/java/org/apache/inlong/agent/plugin/utils/TestUtils.java
index cf1b7c8f95..a1f13122e3 100644
---
a/inlong-agent/agent-plugins/src/test/java/org/apache/inlong/agent/plugin/utils/TestUtils.java
+++
b/inlong-agent/agent-plugins/src/test/java/org/apache/inlong/agent/plugin/utils/TestUtils.java
@@ -17,8 +17,8 @@
package org.apache.inlong.agent.plugin.utils;
-import org.apache.inlong.agent.plugin.utils.file.FilePathUtil;
-import org.apache.inlong.agent.plugin.utils.file.NewDateUtils;
+import org.apache.inlong.agent.plugin.utils.regex.NewDateUtils;
+import org.apache.inlong.agent.plugin.utils.regex.PatternUtil;
import org.apache.inlong.agent.utils.DateTransUtils;
import org.apache.inlong.common.metric.MetricRegister;
@@ -91,6 +91,8 @@ public class TestUtils {
Arrays.asList("/data/log_minute", "minute_YYYYMMDDhh*",
"mm.log_[0-9]+"));
testCutDirectoryByWildcard("/data/123+/YYYYMMDDhhmm.log",
Arrays.asList("/data", "123+", "YYYYMMDDhhmm.log"));
+ testCutDirectoryByWildcard("/data/2024112610*/test.log",
+ Arrays.asList("/data", "2024112610*", "test.log"));
/*
* 1 cut the file name 2 cut the path contains wildcard or date
expression
@@ -103,6 +105,20 @@ public class TestUtils {
Arrays.asList("/data", "123*/MMDD", "test.log"));
testCutDirectoryByWildcardAndDateExpression("/data/YYYYMMDD/123*/test.log",
Arrays.asList("/data", "YYYYMMDD/123*", "test.log"));
+
+ /*
+ * get the string before the first wildcard
+ */
+ testGetBeforeFirstWildcard("/data/YYYYMM/YYYaaMM/YYYYMMDDhhmm.log",
+ "/data/YYYYMM/YYYaaMM/YYYYMMDDhhmm");
+ testGetBeforeFirstWildcard("/data/123*/MMDD/test.log",
+ "/data/123");
+ testGetBeforeFirstWildcard("/data/YYYYMMDD/123*/test.log",
+ "/data/YYYYMMDD/123");
+ testGetBeforeFirstWildcard("test/65535_YYYYMMDD_hh00.log",
+ "test/65535_YYYYMMDD_hh00");
+ testGetBeforeFirstWildcard("/data/YYYYMMDD/*123/test.log",
+ "/data/YYYYMMDD/");
}
private void testReplaceDateExpression(String src, String dst) throws
ParseException {
@@ -113,12 +129,17 @@ public class TestUtils {
}
private void testCutDirectoryByWildcard(String src, List<String> dst) {
- ArrayList<String> directories =
FilePathUtil.cutDirectoryByWildcard(src);
+ ArrayList<String> directories =
PatternUtil.cutDirectoryByWildcard(src);
Assert.assertEquals(directories, dst);
}
+ private void testGetBeforeFirstWildcard(String src, String dst) {
+ String temp = PatternUtil.getBeforeFirstWildcard(src);
+ Assert.assertEquals(dst, temp);
+ }
+
private void testCutDirectoryByWildcardAndDateExpression(String src,
List<String> dst) {
- ArrayList<String> directoryLayers =
FilePathUtil.cutDirectoryByWildcardAndDateExpression(src);
+ ArrayList<String> directoryLayers =
PatternUtil.cutDirectoryByWildcardAndDateExpression(src);
Assert.assertEquals(directoryLayers, dst);
}