This is an automated email from the ASF dual-hosted git repository.
wenweihuang pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/inlong.git
The following commit(s) were added to refs/heads/master by this push:
new b91a38004f [INLONG-11624][Agent] Add SQL data source (#11625)
b91a38004f is described below
commit b91a38004fe1282ba897436e5696c41aa02621e7
Author: justinwwhuang <[email protected]>
AuthorDate: Mon Dec 30 22:08:00 2024 +0800
[INLONG-11624][Agent] Add SQL data source (#11625)
* [INLONG-11624][Agent] Add SQL data source
* [INLONG-11624][Agent] Add code comments
* Update
inlong-agent/agent-plugins/src/main/java/org/apache/inlong/agent/plugin/instance/SQLInstance.java
Co-authored-by: AloysZhang <[email protected]>
* Update
inlong-agent/agent-plugins/src/main/java/org/apache/inlong/agent/plugin/sources/SQLSource.java
Co-authored-by: AloysZhang <[email protected]>
* Update
inlong-agent/agent-plugins/src/main/java/org/apache/inlong/agent/plugin/sources/SQLSource.java
Co-authored-by: AloysZhang <[email protected]>
* Update
inlong-agent/agent-plugins/src/main/java/org/apache/inlong/agent/plugin/utils/regex/FileTask.java
Co-authored-by: AloysZhang <[email protected]>
* Update
inlong-agent/agent-plugins/src/main/java/org/apache/inlong/agent/plugin/utils/regex/FileTask.java
Co-authored-by: AloysZhang <[email protected]>
* [INLONG-11624][Agent] Modify code based on comments
* Update
inlong-agent/agent-plugins/src/main/java/org/apache/inlong/agent/plugin/sources/SQLSource.java
Co-authored-by: fuweng11 <[email protected]>
* [INLONG-11624][Agent] Modify thread name
* [INLONG-11624][Agent] Rename variables to prevent misunderstandings
---------
Co-authored-by: AloysZhang <[email protected]>
Co-authored-by: fuweng11 <[email protected]>
---
.../apache/inlong/agent/conf/InstanceProfile.java | 94 +++-
.../org/apache/inlong/agent/conf/TaskProfile.java | 49 ++-
.../inlong/agent/constant/TaskConstants.java | 19 +-
.../java/org/apache/inlong/agent/pojo/SQLTask.java | 62 +++
.../apache/inlong/agent/pojo/TaskProfileDto.java | 92 ++--
.../inlong/agent/plugin/instance/SQLInstance.java | 35 ++
.../inlong/agent/plugin/sources/COSSource.java | 2 +-
.../inlong/agent/plugin/sources/SQLSource.java | 291 ++++++++++++
.../plugin/task/logcollection/LogAbstractTask.java | 42 +-
.../agent/plugin/task/logcollection/SQLTask.java | 142 ++++++
.../plugin/task/logcollection/cos/COSTask.java | 50 +--
.../plugin/task/logcollection/local/FileTask.java | 68 +--
.../task/logcollection/local/WatchEntity.java | 7 +-
.../inlong/agent/plugin/utils/regex/DateUtils.java | 488 ++++++++++++++-------
.../agent/plugin/utils/regex/NewDateUtils.java | 466 --------------------
.../inlong/agent/plugin/utils/regex/Scanner.java | 6 +-
.../inlong/agent/plugin/AgentBaseTestsHelper.java | 33 ++
.../inlong/agent/plugin/task/TestCOSTask.java | 31 +-
.../{TestLogFileTask.java => TestFileTask.java} | 34 +-
.../inlong/agent/plugin/utils/TestUtils.java | 4 +-
20 files changed, 1166 insertions(+), 849 deletions(-)
diff --git
a/inlong-agent/agent-common/src/main/java/org/apache/inlong/agent/conf/InstanceProfile.java
b/inlong-agent/agent-common/src/main/java/org/apache/inlong/agent/conf/InstanceProfile.java
index c3b20a0581..2c5eaeb371 100644
---
a/inlong-agent/agent-common/src/main/java/org/apache/inlong/agent/conf/InstanceProfile.java
+++
b/inlong-agent/agent-common/src/main/java/org/apache/inlong/agent/conf/InstanceProfile.java
@@ -48,16 +48,30 @@ import static
org.apache.inlong.agent.constant.TaskConstants.TASK_TYPE;
*/
public class InstanceProfile extends AbstractConfiguration implements
Comparable<InstanceProfile> {
- public static final String DEFAULT_FILE_INSTANCE =
"org.apache.inlong.agent.plugin.instance.FileInstance";
- public static final String DEFAULT_COS_INSTANCE =
"org.apache.inlong.agent.plugin.instance.COSInstance";
- public static final String DEFAULT_KAFKA_INSTANCE =
"org.apache.inlong.agent.plugin.instance.KafkaInstance";
- public static final String DEFAULT_MONGODB_INSTANCE =
"org.apache.inlong.agent.plugin.instance.MongoDBInstance";
- public static final String DEFAULT_MQTT_INSTANCE =
"org.apache.inlong.agent.plugin.instance.MqttInstance";
- public static final String DEFAULT_ORACLE_INSTANCE =
"org.apache.inlong.agent.plugin.instance.OracleInstance";
- public static final String DEFAULT_POSTGRES_INSTANCE =
"org.apache.inlong.agent.plugin.instance.PostgreSQLInstance";
- public static final String DEFAULT_PULSAR_INSTANCE =
"org.apache.inlong.agent.plugin.instance.PulsarInstance";
- public static final String DEFAULT_REDIS_INSTANCE =
"org.apache.inlong.agent.plugin.instance.RedisInstance";
- public static final String DEFAULT_SQLSERVER_INSTANCE =
"org.apache.inlong.agent.plugin.instance.SQLServerInstance";
+ public static final String FILE_INSTANCE =
"org.apache.inlong.agent.plugin.instance.FileInstance";
+ public static final String COS_INSTANCE =
"org.apache.inlong.agent.plugin.instance.COSInstance";
+ public static final String KAFKA_INSTANCE =
"org.apache.inlong.agent.plugin.instance.KafkaInstance";
+ public static final String MONGODB_INSTANCE =
"org.apache.inlong.agent.plugin.instance.MongoDBInstance";
+ public static final String MQTT_INSTANCE =
"org.apache.inlong.agent.plugin.instance.MqttInstance";
+ public static final String ORACLE_INSTANCE =
"org.apache.inlong.agent.plugin.instance.OracleInstance";
+ public static final String POSTGRES_INSTANCE =
"org.apache.inlong.agent.plugin.instance.PostgreSQLInstance";
+ public static final String PULSAR_INSTANCE =
"org.apache.inlong.agent.plugin.instance.PulsarInstance";
+ public static final String REDIS_INSTANCE =
"org.apache.inlong.agent.plugin.instance.RedisInstance";
+ public static final String SQLSERVER_INSTANCE =
"org.apache.inlong.agent.plugin.instance.SQLServerInstance";
+ public static final String SQL_INSTANCE =
"org.apache.inlong.agent.plugin.instance.SQLInstance";
+
+ public static final String FILE_SOURCE =
"org.apache.inlong.agent.plugin.sources.LogFileSource";
+ public static final String BINLOG_SOURCE =
"org.apache.inlong.agent.plugin.sources.BinlogSource";
+ public static final String KAFKA_SOURCE =
"org.apache.inlong.agent.plugin.sources.KafkaSource";
+ public static final String PULSAR_SOURCE =
"org.apache.inlong.agent.plugin.sources.PulsarSource";
+ public static final String POSTGRESQL_SOURCE =
"org.apache.inlong.agent.plugin.sources.PostgreSQLSource";
+ public static final String MONGO_SOURCE =
"org.apache.inlong.agent.plugin.sources.MongoDBSource";
+ public static final String ORACLE_SOURCE =
"org.apache.inlong.agent.plugin.sources.OracleSource";
+ public static final String REDIS_SOURCE =
"org.apache.inlong.agent.plugin.sources.RedisSource";
+ public static final String MQTT_SOURCE =
"org.apache.inlong.agent.plugin.sources.MqttSource";
+ public static final String SQLSERVER_SOURCE =
"org.apache.inlong.agent.plugin.sources.SQLServerSource";
+ public static final String COS_SOURCE =
"org.apache.inlong.agent.plugin.sources.COSSource";
+ public static final String SQL_SOURCE =
"org.apache.inlong.agent.plugin.sources.SQLSource";
private static final Logger LOGGER =
LoggerFactory.getLogger(InstanceProfile.class);
private static final Gson GSON = new Gson();
@@ -88,25 +102,27 @@ public class InstanceProfile extends AbstractConfiguration
implements Comparable
}
switch (taskType) {
case FILE:
- return DEFAULT_FILE_INSTANCE;
+ return FILE_INSTANCE;
case KAFKA:
- return DEFAULT_KAFKA_INSTANCE;
+ return KAFKA_INSTANCE;
case PULSAR:
- return DEFAULT_PULSAR_INSTANCE;
+ return PULSAR_INSTANCE;
case POSTGRES:
- return DEFAULT_POSTGRES_INSTANCE;
+ return POSTGRES_INSTANCE;
case ORACLE:
- return DEFAULT_ORACLE_INSTANCE;
+ return ORACLE_INSTANCE;
case SQLSERVER:
- return DEFAULT_SQLSERVER_INSTANCE;
+ return SQLSERVER_INSTANCE;
case MONGODB:
- return DEFAULT_MONGODB_INSTANCE;
+ return MONGODB_INSTANCE;
case REDIS:
- return DEFAULT_REDIS_INSTANCE;
+ return REDIS_INSTANCE;
case MQTT:
- return DEFAULT_MQTT_INSTANCE;
+ return MQTT_INSTANCE;
case COS:
- return DEFAULT_COS_INSTANCE;
+ return COS_INSTANCE;
+ case SQL:
+ return SQL_INSTANCE;
default:
LOGGER.error("invalid task type {}", taskType);
return null;
@@ -126,7 +142,43 @@ public class InstanceProfile extends AbstractConfiguration
implements Comparable
}
public String getSourceClass() {
- return get(TaskConstants.TASK_SOURCE);
+ TaskTypeEnum taskType = TaskTypeEnum.getTaskType(getInt(TASK_TYPE,
TaskTypeEnum.FILE.getType()));
+ return getSourceClassByTaskType(taskType);
+ }
+
+ public static String getSourceClassByTaskType(TaskTypeEnum taskType) {
+ if (taskType == null) {
+ return null;
+ }
+ switch (taskType) {
+ case BINLOG:
+ return BINLOG_SOURCE;
+ case FILE:
+ return FILE_SOURCE;
+ case KAFKA:
+ return KAFKA_SOURCE;
+ case PULSAR:
+ return PULSAR_SOURCE;
+ case POSTGRES:
+ return POSTGRESQL_SOURCE;
+ case ORACLE:
+ return ORACLE_SOURCE;
+ case SQLSERVER:
+ return SQLSERVER_SOURCE;
+ case MONGODB:
+ return MONGO_SOURCE;
+ case REDIS:
+ return REDIS_SOURCE;
+ case MQTT:
+ return MQTT_SOURCE;
+ case COS:
+ return COS_SOURCE;
+ case SQL:
+ return SQL_SOURCE;
+ default:
+ LOGGER.error("invalid task type {}", taskType);
+ return null;
+ }
}
public String getSinkClass() {
diff --git
a/inlong-agent/agent-common/src/main/java/org/apache/inlong/agent/conf/TaskProfile.java
b/inlong-agent/agent-common/src/main/java/org/apache/inlong/agent/conf/TaskProfile.java
index c2a60a0598..06d7018a30 100644
---
a/inlong-agent/agent-common/src/main/java/org/apache/inlong/agent/conf/TaskProfile.java
+++
b/inlong-agent/agent-common/src/main/java/org/apache/inlong/agent/conf/TaskProfile.java
@@ -47,17 +47,18 @@ import static
org.apache.inlong.agent.constant.TaskConstants.TASK_TYPE;
*/
public class TaskProfile extends AbstractConfiguration {
- public static final String DEFAULT_FILE_TASK =
"org.apache.inlong.agent.plugin.task.logcollection.local.FileTask";
- public static final String DEFAULT_COS_TASK =
"org.apache.inlong.agent.plugin.task.logcollection.cos.COSTask";
- public static final String DEFAULT_KAFKA_TASK =
"org.apache.inlong.agent.plugin.task.KafkaTask";
- public static final String DEFAULT_PULSAR_TASK =
"org.apache.inlong.agent.plugin.task.PulsarTask";
- public static final String DEFAULT_MONGODB_TASK =
"org.apache.inlong.agent.plugin.task.MongoDBTask";
- public static final String DEFAULT_ORACLE_TASK =
"org.apache.inlong.agent.plugin.task.OracleTask";
- public static final String DEFAULT_REDIS_TASK =
"org.apache.inlong.agent.plugin.task.RedisTask";
- public static final String DEFAULT_POSTGRESQL_TASK =
"org.apache.inlong.agent.plugin.task.PostgreSQLTask";
- public static final String DEFAULT_MQTT_TASK =
"org.apache.inlong.agent.plugin.task.MqttTask";
- public static final String DEFAULT_SQLSERVER_TASK =
"org.apache.inlong.agent.plugin.task.SQLServerTask";
- public static final String DEFAULT_MOCK_TASK =
"org.apache.inlong.agent.plugin.task.MockTask";
+ public static final String SQL_TASK =
"org.apache.inlong.agent.plugin.task.logcollection.SQLTask";
+ public static final String FILE_TASK =
"org.apache.inlong.agent.plugin.task.logcollection.local.FileTask";
+ public static final String COS_TASK =
"org.apache.inlong.agent.plugin.task.logcollection.cos.COSTask";
+ public static final String KAFKA_TASK =
"org.apache.inlong.agent.plugin.task.KafkaTask";
+ public static final String PULSAR_TASK =
"org.apache.inlong.agent.plugin.task.PulsarTask";
+ public static final String MONGODB_TASK =
"org.apache.inlong.agent.plugin.task.MongoDBTask";
+ public static final String ORACLE_TASK =
"org.apache.inlong.agent.plugin.task.OracleTask";
+ public static final String REDIS_TASK =
"org.apache.inlong.agent.plugin.task.RedisTask";
+ public static final String POSTGRESQL_TASK =
"org.apache.inlong.agent.plugin.task.PostgreSQLTask";
+ public static final String MQTT_TASK =
"org.apache.inlong.agent.plugin.task.MqttTask";
+ public static final String SQLSERVER_TASK =
"org.apache.inlong.agent.plugin.task.SQLServerTask";
+ public static final String MOCK_TASK =
"org.apache.inlong.agent.plugin.task.MockTask";
private static final Gson GSON = new Gson();
private static final Logger logger =
LoggerFactory.getLogger(TaskProfile.class);
@@ -75,28 +76,30 @@ public class TaskProfile extends AbstractConfiguration {
public String getTaskClass() {
TaskTypeEnum taskType = TaskTypeEnum.getTaskType(getInt(TASK_TYPE,
TaskTypeEnum.FILE.getType()));
switch (requireNonNull(taskType)) {
+ case SQL:
+ return SQL_TASK;
case FILE:
- return DEFAULT_FILE_TASK;
+ return FILE_TASK;
case KAFKA:
- return DEFAULT_KAFKA_TASK;
+ return KAFKA_TASK;
case PULSAR:
- return DEFAULT_PULSAR_TASK;
+ return PULSAR_TASK;
case POSTGRES:
- return DEFAULT_POSTGRESQL_TASK;
+ return POSTGRESQL_TASK;
case ORACLE:
- return DEFAULT_ORACLE_TASK;
+ return ORACLE_TASK;
case SQLSERVER:
- return DEFAULT_SQLSERVER_TASK;
+ return SQLSERVER_TASK;
case MONGODB:
- return DEFAULT_MONGODB_TASK;
+ return MONGODB_TASK;
case REDIS:
- return DEFAULT_REDIS_TASK;
+ return REDIS_TASK;
case MQTT:
- return DEFAULT_MQTT_TASK;
+ return MQTT_TASK;
case COS:
- return DEFAULT_COS_TASK;
+ return COS_TASK;
case MOCK:
- return DEFAULT_MOCK_TASK;
+ return MOCK_TASK;
default:
logger.error("invalid task type {}", taskType);
return null;
@@ -153,7 +156,7 @@ public class TaskProfile extends AbstractConfiguration {
*/
@Override
public boolean allRequiredKeyExist() {
- return hasKey(TaskConstants.TASK_ID) &&
hasKey(TaskConstants.TASK_SOURCE)
+ return hasKey(TaskConstants.TASK_ID)
&& hasKey(TaskConstants.TASK_SINK) &&
hasKey(TaskConstants.TASK_CHANNEL)
&& hasKey(TaskConstants.TASK_GROUP_ID) &&
hasKey(TaskConstants.TASK_STREAM_ID);
}
diff --git
a/inlong-agent/agent-common/src/main/java/org/apache/inlong/agent/constant/TaskConstants.java
b/inlong-agent/agent-common/src/main/java/org/apache/inlong/agent/constant/TaskConstants.java
index 201ec24713..da786b7cf6 100755
---
a/inlong-agent/agent-common/src/main/java/org/apache/inlong/agent/constant/TaskConstants.java
+++
b/inlong-agent/agent-common/src/main/java/org/apache/inlong/agent/constant/TaskConstants.java
@@ -29,9 +29,6 @@ public class TaskConstants extends CommonConstants {
public static final String TASK_GROUP_ID = "task.groupId";
public static final String TASK_STREAM_ID = "task.streamId";
public static final String RESTORE_FROM_DB = "task.restoreFromDB";
-
- public static final String TASK_SOURCE = "task.source";
-
public static final String TASK_CHANNEL = "task.channel";
public static final String TASK_TYPE = "task.taskType";
public static final String TASK_FILE_TRIGGER = "task.fileTask.trigger";
@@ -75,7 +72,7 @@ public class TaskConstants extends CommonConstants {
public static final String COS_CONTENT_STYLE = "task.cosTask.contentStyle";
public static final String COS_MAX_NUM = "task.cosTask.maxFileCount";
public static final String COS_TASK_PATTERN = "task.cosTask.pattern";
- public static final String TASK_COS_TIME_OFFSET =
"task.cosTask.timeOffset";
+ public static final String COS_TIME_OFFSET = "task.cosTask.timeOffset";
public static final String COS_TASK_RETRY = "task.cosTask.retry";
public static final String COS_TASK_TIME_FROM =
"task.cosTask.dataTimeFrom";
public static final String COS_TASK_TIME_TO = "task.cosTask.dataTimeTo";
@@ -86,6 +83,20 @@ public class TaskConstants extends CommonConstants {
public static final String COS_DATA_SEPARATOR =
"task.cosTask.dataSeparator";
public static final String COS_FILTER_STREAMS =
"task.cosTask.filterStreams";
+ // SQL task
+ public static final String SQL_TASK_CYCLE_UNIT = "task.sqlTask.cycleUnit";
+ public static final String SQL_MAX_NUM = "task.sqlTask.maxInstanceCount";
+ public static final String SQL_TASK_SQL = "task.sqlTask.sql";
+ public static final String SQL_TIME_OFFSET = "task.sqlTask.timeOffset";
+ public static final String SQL_TASK_RETRY = "task.sqlTask.retry";
+ public static final String SQL_TASK_TIME_FROM =
"task.sqlTask.dataTimeFrom";
+ public static final String SQL_TASK_TIME_TO = "task.sqlTask.dataTimeTo";
+ public static final String SQL_TASK_JDBC_URL = "task.sqlTask.jdbcUrl";
+ public static final String SQL_TASK_USERNAME = "task.sqlTask.username";
+ public static final String SQL_TASK_PASSWORD = "task.sqlTask.jdbcPassword";
+ public static final String SQL_TASK_DATA_SEPARATOR =
"task.sqlTask.dataSeparator";
+ public static final String SQL_TASK_FETCH_SIZE = "task.sqlTask.fetchSize";
+
/**
* delimiter to split offset for different task
*/
diff --git
a/inlong-agent/agent-common/src/main/java/org/apache/inlong/agent/pojo/SQLTask.java
b/inlong-agent/agent-common/src/main/java/org/apache/inlong/agent/pojo/SQLTask.java
new file mode 100644
index 0000000000..4a3668fe70
--- /dev/null
+++
b/inlong-agent/agent-common/src/main/java/org/apache/inlong/agent/pojo/SQLTask.java
@@ -0,0 +1,62 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.inlong.agent.pojo;
+
+import lombok.Data;
+
+@Data
+public class SQLTask {
+
+ private Integer id;
+ private String sql;
+ private String cycleUnit;
+ private Boolean retry;
+ private String dataTimeFrom;
+ private String dataTimeTo;
+ private String timeOffset;
+ // The number of instances that can run simultaneously for this task to
+ // prevent other problems caused by running too many instances simultaneously
+ private Integer maxInstanceCount;
+ private String jdbcUrl;
+ private String username;
+ private String jdbcPassword;
+ private String dataSeparator;
+ // The number of rows collected from the data source in each batch
+ private Integer fetchSize;
+
+ @Data
+ public static class SQLTaskConfig {
+
+ private String sql;
+ private String cycleUnit;
+ private Boolean retry;
+ private String dataTimeFrom;
+ private String dataTimeTo;
+ // '1m' means one minute after, '-1m' means one minute before
+ // '1h' means one hour after, '-1h' means one hour before
+ // '1d' means one day after, '-1d' means one day before
+ // Null means from current timestamp
+ private String timeOffset;
+ private Integer maxInstanceCount;
+ private String jdbcUrl;
+ private String username;
+ private String jdbcPassword;
+ private String dataSeparator;
+ private Integer fetchSize;
+ }
+}
diff --git
a/inlong-agent/agent-common/src/main/java/org/apache/inlong/agent/pojo/TaskProfileDto.java
b/inlong-agent/agent-common/src/main/java/org/apache/inlong/agent/pojo/TaskProfileDto.java
index a30bf44ecc..2e4849a209 100644
---
a/inlong-agent/agent-common/src/main/java/org/apache/inlong/agent/pojo/TaskProfileDto.java
+++
b/inlong-agent/agent-common/src/main/java/org/apache/inlong/agent/pojo/TaskProfileDto.java
@@ -31,6 +31,7 @@ import
org.apache.inlong.agent.pojo.OracleTask.OracleTaskConfig;
import org.apache.inlong.agent.pojo.PostgreSQLTask.PostgreSQLTaskConfig;
import org.apache.inlong.agent.pojo.PulsarTask.PulsarTaskConfig;
import org.apache.inlong.agent.pojo.RedisTask.RedisTaskConfig;
+import org.apache.inlong.agent.pojo.SQLTask.SQLTaskConfig;
import org.apache.inlong.agent.pojo.SqlServerTask.SqlserverTaskConfig;
import org.apache.inlong.common.constant.MQType;
import org.apache.inlong.common.enums.TaskTypeEnum;
@@ -57,53 +58,8 @@ public class TaskProfileDto {
public static final String DEFAULT_DATA_PROXY_SINK =
"org.apache.inlong.agent.plugin.sinks.ProxySink";
public static final String PULSAR_SINK =
"org.apache.inlong.agent.plugin.sinks.PulsarSink";
public static final String KAFKA_SINK =
"org.apache.inlong.agent.plugin.sinks.KafkaSink";
- /**
- * file source
- */
- public static final String DEFAULT_SOURCE =
"org.apache.inlong.agent.plugin.sources.LogFileSource";
- /**
- * binlog source
- */
- public static final String BINLOG_SOURCE =
"org.apache.inlong.agent.plugin.sources.BinlogSource";
- /**
- * kafka source
- */
- public static final String KAFKA_SOURCE =
"org.apache.inlong.agent.plugin.sources.KafkaSource";
- // pulsar source
- public static final String PULSAR_SOURCE =
"org.apache.inlong.agent.plugin.sources.PulsarSource";
- /**
- * PostgreSQL source
- */
- public static final String POSTGRESQL_SOURCE =
"org.apache.inlong.agent.plugin.sources.PostgreSQLSource";
- /**
- * mongo source
- */
- public static final String MONGO_SOURCE =
"org.apache.inlong.agent.plugin.sources.MongoDBSource";
- /**
- * oracle source
- */
- public static final String ORACLE_SOURCE =
"org.apache.inlong.agent.plugin.sources.OracleSource";
- /**
- * redis source
- */
- public static final String REDIS_SOURCE =
"org.apache.inlong.agent.plugin.sources.RedisSource";
- /**
- * mqtt source
- */
- public static final String MQTT_SOURCE =
"org.apache.inlong.agent.plugin.sources.MqttSource";
- /**
- * sqlserver source
- */
- public static final String SQLSERVER_SOURCE =
"org.apache.inlong.agent.plugin.sources.SQLServerSource";
- /**
- * cos source
- */
- public static final String COS_SOURCE =
"org.apache.inlong.agent.plugin.sources.COSSource";
-
private static final Gson GSON = new Gson();
-
public static final String deafult_time_offset = "0";
-
private static final String DEFAULT_AUDIT_VERSION = "0";
private Task task;
@@ -154,7 +110,7 @@ public class TaskProfileDto {
FileTaskConfig.class);
FileTask.Dir dir = new FileTask.Dir();
- dir.setPatterns(taskConfig.getPattern());
+ dir.setPatterns(taskConfig.getPattern().trim());
dir.setBlackList(taskConfig.getBlackList());
fileTask.setDir(dir);
fileTask.setCollectType(taskConfig.getCollectType());
@@ -200,7 +156,7 @@ public class TaskProfileDto {
cosTask.setId(dataConfig.getTaskId());
COSTaskConfig taskConfig = GSON.fromJson(dataConfig.getExtParams(),
COSTaskConfig.class);
- cosTask.setPattern(taskConfig.getPattern());
+ cosTask.setPattern(taskConfig.getPattern().trim());
cosTask.setCollectType(taskConfig.getCollectType());
cosTask.setContentStyle(taskConfig.getContentStyle());
cosTask.setDataSeparator(taskConfig.getDataSeparator());
@@ -224,6 +180,30 @@ public class TaskProfileDto {
return cosTask;
}
+ private static SQLTask getSQLTask(DataConfig dataConfig) {
+ SQLTask sqlTask = new SQLTask();
+ sqlTask.setId(dataConfig.getTaskId());
+ SQLTaskConfig taskConfig = GSON.fromJson(dataConfig.getExtParams(),
+ SQLTaskConfig.class);
+ sqlTask.setSql(taskConfig.getSql());
+ sqlTask.setMaxInstanceCount(taskConfig.getMaxInstanceCount());
+ sqlTask.setRetry(taskConfig.getRetry());
+ sqlTask.setCycleUnit(taskConfig.getCycleUnit());
+ sqlTask.setDataTimeFrom(taskConfig.getDataTimeFrom());
+ sqlTask.setDataTimeTo(taskConfig.getDataTimeTo());
+ sqlTask.setUsername(taskConfig.getUsername());
+ sqlTask.setJdbcPassword(taskConfig.getJdbcPassword());
+ sqlTask.setDataSeparator(taskConfig.getDataSeparator());
+ sqlTask.setFetchSize(taskConfig.getFetchSize());
+ sqlTask.setJdbcUrl(taskConfig.getJdbcUrl());
+ if (taskConfig.getTimeOffset() != null) {
+ sqlTask.setTimeOffset(taskConfig.getTimeOffset());
+ } else {
+ sqlTask.setTimeOffset(deafult_time_offset +
sqlTask.getCycleUnit());
+ }
+ return sqlTask;
+ }
+
private static KafkaTask getKafkaTask(DataConfig dataConfigs) {
KafkaTaskConfig kafkaTaskConfig =
GSON.fromJson(dataConfigs.getExtParams(),
@@ -500,66 +480,62 @@ public class TaskProfileDto {
TaskTypeEnum taskType =
TaskTypeEnum.getTaskType(dataConfig.getTaskType());
switch (requireNonNull(taskType)) {
case SQL:
+ SQLTask sqlTask = getSQLTask(dataConfig);
+ task.setCycleUnit(sqlTask.getCycleUnit());
+ task.setSqlTask(sqlTask);
+ task.setRetry(sqlTask.getRetry());
+ profileDto.setTask(task);
+ break;
case BINLOG:
BinlogTask binlogTask = getBinlogTask(dataConfig);
task.setBinlogTask(binlogTask);
- task.setSource(BINLOG_SOURCE);
profileDto.setTask(task);
break;
case FILE:
FileTask fileTask = getFileTask(dataConfig);
task.setCycleUnit(fileTask.getCycleUnit());
task.setFileTask(fileTask);
- task.setSource(DEFAULT_SOURCE);
task.setRetry(fileTask.getRetry());
profileDto.setTask(task);
break;
case KAFKA:
KafkaTask kafkaTask = getKafkaTask(dataConfig);
task.setKafkaTask(kafkaTask);
- task.setSource(KAFKA_SOURCE);
profileDto.setTask(task);
break;
case PULSAR:
PulsarTask pulsarTask = getPulsarTask(dataConfig);
task.setPulsarTask(pulsarTask);
- task.setSource(PULSAR_SOURCE);
profileDto.setTask(task);
break;
case POSTGRES:
PostgreSQLTask postgreSQLTask = getPostgresTask(dataConfig);
task.setPostgreSQLTask(postgreSQLTask);
- task.setSource(POSTGRESQL_SOURCE);
profileDto.setTask(task);
break;
case ORACLE:
OracleTask oracleTask = getOracleTask(dataConfig);
task.setOracleTask(oracleTask);
- task.setSource(ORACLE_SOURCE);
profileDto.setTask(task);
break;
case SQLSERVER:
SqlServerTask sqlserverTask = getSqlServerTask(dataConfig);
task.setSqlserverTask(sqlserverTask);
- task.setSource(SQLSERVER_SOURCE);
profileDto.setTask(task);
break;
case MONGODB:
MongoTask mongoTask = getMongoTask(dataConfig);
task.setMongoTask(mongoTask);
- task.setSource(MONGO_SOURCE);
profileDto.setTask(task);
break;
case REDIS:
RedisTask redisTask = getRedisTask(dataConfig);
task.setRedisTask(redisTask);
- task.setSource(REDIS_SOURCE);
profileDto.setTask(task);
break;
case MQTT:
MqttTask mqttTask = getMqttTask(dataConfig);
task.setMqttTask(mqttTask);
- task.setSource(MQTT_SOURCE);
profileDto.setTask(task);
break;
case MOCK:
@@ -569,7 +545,6 @@ public class TaskProfileDto {
COSTask cosTask = getCOSTask(dataConfig);
task.setCycleUnit(cosTask.getCycleUnit());
task.setCosTask(cosTask);
- task.setSource(COS_SOURCE);
task.setRetry(cosTask.getRetry());
profileDto.setTask(task);
break;
@@ -618,6 +593,7 @@ public class TaskProfileDto {
private MqttTask mqttTask;
private SqlServerTask sqlserverTask;
private COSTask cosTask;
+ private SQLTask sqlTask;
}
@Data
diff --git
a/inlong-agent/agent-plugins/src/main/java/org/apache/inlong/agent/plugin/instance/SQLInstance.java
b/inlong-agent/agent-plugins/src/main/java/org/apache/inlong/agent/plugin/instance/SQLInstance.java
new file mode 100644
index 0000000000..eb79f1fa21
--- /dev/null
+++
b/inlong-agent/agent-plugins/src/main/java/org/apache/inlong/agent/plugin/instance/SQLInstance.java
@@ -0,0 +1,35 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.inlong.agent.plugin.instance;
+
+import org.apache.inlong.agent.conf.InstanceProfile;
+import org.apache.inlong.agent.constant.TaskConstants;
+
+import java.io.IOException;
+
+/**
+ * SQL instance contains a source and a sink.
+ * Its main job is to read from the source and write to the sink.
+ */
+public class SQLInstance extends CommonInstance {
+
+ @Override
+ public void setInodeInfo(InstanceProfile profile) throws IOException {
+ profile.set(TaskConstants.INODE_INFO, "");
+ }
+}
diff --git
a/inlong-agent/agent-plugins/src/main/java/org/apache/inlong/agent/plugin/sources/COSSource.java
b/inlong-agent/agent-plugins/src/main/java/org/apache/inlong/agent/plugin/sources/COSSource.java
index b6792ac82d..5dd236f710 100755
---
a/inlong-agent/agent-plugins/src/main/java/org/apache/inlong/agent/plugin/sources/COSSource.java
+++
b/inlong-agent/agent-plugins/src/main/java/org/apache/inlong/agent/plugin/sources/COSSource.java
@@ -119,7 +119,7 @@ public class COSSource extends AbstractSource {
if (offsetProfile != null) {
offset = offsetProfile.toJsonStr();
}
- LOGGER.info("LogFileSource init: {} offset: {}",
profile.toJsonStr(), offset);
+ LOGGER.info("COS source init: {} offset: {}", profile.toJsonStr(),
offset);
AgentConfiguration conf = AgentConfiguration.getAgentConf();
int permit = conf.getInt(AGENT_GLOBAL_COS_SOURCE_PERMIT,
DEFAULT_AGENT_GLOBAL_COS_SOURCE_PERMIT);
MemoryManager.getInstance().addSemaphore(AGENT_GLOBAL_COS_SOURCE_PERMIT,
permit);
diff --git
a/inlong-agent/agent-plugins/src/main/java/org/apache/inlong/agent/plugin/sources/SQLSource.java
b/inlong-agent/agent-plugins/src/main/java/org/apache/inlong/agent/plugin/sources/SQLSource.java
new file mode 100755
index 0000000000..e73db1be90
--- /dev/null
+++
b/inlong-agent/agent-plugins/src/main/java/org/apache/inlong/agent/plugin/sources/SQLSource.java
@@ -0,0 +1,291 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.inlong.agent.plugin.sources;
+
+import org.apache.inlong.agent.common.AgentThreadFactory;
+import org.apache.inlong.agent.conf.AgentConfiguration;
+import org.apache.inlong.agent.conf.InstanceProfile;
+import org.apache.inlong.agent.constant.TaskConstants;
+import org.apache.inlong.agent.core.task.MemoryManager;
+import org.apache.inlong.agent.except.FileException;
+import org.apache.inlong.agent.plugin.sources.extend.DefaultExtendedHandler;
+import org.apache.inlong.agent.plugin.sources.file.AbstractSource;
+import org.apache.inlong.agent.utils.AgentUtils;
+import org.apache.inlong.agent.utils.ThreadUtils;
+
+import lombok.AllArgsConstructor;
+import lombok.Data;
+import lombok.NoArgsConstructor;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.io.ByteArrayOutputStream;
+import java.io.IOException;
+import java.nio.charset.StandardCharsets;
+import java.sql.Connection;
+import java.sql.DriverManager;
+import java.sql.ResultSet;
+import java.sql.ResultSetMetaData;
+import java.sql.SQLException;
+import java.sql.Statement;
+import java.util.ArrayList;
+import java.util.List;
+import java.util.concurrent.BlockingQueue;
+import java.util.concurrent.LinkedBlockingQueue;
+import java.util.concurrent.SynchronousQueue;
+import java.util.concurrent.ThreadPoolExecutor;
+import java.util.concurrent.TimeUnit;
+
+import static
org.apache.inlong.agent.constant.TaskConstants.SQL_TASK_DATA_SEPARATOR;
+import static
org.apache.inlong.agent.constant.TaskConstants.SQL_TASK_FETCH_SIZE;
+
+/**
+ * Read by SQL
+ */
+public class SQLSource extends AbstractSource {
+
+ public static final String AGENT_GLOBAL_SQL_SOURCE_PERMIT =
"agent.global.sql.source.permit";
+ public static final int DEFAULT_AGENT_GLOBAL_SQL_SOURCE_PERMIT = 128 *
1000 * 1000;
+ private int MAX_RECONNECT_TIMES = 3;
+ private int RECONNECT_INTERVAL_SECOND = 10;
+ private int DEFAULT_FETCH_SIZE = 1000;
+
+ @Data
+ @AllArgsConstructor
+ @NoArgsConstructor
+ protected static class FileOffset {
+
+ private Long lineOffset;
+ private Long byteOffset;
+ private boolean hasByteOffset;
+ }
+
+ private static final Logger LOGGER =
LoggerFactory.getLogger(SQLSource.class);
+ public static final String OFFSET_SEP = ":";
+ protected final Integer WAIT_TIMEOUT_MS = 10;
+
+ private String finalSQL;
+ private String jdbcUrl;
+ private String username;
+ private String password;
+ private Connection conn;
+ protected BlockingQueue<SourceData> queue;
+ private static final ThreadPoolExecutor EXECUTOR_SERVICE = new
ThreadPoolExecutor(
+ 0, Integer.MAX_VALUE,
+ 1L, TimeUnit.SECONDS,
+ new SynchronousQueue<>(),
+ new AgentThreadFactory("sql-source-pool"));
+ private volatile boolean running = false;
+ private boolean isMysql = true;
+ private volatile boolean fileExist = true;
+
+ public SQLSource() {
+ }
+
+ @Override
+ protected void initExtendClass() {
+ extendClass = DefaultExtendedHandler.class.getCanonicalName();
+ }
+
+ @Override
+ protected void initSource(InstanceProfile profile) {
+ try {
+ LOGGER.info("sql source init: {}", profile.toJsonStr());
+ AgentConfiguration conf = AgentConfiguration.getAgentConf();
+ int permit = conf.getInt(AGENT_GLOBAL_SQL_SOURCE_PERMIT,
DEFAULT_AGENT_GLOBAL_SQL_SOURCE_PERMIT);
+
MemoryManager.getInstance().addSemaphore(AGENT_GLOBAL_SQL_SOURCE_PERMIT,
permit);
+ finalSQL = profile.getInstanceId();
+ jdbcUrl =
profile.get(TaskConstants.SQL_TASK_JDBC_URL).trim().replace("\r", "")
+ .replace("\n", "");
+ if (jdbcUrl.startsWith("jdbc:mysql:")) {
+ isMysql = true;
+ }
+ username = profile.get(TaskConstants.SQL_TASK_USERNAME);
+ password = profile.get(TaskConstants.SQL_TASK_PASSWORD);
+
+ queue = new LinkedBlockingQueue<>(CACHE_QUEUE_SIZE);
+ initConn();
+ EXECUTOR_SERVICE.execute(run());
+ } catch (Exception ex) {
+ stopRunning();
+ throw new FileException("error init stream for " + finalSQL, ex);
+ }
+ }
+
+ private void initConn() throws SQLException {
+ int retryTimes = 0;
+ while (conn == null) {
+ try {
+ conn = DriverManager.getConnection(jdbcUrl, username,
password);
+ } catch (Exception e) {
+ retryTimes++;
+ if (retryTimes >= MAX_RECONNECT_TIMES) {
+ throw new SQLException(
+ "Failed to connect database after retry " +
retryTimes + " times.", e);
+ }
+ LOGGER.warn("Reconnect database after {} seconds due to the
following error: {}",
+ RECONNECT_INTERVAL_SECOND, e.getMessage());
+ AgentUtils.silenceSleepInSeconds(RECONNECT_INTERVAL_SECOND);
+ }
+ }
+
+ }
+
+ @Override
+ protected boolean doPrepareToRead() {
+
+ return true;
+ }
+
+ @Override
+ protected List<SourceData> readFromSource() {
+ if (queue.isEmpty()) {
+ return null;
+ }
+ int count = 0;
+ int len = 0;
+ List<SourceData> lines = new ArrayList<>();
+ while (!queue.isEmpty() && count < BATCH_READ_LINE_COUNT && len <
BATCH_READ_LINE_TOTAL_LEN) {
+ if (len + queue.peek().getData().length >
BATCH_READ_LINE_TOTAL_LEN) {
+ break;
+ }
+ len += queue.peek().getData().length;
+ count++;
+ lines.add(queue.poll());
+ }
+ MemoryManager.getInstance().release(AGENT_GLOBAL_SQL_SOURCE_PERMIT,
len);
+ return lines;
+ }
+
+ @Override
+ protected void printCurrentState() {
+ LOGGER.info("sql source running, sql: {}", instanceId);
+ }
+
+ @Override
+ protected String getThreadName() {
+ return "sql-source-" + taskId + "-" + finalSQL;
+ }
+
+ private Runnable run() {
+ return () -> {
+ AgentThreadFactory.nameThread(getThreadName());
+ running = true;
+ try {
+ doRun();
+ } catch (Throwable e) {
+ fileExist = false;
+ LOGGER.error("do run error maybe connect broken: ", e);
+ ThreadUtils.threadThrowableHandler(Thread.currentThread(), e);
+ }
+ running = false;
+ };
+ }
+
+ protected void doRun() throws SQLException, IOException {
+ try (Statement stmt =
conn.createStatement(ResultSet.TYPE_FORWARD_ONLY, ResultSet.CONCUR_READ_ONLY)) {
+ if (isMysql) {
+ stmt.setFetchSize(Integer.MIN_VALUE);
+ } else {
+ stmt.setFetchSize(profile.getInt(SQL_TASK_FETCH_SIZE,
DEFAULT_FETCH_SIZE));
+ }
+ try (ResultSet rs = stmt.executeQuery(profile.getInstanceId())) {
+ ResultSetMetaData metaData = rs.getMetaData();
+ int columnCount = metaData.getColumnCount();
+ ByteArrayOutputStream bas = new ByteArrayOutputStream();
+ byte[] sep =
profile.get(SQL_TASK_DATA_SEPARATOR).getBytes(StandardCharsets.UTF_8);
+ while (rs.next()) {
+ for (int i = 1; i <= columnCount; i++) {
+ if (i > 1) {
+ bas.write(sep);
+ }
+ bas.write(rs.getBytes(i));
+ }
+ SourceData sourceData = new SourceData(bas.toByteArray(),
+ getOffsetString(0L, 0L));
+ boolean suc = false;
+ while (isRunnable() && !suc) {
+ boolean suc4Queue =
waitForPermit(AGENT_GLOBAL_SQL_SOURCE_PERMIT, sourceData.getData().length);
+ if (!suc4Queue) {
+ break;
+ }
+ suc = queue.offer(sourceData);
+ if (!suc) {
+ MemoryManager.getInstance()
+ .release(AGENT_GLOBAL_SQL_SOURCE_PERMIT,
sourceData.getData().length);
+ AgentUtils.silenceSleepInMs(WAIT_TIMEOUT_MS);
+ }
+ }
+ bas.reset();
+ }
+ }
+ } catch (SQLException e) {
+ LOGGER.error("read from result set error: ", e);
+ throw e;
+ } catch (IOException e) {
+ LOGGER.error("read from result io error: ", e);
+ throw e;
+ }
+ }
+
+ private String getOffsetString(Long lineOffset, Long byteOffset) {
+ return lineOffset + OFFSET_SEP + byteOffset;
+ }
+
+ @Override
+ protected boolean isRunnable() {
+ return runnable && fileExist;
+ }
+
+ @Override
+ public boolean sourceExist() {
+ return fileExist;
+ }
+
+ @Override
+ protected void releaseSource() {
+ while (running) {
+ AgentUtils.silenceSleepInMs(1);
+ }
+ if (conn != null) {
+ try {
+ conn.close();
+ } catch (SQLException e) {
+ LOGGER.error("close connect error: ", e);
+ }
+ }
+ while (!queue.isEmpty()) {
+
MemoryManager.getInstance().release(AGENT_GLOBAL_SQL_SOURCE_PERMIT,
queue.poll().getData().length);
+ }
+ }
+
+ private boolean waitForPermit(String permitName, int permitLen) {
+ boolean suc = false;
+ while (!suc) {
+ suc = MemoryManager.getInstance().tryAcquire(permitName,
permitLen);
+ if (!suc) {
+ MemoryManager.getInstance().printDetail(permitName,
"sql_source");
+ if (!isRunnable()) {
+ return false;
+ }
+ AgentUtils.silenceSleepInMs(WAIT_TIMEOUT_MS);
+ }
+ }
+ return true;
+ }
+}
diff --git
a/inlong-agent/agent-plugins/src/main/java/org/apache/inlong/agent/plugin/task/logcollection/LogAbstractTask.java
b/inlong-agent/agent-plugins/src/main/java/org/apache/inlong/agent/plugin/task/logcollection/LogAbstractTask.java
index e234042524..4c7b729a4d 100644
---
a/inlong-agent/agent-plugins/src/main/java/org/apache/inlong/agent/plugin/task/logcollection/LogAbstractTask.java
+++
b/inlong-agent/agent-plugins/src/main/java/org/apache/inlong/agent/plugin/task/logcollection/LogAbstractTask.java
@@ -20,7 +20,7 @@ package org.apache.inlong.agent.plugin.task.logcollection;
import org.apache.inlong.agent.conf.InstanceProfile;
import org.apache.inlong.agent.core.task.TaskAction;
import org.apache.inlong.agent.plugin.task.AbstractTask;
-import org.apache.inlong.agent.plugin.utils.regex.NewDateUtils;
+import org.apache.inlong.agent.plugin.utils.regex.DateUtils;
import org.apache.inlong.agent.plugin.utils.regex.Scanner;
import org.apache.inlong.agent.state.State;
@@ -156,18 +156,50 @@ public abstract class LogAbstractTask extends
AbstractTask {
* offset
*/
private boolean shouldStartNow(String dataTime) {
- String shouldStartTime = NewDateUtils.getShouldStartTime(dataTime,
taskProfile.getCycleUnit(), timeOffset);
+ String shouldStartTime = DateUtils.getShouldStartTime(dataTime,
taskProfile.getCycleUnit(), timeOffset);
String currentTime = getCurrentTime();
return currentTime.compareTo(shouldStartTime) >= 0;
}
private String getCurrentTime() {
- SimpleDateFormat dateFormat = new
SimpleDateFormat(NewDateUtils.DEFAULT_FORMAT);
- TimeZone timeZone =
TimeZone.getTimeZone(NewDateUtils.DEFAULT_TIME_ZONE);
+ SimpleDateFormat dateFormat = new
SimpleDateFormat(DateUtils.DEFAULT_FORMAT);
+ TimeZone timeZone = TimeZone.getTimeZone(DateUtils.DEFAULT_TIME_ZONE);
dateFormat.setTimeZone(timeZone);
return dateFormat.format(new Date(System.currentTimeMillis()));
}
+ /**
+ * Registers a new instance profile in the event map, keyed first by data time and then by file name.
+ * Nothing is added when the entry is already present in the event map, or when
+ * shouldAddAgain(fileName, fileUpdateTime) returns false.
+ *
+ * @param fileName key of the entry inside the per-data-time map; also passed to createInstanceProfile
+ * @param dataTime data time the entry belongs to; key of the outer event map
+ * @param fileUpdateTime update time passed to shouldAddAgain and to createInstanceProfile
+ * @param cycleUnit cycle unit passed to createInstanceProfile
+ */
+ protected void addToEvenMap(String fileName, String dataTime, Long
fileUpdateTime, String cycleUnit) {
+ if (isInEventMap(fileName, dataTime)) {
+ LOGGER.info("add to evenMap isInEventMap returns true skip taskId
{} dataTime {} fileName {}",
+ taskProfile.getTaskId(), dataTime, fileName);
+ return;
+ }
+ if (!shouldAddAgain(fileName, fileUpdateTime)) {
+ LOGGER.info("add to evenMap shouldAddAgain returns false skip
taskId {} dataTime {} fileName {}",
+ taskProfile.getTaskId(), dataTime, fileName);
+ return;
+ }
+ // Create the per-data-time map on first use.
+ Map<String, InstanceProfile> sameDataTimeEvents =
eventMap.computeIfAbsent(dataTime,
+ mapKey -> new ConcurrentHashMap<>());
+ // Second presence check after the first one above; a hit here is logged as an error.
+ boolean containsInMemory = sameDataTimeEvents.containsKey(fileName);
+ if (containsInMemory) {
+ LOGGER.error("should not happen! may be {} has been deleted and
add again", fileName);
+ return;
+ }
+ InstanceProfile instanceProfile =
taskProfile.createInstanceProfile(fileName, cycleUnit, dataTime,
+ fileUpdateTime);
+ sameDataTimeEvents.put(fileName, instanceProfile);
+ LOGGER.info("add to eventMap taskId {} dataTime {} fileName {}",
taskProfile.getTaskId(), dataTime, fileName);
+ }
+
+    /**
+     * Tells whether the event map already holds an instance profile for the given file name under the
+     * given data time.
+     *
+     * @param fileName key inside the per-data-time map
+     * @param dataTime key of the outer event map
+     * @return {@code true} only if both levels of the map contain an entry
+     */
+    private boolean isInEventMap(String fileName, String dataTime) {
+        Map<String, InstanceProfile> profilesOfDataTime = eventMap.get(dataTime);
+        return profilesOfDataTime != null && profilesOfDataTime.get(fileName) != null;
+    }
+
protected void removeTimeoutEvent(Map<String, Map<String,
InstanceProfile>> eventMap, boolean isRetry) {
if (isRetry) {
return;
@@ -175,7 +207,7 @@ public abstract class LogAbstractTask extends AbstractTask {
for (Map.Entry<String, Map<String, InstanceProfile>> entry :
eventMap.entrySet()) {
/* If the data time of the event is within 2 days before (after)
the current time, it is valid */
String dataTime = entry.getKey();
- if (!NewDateUtils.isValidCreationTime(dataTime,
DAY_TIMEOUT_INTERVAL)) {
+ if (!DateUtils.isValidCreationTime(dataTime,
DAY_TIMEOUT_INTERVAL)) {
/* Remove it from memory map. */
eventMap.remove(dataTime);
LOGGER.warn("remove too old event from event map. dataTime
{}", dataTime);
diff --git
a/inlong-agent/agent-plugins/src/main/java/org/apache/inlong/agent/plugin/task/logcollection/SQLTask.java
b/inlong-agent/agent-plugins/src/main/java/org/apache/inlong/agent/plugin/task/logcollection/SQLTask.java
new file mode 100644
index 0000000000..ba6995bf03
--- /dev/null
+++
b/inlong-agent/agent-plugins/src/main/java/org/apache/inlong/agent/plugin/task/logcollection/SQLTask.java
@@ -0,0 +1,142 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.inlong.agent.plugin.task.logcollection;
+
+import org.apache.inlong.agent.conf.TaskProfile;
+import org.apache.inlong.agent.constant.TaskConstants;
+import org.apache.inlong.agent.plugin.utils.regex.Scanner;
+import org.apache.inlong.agent.plugin.utils.regex.Scanner.FinalPatternInfo;
+import org.apache.inlong.agent.utils.AgentUtils;
+import org.apache.inlong.agent.utils.DateTransUtils;
+
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.text.ParseException;
+import java.util.List;
+
+public class SQLTask extends LogAbstractTask {
+
+ private static final Logger LOGGER =
LoggerFactory.getLogger(SQLTask.class);
+ private String originPattern;
+ private long lastScanTime = 0;
+ public final long SCAN_INTERVAL = 1 * 60 * 1000;
+
+ @Override
+ protected int getInstanceLimit() {
+ return taskProfile.getInt(TaskConstants.SQL_MAX_NUM);
+ }
+
+ @Override
+ protected void initTask() {
+ super.initTask();
+ timeOffset = taskProfile.get(TaskConstants.SQL_TIME_OFFSET, "");
+ retry = taskProfile.getBoolean(TaskConstants.SQL_TASK_RETRY, false);
+ originPattern = taskProfile.get(TaskConstants.SQL_TASK_SQL);
+ if (retry) {
+ initRetryTask(taskProfile);
+ }
+ }
+
+ private boolean initRetryTask(TaskProfile profile) {
+ String dataTimeFrom = profile.get(TaskConstants.SQL_TASK_TIME_FROM,
"");
+ String dataTimeTo = profile.get(TaskConstants.SQL_TASK_TIME_TO, "");
+ try {
+ startTime = DateTransUtils.timeStrConvertToMillSec(dataTimeFrom,
profile.getCycleUnit());
+ endTime = DateTransUtils.timeStrConvertToMillSec(dataTimeTo,
profile.getCycleUnit());
+ } catch (ParseException e) {
+ LOGGER.error("retry task time error start {} end {}",
dataTimeFrom, dataTimeTo, e);
+ return false;
+ }
+ return true;
+ }
+
+ @Override
+ public boolean isProfileValid(TaskProfile profile) {
+ if (!profile.allRequiredKeyExist()) {
+ LOGGER.error("task profile needs all required key");
+ return false;
+ }
+ if (!profile.hasKey(TaskConstants.SQL_TASK_CYCLE_UNIT)) {
+ LOGGER.error("task profile needs sql cycle unit");
+ return false;
+ }
+ if (!profile.hasKey(TaskConstants.TASK_CYCLE_UNIT)) {
+ LOGGER.error("task profile needs cycle unit");
+ return false;
+ }
+ if (profile.get(TaskConstants.TASK_CYCLE_UNIT)
+ .compareTo(profile.get(TaskConstants.SQL_TASK_CYCLE_UNIT)) !=
0) {
+ LOGGER.error("task profile cycle unit must be consistent");
+ return false;
+ }
+ if (!profile.hasKey(TaskConstants.TASK_TIME_ZONE)) {
+ LOGGER.error("task profile needs time zone");
+ return false;
+ }
+ boolean ret = profile.hasKey(TaskConstants.SQL_TASK_SQL)
+ && profile.hasKey(TaskConstants.SQL_MAX_NUM);
+ if (!ret) {
+ LOGGER.error("task profile needs file keys");
+ return false;
+ }
+ if (!profile.hasKey(TaskConstants.SQL_TIME_OFFSET)) {
+ LOGGER.error("task profile needs time offset");
+ return false;
+ }
+ if (profile.getBoolean(TaskConstants.SQL_TASK_RETRY, false)) {
+ if (!initRetryTask(profile)) {
+ return false;
+ }
+ }
+ return true;
+ }
+
+ @Override
+ protected void releaseTask() {
+ }
+
+ @Override
+ protected void runForNormal() {
+ if (AgentUtils.getCurrentTime() - lastScanTime > SCAN_INTERVAL) {
+ scanExistingFile();
+ lastScanTime = AgentUtils.getCurrentTime();
+ }
+ dealWithEventMap();
+ }
+
+ @Override
+ protected void scanExistingFile() {
+ List<FinalPatternInfo> finalPatternInfos =
Scanner.getFinalPatternInfos(originPattern,
+ taskProfile.getCycleUnit(), timeOffset, startTime, endTime,
retry);
+ LOGGER.info("taskId {} scan {} get file count {}", getTaskId(),
originPattern, finalPatternInfos.size());
+ finalPatternInfos.forEach((fileInfo) -> {
+ String dataTime =
DateTransUtils.millSecConvertToTimeStr(fileInfo.dataTime,
taskProfile.getCycleUnit());
+ addToEvenMap(fileInfo.finalPattern, dataTime, 0L,
taskProfile.getCycleUnit());
+ if (retry) {
+ instanceCount++;
+ }
+ });
+ }
+
+ @Override
+ protected void dealWithEventMap() {
+ removeTimeoutEvent(eventMap, retry);
+ dealWithEventMapWithCycle();
+ }
+}
diff --git
a/inlong-agent/agent-plugins/src/main/java/org/apache/inlong/agent/plugin/task/logcollection/cos/COSTask.java
b/inlong-agent/agent-plugins/src/main/java/org/apache/inlong/agent/plugin/task/logcollection/cos/COSTask.java
index 86210157ed..c7bb5c9309 100644
---
a/inlong-agent/agent-plugins/src/main/java/org/apache/inlong/agent/plugin/task/logcollection/cos/COSTask.java
+++
b/inlong-agent/agent-plugins/src/main/java/org/apache/inlong/agent/plugin/task/logcollection/cos/COSTask.java
@@ -17,7 +17,6 @@
package org.apache.inlong.agent.plugin.task.logcollection.cos;
-import org.apache.inlong.agent.conf.InstanceProfile;
import org.apache.inlong.agent.conf.TaskProfile;
import org.apache.inlong.agent.constant.TaskConstants;
import org.apache.inlong.agent.plugin.task.logcollection.LogAbstractTask;
@@ -33,12 +32,7 @@ import org.slf4j.LoggerFactory;
import java.text.ParseException;
import java.util.List;
-import java.util.Map;
-import java.util.concurrent.ConcurrentHashMap;
-/**
- * Watch directory, if new valid files are created, create instance
correspondingly.
- */
public class COSTask extends LogAbstractTask {
private static final Logger LOGGER =
LoggerFactory.getLogger(COSTask.class);
@@ -59,7 +53,7 @@ public class COSTask extends LogAbstractTask {
@Override
protected void initTask() {
super.initTask();
- timeOffset = taskProfile.get(TaskConstants.TASK_COS_TIME_OFFSET, "");
+ timeOffset = taskProfile.get(TaskConstants.COS_TIME_OFFSET, "");
retry = taskProfile.getBoolean(TaskConstants.COS_TASK_RETRY, false);
originPattern = taskProfile.get(TaskConstants.COS_TASK_PATTERN);
bucketName = taskProfile.get(TaskConstants.COS_TASK_BUCKET_NAME);
@@ -114,7 +108,7 @@ public class COSTask extends LogAbstractTask {
LOGGER.error("task profile needs file keys");
return false;
}
- if (!profile.hasKey(TaskConstants.TASK_COS_TIME_OFFSET)) {
+ if (!profile.hasKey(TaskConstants.COS_TIME_OFFSET)) {
LOGGER.error("task profile needs time offset");
return false;
}
@@ -146,51 +140,19 @@ public class COSTask extends LogAbstractTask {
taskProfile.getCycleUnit(), timeOffset, startTime, endTime,
retry);
LOGGER.info("taskId {} scan {} get file count {}", getTaskId(),
originPattern, fileInfos.size());
fileInfos.forEach((fileInfo) -> {
- addToEvenMap(fileInfo.fileName, fileInfo.dataTime);
+ String fileName = fileInfo.fileName;
+ String dataTime = fileInfo.dataTime;
+ ObjectMetadata meta = cosClient.getObjectMetadata(bucketName,
fileName);
+ addToEvenMap(fileName, dataTime, meta.getLastModified().getTime(),
taskProfile.getCycleUnit());
if (retry) {
instanceCount++;
}
});
}
- private boolean isInEventMap(String fileName, String dataTime) {
- Map<String, InstanceProfile> fileToProfile = eventMap.get(dataTime);
- if (fileToProfile == null) {
- return false;
- }
- return fileToProfile.get(fileName) != null;
- }
-
@Override
protected void dealWithEventMap() {
removeTimeoutEvent(eventMap, retry);
dealWithEventMapWithCycle();
}
-
- private void addToEvenMap(String fileName, String dataTime) {
- if (isInEventMap(fileName, dataTime)) {
- LOGGER.info("add to evenMap isInEventMap returns true skip taskId
{} dataTime {} fileName {}",
- taskProfile.getTaskId(), dataTime, fileName);
- return;
- }
- ObjectMetadata meta = cosClient.getObjectMetadata(bucketName,
fileName);
- Long fileUpdateTime = meta.getLastModified().getTime();
- if (!shouldAddAgain(fileName, fileUpdateTime)) {
- LOGGER.info("add to evenMap shouldAddAgain returns false skip
taskId {} dataTime {} fileName {}",
- taskProfile.getTaskId(), dataTime, fileName);
- return;
- }
- Map<String, InstanceProfile> sameDataTimeEvents =
eventMap.computeIfAbsent(dataTime,
- mapKey -> new ConcurrentHashMap<>());
- boolean containsInMemory = sameDataTimeEvents.containsKey(fileName);
- if (containsInMemory) {
- LOGGER.error("should not happen! may be {} has been deleted and
add again", fileName);
- return;
- }
- String cycleUnit = taskProfile.getCycleUnit();
- InstanceProfile instanceProfile =
taskProfile.createInstanceProfile(fileName, cycleUnit, dataTime,
- fileUpdateTime);
- sameDataTimeEvents.put(fileName, instanceProfile);
- LOGGER.info("add to eventMap taskId {} dataTime {} fileName {}",
taskProfile.getTaskId(), dataTime, fileName);
- }
}
diff --git
a/inlong-agent/agent-plugins/src/main/java/org/apache/inlong/agent/plugin/task/logcollection/local/FileTask.java
b/inlong-agent/agent-plugins/src/main/java/org/apache/inlong/agent/plugin/task/logcollection/local/FileTask.java
index ef50d1fc62..82fca57109 100644
---
a/inlong-agent/agent-plugins/src/main/java/org/apache/inlong/agent/plugin/task/logcollection/local/FileTask.java
+++
b/inlong-agent/agent-plugins/src/main/java/org/apache/inlong/agent/plugin/task/logcollection/local/FileTask.java
@@ -17,13 +17,12 @@
package org.apache.inlong.agent.plugin.task.logcollection.local;
-import org.apache.inlong.agent.conf.InstanceProfile;
import org.apache.inlong.agent.conf.TaskProfile;
import org.apache.inlong.agent.constant.CycleUnitType;
import org.apache.inlong.agent.constant.TaskConstants;
import org.apache.inlong.agent.plugin.task.logcollection.LogAbstractTask;
import
org.apache.inlong.agent.plugin.task.logcollection.local.FileScanner.BasicFileInfo;
-import org.apache.inlong.agent.plugin.utils.regex.NewDateUtils;
+import org.apache.inlong.agent.plugin.utils.regex.DateUtils;
import org.apache.inlong.agent.plugin.utils.regex.PathDateExpression;
import org.apache.inlong.agent.plugin.utils.regex.PatternUtil;
import org.apache.inlong.agent.utils.AgentUtils;
@@ -222,7 +221,9 @@ public class FileTask extends LogAbstractTask {
List<BasicFileInfo> fileInfos =
scanExistingFileByPattern(originPattern);
LOGGER.info("taskId {} scan {} get file count {}", getTaskId(),
originPattern, fileInfos.size());
fileInfos.forEach((fileInfo) -> {
- addToEvenMap(fileInfo.fileName, fileInfo.dataTime);
+ String fileName = fileInfo.fileName;
+ Long fileUpdateTime =
FileUtils.getFileLastModifyTime(fileName);
+ addToEvenMap(fileName, fileInfo.dataTime, fileUpdateTime,
taskProfile.getCycleUnit());
if (retry) {
instanceCount++;
}
@@ -230,17 +231,6 @@ public class FileTask extends LogAbstractTask {
});
}
- private boolean isInEventMap(String fileName, String dataTime) {
- Map<String, InstanceProfile> fileToProfile = eventMap.get(dataTime);
- if (fileToProfile == null) {
- return false;
- }
- if (fileToProfile.get(fileName) == null) {
- return false;
- }
- return true;
- }
-
private List<BasicFileInfo> scanExistingFileByPattern(String
originPattern) {
if (realTime) {
return FileScanner.scanTaskBetweenTimes(originPattern,
CycleUnitType.HOUR, timeOffset,
@@ -306,15 +296,14 @@ public class FileTask extends LogAbstractTask {
private void dealWithWatchKey(WatchEntity entity, WatchKey key) throws
IOException {
Path contextPath = entity.getPath(key);
- LOGGER.info("Find creation events in path: " +
contextPath.toAbsolutePath());
+ LOGGER.info("Find creation events in path: {}",
contextPath.toAbsolutePath());
for (WatchEvent<?> watchEvent : key.pollEvents()) {
Path child = resolvePathFromEvent(watchEvent, contextPath);
if (child == null) {
continue;
}
if (Files.isDirectory(child)) {
- LOGGER.info("The find creation event is triggered by a
directory: " + child
- .getFileName());
+ LOGGER.info("The find creation event is triggered by a
directory: {}", child.getFileName());
entity.registerRecursively(child);
continue;
}
@@ -349,61 +338,30 @@ public class FileTask extends LogAbstractTask {
Matcher matcher = entity.getPattern().matcher(newFileName);
if (matcher.matches() || matcher.lookingAt()) {
LOGGER.info("matched file {} {}", newFileName,
entity.getPattern());
- String dataTime = getDataTimeFromFileName(newFileName,
entity.getOriginPattern(),
- entity.getDateExpression());
+ String dataTime = getDataTimeFromFileName(newFileName,
entity.getDateExpression());
if (!checkFileNameForTime(newFileName, entity)) {
LOGGER.error("File Timeout {} {}", newFileName, dataTime);
return;
}
- addToEvenMap(newFileName, dataTime);
- }
- }
-
- private void addToEvenMap(String fileName, String dataTime) {
- if (isInEventMap(fileName, dataTime)) {
- LOGGER.info("addToEvenMap isInEventMap returns true skip taskId {}
dataTime {} fileName {}",
- taskProfile.getTaskId(), dataTime, fileName);
- return;
- }
- Long fileUpdateTime = FileUtils.getFileLastModifyTime(fileName);
- if (!shouldAddAgain(fileName, fileUpdateTime)) {
- LOGGER.info("addToEvenMap shouldAddAgain returns false skip taskId
{} dataTime {} fileName {}",
- taskProfile.getTaskId(), dataTime, fileName);
- return;
- }
- Map<String, InstanceProfile> sameDataTimeEvents =
eventMap.computeIfAbsent(dataTime,
- mapKey -> new ConcurrentHashMap<>());
- boolean containsInMemory = sameDataTimeEvents.containsKey(fileName);
- if (containsInMemory) {
- LOGGER.error("should not happen! may be {} has been deleted and
add again", fileName);
- return;
- }
- String cycleUnit = "";
- if (realTime) {
- cycleUnit = CycleUnitType.HOUR;
- } else {
- cycleUnit = taskProfile.getCycleUnit();
+ Long fileUpdateTime = FileUtils.getFileLastModifyTime(newFileName);
+ addToEvenMap(newFileName, dataTime, fileUpdateTime,
taskProfile.getCycleUnit());
}
- InstanceProfile instanceProfile =
taskProfile.createInstanceProfile(fileName, cycleUnit, dataTime,
- fileUpdateTime);
- sameDataTimeEvents.put(fileName, instanceProfile);
- LOGGER.info("add to eventMap taskId {} dataTime {} fileName {}",
taskProfile.getTaskId(), dataTime, fileName);
}
private boolean checkFileNameForTime(String newFileName, WatchEntity
entity) {
/* Get the data time for this file. */
PathDateExpression dateExpression = entity.getDateExpression();
if (dateExpression.getLongestDatePattern().length() != 0) {
- String dataTime = getDataTimeFromFileName(newFileName,
entity.getOriginPattern(), dateExpression);
+ String dataTime = getDataTimeFromFileName(newFileName,
dateExpression);
LOGGER.info("file {}, fileTime {}", newFileName, dataTime);
- if (!NewDateUtils.isValidCreationTime(dataTime,
entity.getCycleUnit(), timeOffset)) {
+ if (!DateUtils.isValidCreationTime(dataTime,
entity.getCycleUnit(), timeOffset)) {
return false;
}
}
return true;
}
- private String getDataTimeFromFileName(String fileName, String
originPattern, PathDateExpression dateExpression) {
+ private String getDataTimeFromFileName(String fileName, PathDateExpression
dateExpression) {
/*
* TODO: what if this file doesn't have any date pattern regex.
*
@@ -411,7 +369,7 @@ public class FileTask extends LogAbstractTask {
* reading and start reading this new file.
*/
// Extract data time from file name
- String fileTime = NewDateUtils.getDateTime(fileName, originPattern,
dateExpression);
+ String fileTime = DateUtils.getDateTime(fileName, dateExpression);
/**
* Replace any non-numeric characters in the file time
diff --git
a/inlong-agent/agent-plugins/src/main/java/org/apache/inlong/agent/plugin/task/logcollection/local/WatchEntity.java
b/inlong-agent/agent-plugins/src/main/java/org/apache/inlong/agent/plugin/task/logcollection/local/WatchEntity.java
index 47d052e81b..b539334039 100644
---
a/inlong-agent/agent-plugins/src/main/java/org/apache/inlong/agent/plugin/task/logcollection/local/WatchEntity.java
+++
b/inlong-agent/agent-plugins/src/main/java/org/apache/inlong/agent/plugin/task/logcollection/local/WatchEntity.java
@@ -18,7 +18,6 @@
package org.apache.inlong.agent.plugin.task.logcollection.local;
import org.apache.inlong.agent.plugin.utils.regex.DateUtils;
-import org.apache.inlong.agent.plugin.utils.regex.NewDateUtils;
import org.apache.inlong.agent.plugin.utils.regex.NonRegexPatternPosition;
import org.apache.inlong.agent.plugin.utils.regex.PathDateExpression;
import org.apache.inlong.agent.plugin.utils.regex.PatternUtil;
@@ -71,12 +70,12 @@ public class WatchEntity {
this.originPattern = originPattern;
ArrayList<String> directoryLayers =
PatternUtil.cutDirectoryByWildcardAndDateExpression(originPattern);
this.basicStaticPath = directoryLayers.get(0);
- this.regexPattern =
NewDateUtils.replaceDateExpressionWithRegex(originPattern);
+ this.regexPattern =
DateUtils.replaceDateExpressionWithRegex(originPattern);
pattern = Pattern.compile(regexPattern, Pattern.CASE_INSENSITIVE |
Pattern.DOTALL | Pattern.MULTILINE);
ArrayList<String> directories =
PatternUtil.cutDirectoryByWildcard(originPattern);
this.originPatternWithoutFileName = directories.get(0);
this.patternWithoutFileName = Pattern
-
.compile(NewDateUtils.replaceDateExpressionWithRegex(originPatternWithoutFileName),
+
.compile(DateUtils.replaceDateExpressionWithRegex(originPatternWithoutFileName),
Pattern.CASE_INSENSITIVE | Pattern.DOTALL |
Pattern.MULTILINE);
/*
* Get the longest data regex from the data name, it's used if we want
to get out the data time from the file
@@ -219,7 +218,7 @@ public class WatchEntity {
} else {
dirPattern = originPatternWithoutFileName.substring(0, index);
}
- Pattern pattern =
Pattern.compile(NewDateUtils.replaceDateExpressionWithRegex(dirPattern),
+ Pattern pattern =
Pattern.compile(DateUtils.replaceDateExpressionWithRegex(dirPattern),
Pattern.CASE_INSENSITIVE | Pattern.DOTALL | Pattern.MULTILINE);
return pattern;
}
diff --git
a/inlong-agent/agent-plugins/src/main/java/org/apache/inlong/agent/plugin/utils/regex/DateUtils.java
b/inlong-agent/agent-plugins/src/main/java/org/apache/inlong/agent/plugin/utils/regex/DateUtils.java
index 2366dbae06..aad912b46f 100644
---
a/inlong-agent/agent-plugins/src/main/java/org/apache/inlong/agent/plugin/utils/regex/DateUtils.java
+++
b/inlong-agent/agent-plugins/src/main/java/org/apache/inlong/agent/plugin/utils/regex/DateUtils.java
@@ -17,11 +17,20 @@
package org.apache.inlong.agent.plugin.utils.regex;
+import org.apache.inlong.agent.constant.CycleUnitType;
+import org.apache.inlong.agent.utils.DateTransUtils;
+
import hirondelle.date4j.DateTime;
import org.apache.commons.lang.StringUtils;
-import org.apache.logging.log4j.LogManager;
-import org.apache.logging.log4j.Logger;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+import java.text.ParseException;
+import java.text.SimpleDateFormat;
+import java.util.ArrayList;
+import java.util.Calendar;
+import java.util.Date;
+import java.util.List;
import java.util.Objects;
import java.util.TimeZone;
import java.util.regex.Matcher;
@@ -29,31 +38,343 @@ import java.util.regex.Pattern;
public class DateUtils {
- private static final Logger logger = LogManager.getLogger(DateUtils.class);
+ private static final String YEAR = "YYYY";
+ private static final String YEAR_LOWERCASE = "yyyy";
+ private static final String MONTH = "MM";
+ private static final String DAY = "DD";
+ private static final String DAY_LOWERCASE = "dd";
+ private static final String HOUR = "HH";
+ private static final String HOUR_LOWERCASE = "hh";
+ private static final String MINUTE = "mm";
+ public static final String DEFAULT_FORMAT = "yyyyMMddHHmm";
+ public static final String DEFAULT_TIME_ZONE = "Asia/Shanghai";
+ private static final Logger logger =
LoggerFactory.getLogger(DateUtils.class);
private static final String TIME_REGEX =
"YYYY(?:.MM|MM)?(?:.DD|DD)?(?:.hh|hh)?(?:.mm|mm)?(?:"
+ ".ss|ss)?";
- private static final String LIMIT_SEP = "(?<=[a-zA-Z])";
- private static final String LETTER_STR = "\\D+";
- private static final String DIGIT_STR = "[0-9]+";
private static final Pattern pattern = Pattern.compile(TIME_REGEX,
Pattern.CASE_INSENSITIVE | Pattern.DOTALL | Pattern.MULTILINE);
- private String dateFormat = "YYYYMMDDhhmmss";
+ private static final Pattern bracePatt = Pattern.compile("\\{(.*?)\\}");
+ private static final int DEFAULT_LENGTH = "yyyyMMddHHmm".length();
+ public static long DAY_TIMEOUT_INTERVAL = 2 * 24 * 3600 * 1000;
+ public static long HOUR_TIMEOUT_INTERVAL = 2 * 3600 * 1000;
- public static String getSubTimeFormat(String format, int length) {
- // format may be "YYYYMMDDhhmmss" | "YYYY_MM_DD_hh_mm_ss"
- int formatLen = format.length();
- StringBuffer sb = new StringBuffer();
+ public static String getShouldStartTime(String dataTime, String cycleUnit,
+ String offset) {
+ if (dataTime == null || dataTime.length() > 12) {
+ return null;
+ }
+
+ SimpleDateFormat dateFormat = new SimpleDateFormat(DEFAULT_FORMAT);
+ TimeZone timeZone = TimeZone.getTimeZone(DateUtils.DEFAULT_TIME_ZONE);
+ dateFormat.setTimeZone(timeZone);
+
+ if (dataTime.length() < DEFAULT_LENGTH) {
+ StringBuffer sb = new StringBuffer();
+ for (int i = 0; i < DEFAULT_LENGTH - dataTime.length(); i++) {
+ sb.append("0");
+ }
+ dataTime = dataTime + sb.toString();
+ }
+
+ Calendar calendar = Calendar.getInstance();
+ try {
+ calendar.setTimeInMillis(dateFormat.parse(dataTime).getTime());
+ } catch (ParseException e) {
+ return null;
+ }
+
+ /*
+ * The delay should be added to the data time, so remove the - from
offset.
+ */
+ if (offset.startsWith("-")) {
+ offset = offset.substring(1, offset.length());
+ } else { // positive，read file earlier
+ offset = "-" + offset;
+ }
+
+ return dateFormat
+ .format(new Date(getDateTime(calendar, cycleUnit,
offset).getTimeInMillis()));
+ }
+
+ private static Calendar getDateTime(Calendar calendar, String cycleUnit,
String offset) {
+ int cycleNumber = (cycleUnit.length() <= 1 ? 1
+ : Integer.parseInt(cycleUnit.substring(0, cycleUnit.length() -
1)));
+
+ String offsetUnit = offset.substring(offset.length() - 1,
offset.length());
+ int offsetNumber = Integer.parseInt(offset.substring(0,
offset.length() - 1));
+
+ /*
+ * For day task, the offset cycle unit can only be day; for hourly
task, the offset can't be minute; for
+ * minutely task, the offset cycle unit can be day, hour and minute,
but if the offset cycle unit is minute, the
+ * offset must be divided by cycle number.
+ */
+ if (cycleUnit.length() > 1 &&
(StringUtils.endsWithIgnoreCase(cycleUnit, "M"))) {
+ calendar.set(Calendar.SECOND, 0);
+ int minTime = calendar.get(Calendar.MINUTE);
+
+ int leftMin = minTime % cycleNumber;
+ minTime = minTime - leftMin;
+ calendar.set(Calendar.MINUTE, minTime);
+
+ /* Calculate the offset. */
+ if (CycleUnitType.DAY.equalsIgnoreCase(offsetUnit)) {
+ calendar.add(Calendar.DAY_OF_YEAR, offsetNumber);
+ }
+
+ if (CycleUnitType.HOUR.equalsIgnoreCase(offsetUnit)) {
+ calendar.add(Calendar.HOUR_OF_DAY, offsetNumber);
+ }
+ } else if (cycleUnit.length() == 1) {
+ if (CycleUnitType.DAY.equalsIgnoreCase(cycleUnit)) {
+ calendar.set(Calendar.HOUR_OF_DAY, 0);
+ calendar.set(Calendar.MINUTE, 0);
+ calendar.set(Calendar.SECOND, 0);
+ } else if (CycleUnitType.HOUR.equalsIgnoreCase(cycleUnit)) {
+ calendar.set(Calendar.MINUTE, 0);
+ calendar.set(Calendar.SECOND, 0);
+ }
+ }
+
+ /* Calculate the offset. */
+ if (CycleUnitType.DAY.equalsIgnoreCase(offsetUnit)) {
+ calendar.add(Calendar.DAY_OF_YEAR, offsetNumber);
+ }
+
+ if (CycleUnitType.HOUR.equalsIgnoreCase(offsetUnit)) {
+ calendar.add(Calendar.HOUR_OF_DAY, offsetNumber);
+ }
+
+ if (CycleUnitType.MINUTE.equals(offsetUnit)) {
+ calendar.add(Calendar.MINUTE, offsetNumber);
+ }
+
+ return calendar;
+ }
+
+ public static boolean isValidCreationTime(String dataTime, String
cycleUnit,
+ String timeOffset) {
+ long timeInterval = 0;
+ if (CycleUnitType.DAY.equalsIgnoreCase(cycleUnit)) {
+ timeInterval = DAY_TIMEOUT_INTERVAL;
+ } else if (CycleUnitType.HOUR.equalsIgnoreCase(cycleUnit)) {
+ timeInterval = HOUR_TIMEOUT_INTERVAL;
+ } else if (cycleUnit.endsWith(CycleUnitType.MINUTE)) {
+ timeInterval = HOUR_TIMEOUT_INTERVAL;
+ } else {
+ logger.error("cycleUnit {} can't parse!", cycleUnit);
+ timeInterval = DAY_TIMEOUT_INTERVAL;
+ }
+
+ if (timeOffset.startsWith("-")) {
+ timeInterval -= DateTransUtils.calcOffset(timeOffset);
+ } else {
+ timeInterval += DateTransUtils.calcOffset(timeOffset);
+ }
+
+ return isValidCreationTime(dataTime, timeInterval);
+ }
+
+ /*
+ * Check whether the data time is between curTime - interval and curTime +
interval.
+ */
+ public static boolean isValidCreationTime(String dataTime, long
timeInterval) {
+ long currentTime = System.currentTimeMillis();
- for (int i = 0; i < formatLen && length > 0; ++i) {
- if (Character.isLetter(format.charAt(i))
- || Character.isDigit(format.charAt(i))) {
- length--;
+ long minTime = currentTime - timeInterval;
+ long maxTime = currentTime + timeInterval;
+
+ SimpleDateFormat dateFormat = new SimpleDateFormat(DEFAULT_FORMAT);
+ if (dataTime.length() < DEFAULT_LENGTH) {
+ StringBuffer sb = new StringBuffer();
+ for (int i = 0; i < DEFAULT_LENGTH - dataTime.length(); i++) {
+ sb.append("0");
}
- sb.append(format.charAt(i));
+ dataTime = dataTime + sb.toString();
+ }
+
+ Calendar calendar = Calendar.getInstance();
+ try {
+ calendar.setTimeInMillis(dateFormat.parse(dataTime).getTime());
+ } catch (ParseException e) {
+ return false;
+ }
+
+ return calendar.getTimeInMillis() >= minTime
+ && calendar.getTimeInMillis() <= maxTime;
+ }
+
+ public static String getDateTime(String fileName, PathDateExpression
dateExpression) {
+ if (fileName == null || dateExpression == null
+ || dateExpression.getLongestDatePattern() == null) {
+ return null;
+ }
+
+ String longestDatePattern = DateUtils
+
.replaceDateExpressionWithRegex(dateExpression.getLongestDatePattern());
+ NonRegexPatternPosition patternPosition =
dateExpression.getPatternPosition();
+
+ Matcher mat = Pattern.compile(longestDatePattern).matcher(fileName);
+ boolean find = mat.find();
+ // TODO : more than one part match the time regex in file name
("/data/joox_logs/2000701106/201602170040.log"
+ // YYYYMMDDhh)
+ if (!find) {
+ logger.error("Can't find the pattern {} for file name {}",
longestDatePattern,
+ fileName);
+ return null;
+ }
+
+ String dateTime = fileName.substring(mat.start(), mat.end());
+ if (patternPosition == NonRegexPatternPosition.PREFIX) {
+ dateTime = dateTime.substring(1, dateTime.length());
+ } else if (patternPosition == NonRegexPatternPosition.SUFFIX) {
+ dateTime = dateTime.substring(0, dateTime.length() - 1);
+ } else if (patternPosition == NonRegexPatternPosition.BOTH) {
+ dateTime = dateTime.substring(1, dateTime.length() - 1);
+ } else if (patternPosition == NonRegexPatternPosition.END) {
+ dateTime = dateTime.substring(0, dateTime.length());
+ } else if (patternPosition == NonRegexPatternPosition.ENDSUFFIX) {
+ dateTime = dateTime.substring(1, dateTime.length());
+ } else if (patternPosition == NonRegexPatternPosition.NONE) {
+ logger.error("The data path configuration is invalid");
+ dateTime = null;
+ }
+
+ return dateTime;
+ }
+
+ public static ArrayList<MatchPoint> extractAllTimeRegex(String src) {
+ // TODO : time regex error
+ Matcher m = pattern.matcher(src);
+ ArrayList<MatchPoint> arr = new ArrayList<MatchPoint>();
+ while (m.find()) {
+ String oneMatch = m.group(0);
+ arr.add(new MatchPoint(oneMatch, m.start(), m.end()));
+ }
+ return arr;
+ }
+
+ public static String replaceDateExpressionWithRegex(String dataPath) {
+ if (dataPath == null) {
+ return null;
+ }
+
+ StringBuffer sb = new StringBuffer();
+
+ // find longest DATEPATTERN
+ ArrayList<MatchPoint> mp = extractAllTimeRegex(dataPath);
+
+ if (mp == null || mp.size() == 0) {
+ return dataPath;
+ }
+
+ int lastIndex = 0;
+ for (MatchPoint m : mp) {
+ lastIndex = replacePattern(dataPath, sb, m, "\\d{4}", "\\d{2}",
"\\d{2}", "\\d{2}", "\\d{2}", lastIndex);
}
+
+ sb.append(dataPath.substring(lastIndex));
+
return sb.toString();
}
+ private static int replacePattern(String dataPath, StringBuffer sb,
MatchPoint m, String year, String month,
+ String day, String hour, String minute, int lastIndex) {
+ sb.append(dataPath, lastIndex, m.getStart());
+
+ String longestPattern = m.getStr();
+ int hhIndex = longestPattern.indexOf(HOUR);
+ if (hhIndex == -1) {
+ hhIndex = longestPattern.indexOf(HOUR_LOWERCASE);
+ }
+ int mmIndex = longestPattern.indexOf(MINUTE);
+ longestPattern = longestPattern.replace(YEAR, year);
+ longestPattern = longestPattern.replace(YEAR_LOWERCASE, year);
+ longestPattern = longestPattern.replace(MONTH, month);
+ longestPattern = longestPattern.replace(DAY, day);
+ longestPattern = longestPattern.replace(DAY_LOWERCASE, day);
+ longestPattern = longestPattern.replace(HOUR, hour);
+ longestPattern = longestPattern.replace(HOUR_LOWERCASE, hour);
+
+ if (hhIndex != -1 && mmIndex != -1
+ && mmIndex >= hhIndex + 2 && mmIndex < hhIndex + 4) {
+ longestPattern = longestPattern.replace(MINUTE, minute);
+ }
+
+ sb.append(longestPattern);
+ return m.getEnd();
+ }
+
+ public static String replaceDateExpression(Calendar dateTime, String
dataPath) {
+ if (dataPath == null) {
+ return null;
+ }
+
+ String year = String.valueOf(dateTime.get(Calendar.YEAR));
+ String month = String.valueOf(dateTime.get(Calendar.MONTH) + 1);
+ String day = String.valueOf(dateTime.get(Calendar.DAY_OF_MONTH));
+ String hour = String.valueOf(dateTime.get(Calendar.HOUR_OF_DAY));
+ String minute = String.valueOf(dateTime.get(Calendar.MINUTE));
+
+ StringBuffer sb = new StringBuffer();
+
+ // find longest DATEPATTERN
+ ArrayList<MatchPoint> mp = extractAllTimeRegex(dataPath);
+ if (mp == null || mp.size() == 0) {
+ return dataPath;
+ }
+
+ int lastIndex = 0;
+ for (MatchPoint m : mp) {
+ lastIndex = replacePattern(dataPath, sb, m, year,
externDate(month), externDate(day), externDate(hour),
+ externDate(minute), lastIndex);
+ }
+
+ sb.append(dataPath.substring(lastIndex));
+
+ return sb.toString();
+ }
+
+ public static String externDate(String time) {
+ if (time.length() == 1) {
+ return "0" + time;
+ } else {
+ return time;
+ }
+ }
+
+ public static List<Long> getDateRegion(long startTime, long endTime,
String cycleUnit) {
+ List<Long> ret = new ArrayList<Long>();
+ DateTime dtStart = DateTime.forInstant(startTime,
TimeZone.getDefault());
+ DateTime dtEnd = DateTime.forInstant(endTime, TimeZone.getDefault());
+
+ if (cycleUnit.equals(CycleUnitType.DAY)) {
+ dtEnd = dtEnd.getEndOfDay();
+ }
+
+ int year = 0;
+ int month = 0;
+ int day = 0;
+ int hour = 0;
+ int minute = 0;
+ int second = 0;
+ if (cycleUnit.equalsIgnoreCase(CycleUnitType.DAY)) {
+ day = 1;
+ } else if (cycleUnit.equalsIgnoreCase(CycleUnitType.HOUR)) {
+ hour = 1;
+ } else if (cycleUnit.equals(CycleUnitType.MINUTE)) {
+ minute = 1;
+ } else {
+ logger.error("cycleUnit {} is error: ", cycleUnit);
+ return ret;
+ }
+ while (dtStart.lteq(dtEnd)) {
+ ret.add(dtStart.getMilliseconds(TimeZone.getDefault()));
+ dtStart = dtStart.plus(year, month, day, hour, minute, second, 0,
+ DateTime.DayOverflow.LastDay);
+ }
+ return ret;
+ }
+
// only return the first matched
public static String extractLongestTimeRegex(String src)
throws IllegalArgumentException {
@@ -124,139 +445,4 @@ public class DateUtils {
return ((position == NonRegexPatternPosition.NONE) ? null
: new PathDateExpression(longestPattern, position));
}
-
- public void init(String timeFormat) {
- if (timeFormat != null && !timeFormat.isEmpty()) {
- dateFormat = timeFormat;
- }
- }
-
- // 20120812010203 ---> 2012-08-12 01:02:03
- private String normalizeDateStr(String src) {
- src = src.replaceAll("[^a-zA-Z0-9]", "");
- int len = src.length();
- // if (!isTimeStrValid(src)) {
- // return "";
- // }
- StringBuffer sb = new StringBuffer();
- // year
- sb.append(src.substring(0, 4));
- sb.append("-");
- if (len > 4) {
- // month
- sb.append(src.substring(4, 6));
- if (len > 6) {
- sb.append("-");
- // day
- sb.append(src.substring(6, 8));
- if (len > 8) {
- sb.append(" ");
- // hour
- sb.append(src.substring(8, 10));
- if (len > 10) {
- sb.append(":");
- // minute
- sb.append(src.substring(10, 12));
- if (len > 12) {
- sb.append(":");
- // seconds
- sb.append(src.substring(12, 14));
- } else {
- sb.append(":00");
- }
- } else {
- sb.append(":00:00");
- }
- } else {
- sb.append(" 00:00:00");
- }
- } else {
- sb.append("-01 00:00:00");
- }
- } else {
- sb.append("-01-01 00:00:00");
- }
- return sb.toString();
- }
-
- public String getDate(String src, String limit) {
- if (src == null || src.trim().isEmpty()) {
- return "";
- }
-
- // TODO : verify format str
- int year = 0;
- int month = 0;
- int day = 0;
- int hour = 0;
- int minute = 0;
- int second = 0;
-
- // TODO : timezone
- TimeZone tz = TimeZone.getTimeZone("GMT+8:00");
- DateTime dt = null;
- String outputFormat = null;
- if (src.matches(LETTER_STR)) {
- // format str
- // TODO : data format verify
- dt = DateTime.now(tz);
- outputFormat = src;
- } else {
- // time str
- src = src.replaceAll("[^0-9]", "");
- outputFormat = getSubTimeFormat(dateFormat, src.length());
- src = normalizeDateStr(src);
- if (src.isEmpty()) {
- return "";
- }
- dt = new DateTime(src);
- }
-
- // System.out.println("outputformat: " + outputFormat);
-
- limit = limit.trim();
- String[] limitArr = limit.split(LIMIT_SEP);
-
- for (String onelimit : limitArr) {
- year = 0;
- month = 0;
- day = 0;
- hour = 0;
- minute = 0;
- second = 0;
- // System.out.println("onelimit: " + onelimit);
- int limitLen = onelimit.length();
- String type = onelimit.substring(limitLen - 1, limitLen);
- int offset = Integer.parseInt(onelimit.substring(0, limitLen - 1));
- // System.out.println("type: " + type + ". offset: " + offset);
- int sign = 1;
- if (offset < 0) {
- sign = -1;
- } else {
- sign = 1;
- }
- if (type.equalsIgnoreCase("Y")) {
- year = sign * offset;
- } else if (type.equals("M")) {
- month = sign * offset;
- } else if (type.equalsIgnoreCase("D")) {
- day = sign * offset;
- } else if (type.equalsIgnoreCase("h")) {
- hour = sign * offset;
- } else if (type.equals("m")) {
- minute = sign * offset;
- } else if (type.equalsIgnoreCase("s")) {
- second = sign * offset;
- }
- if (sign < 0) {
- dt = dt.minus(year, month, day, hour, minute, second, 0,
- DateTime.DayOverflow.LastDay);
- } else {
- dt = dt.plus(year, month, day, hour, minute, second, 0,
- DateTime.DayOverflow.LastDay);
- }
-
- }
- return dt.format(outputFormat);
- }
}
diff --git
a/inlong-agent/agent-plugins/src/main/java/org/apache/inlong/agent/plugin/utils/regex/NewDateUtils.java
b/inlong-agent/agent-plugins/src/main/java/org/apache/inlong/agent/plugin/utils/regex/NewDateUtils.java
deleted file mode 100644
index a80f088e0d..0000000000
---
a/inlong-agent/agent-plugins/src/main/java/org/apache/inlong/agent/plugin/utils/regex/NewDateUtils.java
+++ /dev/null
@@ -1,466 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.inlong.agent.plugin.utils.regex;
-
-import org.apache.inlong.agent.constant.CycleUnitType;
-import org.apache.inlong.agent.utils.DateTransUtils;
-
-import hirondelle.date4j.DateTime;
-import org.apache.commons.lang.StringUtils;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
-import java.text.ParseException;
-import java.text.SimpleDateFormat;
-import java.util.ArrayList;
-import java.util.Calendar;
-import java.util.Date;
-import java.util.List;
-import java.util.TimeZone;
-import java.util.regex.Matcher;
-import java.util.regex.Pattern;
-
-public class NewDateUtils {
-
- public static final String FULL_FORMAT = "yyyyMMddHHmmss";
- public static final String NULL_DATA_TIME = "000000000000";
- public static final String DEFAULT_FORMAT = "yyyyMMddHHmm";
- public static final String DEFAULT_TIME_ZONE = "Asia/Shanghai";
- private static final Logger logger =
LoggerFactory.getLogger(NewDateUtils.class);
- private static final String TIME_REGEX =
"YYYY(?:.MM|MM)?(?:.DD|DD)?(?:.hh|hh)?(?:.mm|mm)?(?:"
- + ".ss|ss)?";
- private static final String LIMIT_SEP = "(?<=[a-zA-Z])";
- private static final String LETTER_STR = "\\D+";
- private static final String DIGIT_STR = "[0-9]+";
- private static final Pattern pattern = Pattern.compile(TIME_REGEX,
- Pattern.CASE_INSENSITIVE | Pattern.DOTALL | Pattern.MULTILINE);
- private static final Pattern bracePatt = Pattern.compile("\\{(.*?)\\}");
- private static final int DEFAULT_LENGTH = "yyyyMMddHHmm".length();
- public static long DAY_TIMEOUT_INTERVAL = 2 * 24 * 3600 * 1000;
- public static long HOUR_TIMEOUT_INTERVAL = 2 * 3600 * 1000;
- // data source config error */
- public static final String DATA_SOURCE_CONFIG_ERROR =
"ERROR-0-INLONG_AGENT|10001|ERROR"
- + "|ERROR_DATA_SOURCE_CONFIG|";
-
- /* Return the time in milliseconds for a data time. */
- /*
- * public static long getTimeInMillis(String dataTime) { if (dataTime ==
null) { return 0; }
- *
- * try { SimpleDateFormat dateFormat = new
SimpleDateFormat(DEFAULT_FORMAT); Date date = dateFormat.parse(dataTime);
- *
- * return date.getTime(); } catch (ParseException e) { return 0; } }
- */
- /* Return the format data time string from milliseconds. */
- /*
- * public static String getDataTimeFromTimeMillis(long dataTime) {
SimpleDateFormat dateFormat = new
- * SimpleDateFormat(DEFAULT_FORMAT); return dateFormat.format(new
Date(dataTime)); }
- */
- /* Return the should start time for a data time log file. */
- public static String getShouldStartTime(String dataTime, String cycleUnit,
- String offset) {
- if (dataTime == null || dataTime.length() > 12) {
- return null;
- }
-
- SimpleDateFormat dateFormat = new SimpleDateFormat(DEFAULT_FORMAT);
- TimeZone timeZone =
TimeZone.getTimeZone(NewDateUtils.DEFAULT_TIME_ZONE);
- dateFormat.setTimeZone(timeZone);
-
- if (dataTime.length() < DEFAULT_LENGTH) {
- StringBuffer sb = new StringBuffer();
- for (int i = 0; i < DEFAULT_LENGTH - dataTime.length(); i++) {
- sb.append("0");
- }
- dataTime = dataTime + sb.toString();
- }
-
- Calendar calendar = Calendar.getInstance();
- try {
- calendar.setTimeInMillis(dateFormat.parse(dataTime).getTime());
- } catch (ParseException e) {
- return null;
- }
-
- /*
- * The delay should be added to the data time, so remove the - from
offset.
- */
- if (offset.startsWith("-")) {
- offset = offset.substring(1, offset.length());
- } else { // positive，read file earlier
- offset = "-" + offset;
- }
-
- return dateFormat
- .format(new Date(getDateTime(calendar, cycleUnit,
offset).getTimeInMillis()));
- }
-
- private static Calendar getDateTime(Calendar calendar, String cycleUnit,
String offset) {
- int cycleNumber = (cycleUnit.length() <= 1
- ? 1
- : Integer.parseInt(cycleUnit.substring(0, cycleUnit.length() -
1)));
-
- String offsetUnit = offset.substring(offset.length() - 1,
offset.length());
- int offsetNumber = Integer.parseInt(offset.substring(0,
offset.length() - 1));
-
- /*
- * For day task, the offset cycle unit can only be day; for hourly
task, the offset can't be minute; for
- * minutely task, the offset cycle unit can be day, hour and minute,
but if the offset cycle unit is minute, the
- * offset must be divided by cycle number.
- */
- if (cycleUnit.length() > 1 &&
(StringUtils.endsWithIgnoreCase(cycleUnit, "M"))) {
- calendar.set(Calendar.SECOND, 0);
- int minTime = calendar.get(Calendar.MINUTE);
-
- int leftMin = minTime % cycleNumber;
- minTime = minTime - leftMin;
- calendar.set(Calendar.MINUTE, minTime);
-
- /* Calculate the offset. */
- if (CycleUnitType.DAY.equalsIgnoreCase(offsetUnit)) {
- calendar.add(Calendar.DAY_OF_YEAR, offsetNumber);
- }
-
- if (CycleUnitType.HOUR.equalsIgnoreCase(offsetUnit)) {
- calendar.add(Calendar.HOUR_OF_DAY, offsetNumber);
- }
- } else if (cycleUnit.length() == 1) {
- if (CycleUnitType.DAY.equalsIgnoreCase(cycleUnit)) {
- calendar.set(Calendar.HOUR_OF_DAY, 0);
- calendar.set(Calendar.MINUTE, 0);
- calendar.set(Calendar.SECOND, 0);
- } else if (CycleUnitType.HOUR.equalsIgnoreCase(cycleUnit)) {
- calendar.set(Calendar.MINUTE, 0);
- calendar.set(Calendar.SECOND, 0);
- }
- }
-
- /* Calculate the offset. */
- if (CycleUnitType.DAY.equalsIgnoreCase(offsetUnit)) {
- calendar.add(Calendar.DAY_OF_YEAR, offsetNumber);
- }
-
- if (CycleUnitType.HOUR.equalsIgnoreCase(offsetUnit)) {
- calendar.add(Calendar.HOUR_OF_DAY, offsetNumber);
- }
-
- if (CycleUnitType.MINUTE.equals(offsetUnit)) {
- calendar.add(Calendar.MINUTE, offsetNumber);
- }
-
- return calendar;
- }
-
- public static boolean isValidCreationTime(String dataTime, String
cycleUnit,
- String timeOffset) {
- long timeInterval = 0;
- if (CycleUnitType.DAY.equalsIgnoreCase(cycleUnit)) {
- timeInterval = DAY_TIMEOUT_INTERVAL;
- } else if (CycleUnitType.HOUR.equalsIgnoreCase(cycleUnit)) {
- timeInterval = HOUR_TIMEOUT_INTERVAL;
- } else if (cycleUnit.endsWith(CycleUnitType.MINUTE)) {
- timeInterval = HOUR_TIMEOUT_INTERVAL;
- } else {
- logger.error("cycleUnit {} can't parse!", cycleUnit);
- timeInterval = DAY_TIMEOUT_INTERVAL;
- }
-
- if (timeOffset.startsWith("-")) {
- timeInterval -= DateTransUtils.calcOffset(timeOffset);
- } else {
- timeInterval += DateTransUtils.calcOffset(timeOffset);
- }
-
- return isValidCreationTime(dataTime, timeInterval);
- }
-
- /*
- * Check whether the data time is between curTime - interval and curTime +
interval.
- */
- public static boolean isValidCreationTime(String dataTime, long
timeInterval) {
- long currentTime = System.currentTimeMillis();
-
- long minTime = currentTime - timeInterval;
- long maxTime = currentTime + timeInterval;
-
- SimpleDateFormat dateFormat = new SimpleDateFormat(DEFAULT_FORMAT);
- if (dataTime.length() < DEFAULT_LENGTH) {
- StringBuffer sb = new StringBuffer();
- for (int i = 0; i < DEFAULT_LENGTH - dataTime.length(); i++) {
- sb.append("0");
- }
- dataTime = dataTime + sb.toString();
- }
-
- Calendar calendar = Calendar.getInstance();
- try {
- calendar.setTimeInMillis(dateFormat.parse(dataTime).getTime());
- } catch (ParseException e) {
- return false;
- }
-
- return calendar.getTimeInMillis() >= minTime
- && calendar.getTimeInMillis() <= maxTime;
- }
-
- public static boolean isBraceContain(String dataName) {
- Matcher matcher = bracePatt.matcher(dataName);
- return matcher.find();
- }
-
- public static String getDateTime(String fileName, String dataName,
PathDateExpression dateExpression) {
- String dataTime = null;
-
- if (isBraceContain(dataName)) {
- String fullRegx = replaceDateExpressionWithRegex(dataName,
"dataTime");
- Pattern fullPattern = Pattern.compile(fullRegx);
- Matcher matcher = fullPattern.matcher(fileName);
- if (matcher.find()) {
- dataTime = matcher.group("dataTime");
- }
- } else {
- dataTime = getDateTime(fileName, dateExpression);
- }
- return dataTime;
- }
-
- public static String getDateTime(String fileName, PathDateExpression
dateExpression) {
- if (fileName == null || dateExpression == null
- || dateExpression.getLongestDatePattern() == null) {
- return null;
- }
-
- String longestDatePattern = NewDateUtils
-
.replaceDateExpressionWithRegex(dateExpression.getLongestDatePattern());
- NonRegexPatternPosition patternPosition =
dateExpression.getPatternPosition();
-
- Matcher mat = Pattern.compile(longestDatePattern).matcher(fileName);
- boolean find = mat.find();
- // TODO : more than one part match the time regex in file name
("/data/joox_logs/2000701106/201602170040.log"
- // YYYYMMDDhh)
- if (!find) {
- logger.error("Can't find the pattern {} for file name {}",
longestDatePattern,
- fileName);
- return null;
- }
-
- String dateTime = fileName.substring(mat.start(), mat.end());
- if (patternPosition == NonRegexPatternPosition.PREFIX) {
- dateTime = dateTime.substring(1, dateTime.length());
- } else if (patternPosition == NonRegexPatternPosition.SUFFIX) {
- dateTime = dateTime.substring(0, dateTime.length() - 1);
- } else if (patternPosition == NonRegexPatternPosition.BOTH) {
- dateTime = dateTime.substring(1, dateTime.length() - 1);
- } else if (patternPosition == NonRegexPatternPosition.END) {
- dateTime = dateTime.substring(0, dateTime.length());
- } else if (patternPosition == NonRegexPatternPosition.ENDSUFFIX) {
- dateTime = dateTime.substring(1, dateTime.length());
- } else if (patternPosition == NonRegexPatternPosition.NONE) {
- logger.error("The data path configuration is invalid");
- dateTime = null;
- }
-
- return dateTime;
- }
-
- public static ArrayList<MatchPoint> extractAllTimeRegex(String src) {
- // TODO : time regex error
- Matcher m = pattern.matcher(src);
- ArrayList<MatchPoint> arr = new ArrayList<MatchPoint>();
- while (m.find()) {
- String oneMatch = m.group(0);
- arr.add(new MatchPoint(oneMatch, m.start(), m.end()));
- }
- return arr;
- }
-
- public static String replaceDateExpressionWithRegex(String dataPath) {
- if (dataPath == null) {
- return null;
- }
- StringBuffer sb = new StringBuffer();
-
- // find longest DATEPATTERN
- ArrayList<MatchPoint> mp = extractAllTimeRegex(dataPath);
-
- if (mp == null || mp.size() == 0) {
- return dataPath;
- }
-
- int lastIndex = 0;
- for (MatchPoint m : mp) {
- sb.append(dataPath.substring(lastIndex, m.getStart()));
-
- String longestPattern = m.getStr();
- int hhIndex = longestPattern.indexOf("hh");
- int mmIndex = longestPattern.indexOf("mm");
- longestPattern = longestPattern.replace("YYYY", "\\d{4}");
- longestPattern = longestPattern.replace("MM", "\\d{2}");
- longestPattern = longestPattern.replace("DD", "\\d{2}");
- longestPattern = longestPattern.replace("hh", "\\d{2}");
-
- if (hhIndex != -1 && mmIndex != -1
- && mmIndex >= hhIndex + 2 && mmIndex < hhIndex + 4) {
- longestPattern = longestPattern.replace("mm", "\\d{2}");
- }
- sb.append(longestPattern);
- lastIndex = m.getEnd();
- }
-
- sb.append(dataPath.substring(lastIndex));
-
- return sb.toString();
- }
-
- public static String replaceDateExpressionWithRegex(String dataPath,
String dateTimeGroupName) {
- if (dataPath == null) {
- return null;
- }
-
- // \\d{4}\\d{2}\\d{2}\\d{2} --> (?<GroupName>\\d{4}\\d{2}\\d{2}\\d{2})
- if (isBraceContain(dataPath)) {
- StringBuilder sb = new StringBuilder();
- sb.append(dataPath.substring(0, dataPath.indexOf('{')));
- sb.append("(?<").append(dateTimeGroupName).append('>');
- sb.append(dataPath.substring(dataPath.indexOf('{') + 1,
dataPath.indexOf('}')));
- sb.append(')').append(dataPath.substring(dataPath.indexOf('}') +
1));
- dataPath = sb.toString();
- }
-
- StringBuffer sb = new StringBuffer();
-
- // find longest DATEPATTERN
- ArrayList<MatchPoint> mp = extractAllTimeRegex(dataPath);
-
- if (mp == null || mp.size() == 0) {
- return dataPath;
- }
-
- int lastIndex = 0;
- for (int i = 0; i < mp.size(); i++) {
- MatchPoint m = mp.get(i);
- sb.append(dataPath.substring(lastIndex, m.getStart()));
-
- String longestPattern = m.getStr();
- int hhIndex = longestPattern.indexOf("hh");
- int mmIndex = longestPattern.indexOf("mm");
- longestPattern = longestPattern.replace("YYYY", "\\d{4}");
- longestPattern = longestPattern.replace("MM", "\\d{2}");
- longestPattern = longestPattern.replace("DD", "\\d{2}");
- longestPattern = longestPattern.replace("hh", "\\d{2}");
-
- if (hhIndex != -1 && mmIndex != -1
- && mmIndex >= hhIndex + 2 && mmIndex < hhIndex + 4) {
- longestPattern = longestPattern.replace("mm", "\\d{2}");
- }
-
- sb.append(longestPattern);
- lastIndex = m.getEnd();
- }
-
- sb.append(dataPath.substring(lastIndex));
-
- return sb.toString();
- }
-
- public static String replaceDateExpression(Calendar dateTime, String
dataPath) {
- if (dataPath == null) {
- return null;
- }
-
- String year = String.valueOf(dateTime.get(Calendar.YEAR));
- String month = String.valueOf(dateTime.get(Calendar.MONTH) + 1);
- String day = String.valueOf(dateTime.get(Calendar.DAY_OF_MONTH));
- String hour = String.valueOf(dateTime.get(Calendar.HOUR_OF_DAY));
- String minute = String.valueOf(dateTime.get(Calendar.MINUTE));
-
- StringBuffer sb = new StringBuffer();
-
- // find longest DATEPATTERN
- ArrayList<MatchPoint> mp = extractAllTimeRegex(dataPath);
- if (mp == null || mp.size() == 0) {
- return dataPath;
- }
-
- int lastIndex = 0;
- for (MatchPoint m : mp) {
- sb.append(dataPath.substring(lastIndex, m.getStart()));
-
- String longestPattern = m.getStr();
- int hhIndex = longestPattern.indexOf("hh");
- int mmIndex = longestPattern.indexOf("mm");
-
- longestPattern = longestPattern.replaceAll("YYYY", year);
- longestPattern = longestPattern.replaceAll("MM",
externDate(month));
- longestPattern = longestPattern.replaceAll("DD", externDate(day));
- longestPattern = longestPattern.replaceAll("hh", externDate(hour));
-
- if (hhIndex != -1 && mmIndex != -1 && mmIndex >= hhIndex + 2
- && mmIndex < hhIndex + 4) {
- longestPattern = longestPattern.replaceAll("mm",
externDate(minute));
- }
-
- sb.append(longestPattern);
- lastIndex = m.getEnd();
- }
-
- sb.append(dataPath.substring(lastIndex));
-
- return sb.toString();
- }
-
- public static String externDate(String time) {
- if (time.length() == 1) {
- return "0" + time;
- } else {
- return time;
- }
- }
-
- public static List<Long> getDateRegion(long startTime, long endTime,
String cycleUnit) {
- List<Long> ret = new ArrayList<Long>();
- DateTime dtStart = DateTime.forInstant(startTime,
TimeZone.getDefault());
- DateTime dtEnd = DateTime.forInstant(endTime, TimeZone.getDefault());
-
- if (cycleUnit.equals(CycleUnitType.DAY)) {
- dtEnd = dtEnd.getEndOfDay();
- }
-
- int year = 0;
- int month = 0;
- int day = 0;
- int hour = 0;
- int minute = 0;
- int second = 0;
- if (cycleUnit.equalsIgnoreCase(CycleUnitType.DAY)) {
- day = 1;
- } else if (cycleUnit.equalsIgnoreCase(CycleUnitType.HOUR)) {
- hour = 1;
- } else if (cycleUnit.equals(CycleUnitType.MINUTE)) {
- minute = 1;
- } else {
- logger.error("cycleUnit {} is error: ", cycleUnit);
- return ret;
- }
- while (dtStart.lteq(dtEnd)) {
- ret.add(dtStart.getMilliseconds(TimeZone.getDefault()));
- dtStart = dtStart.plus(year, month, day, hour, minute, second, 0,
- DateTime.DayOverflow.LastDay);
- }
- return ret;
- }
-}
diff --git
a/inlong-agent/agent-plugins/src/main/java/org/apache/inlong/agent/plugin/utils/regex/Scanner.java
b/inlong-agent/agent-plugins/src/main/java/org/apache/inlong/agent/plugin/utils/regex/Scanner.java
index b3153e81fe..ecddccce9f 100644
---
a/inlong-agent/agent-plugins/src/main/java/org/apache/inlong/agent/plugin/utils/regex/Scanner.java
+++
b/inlong-agent/agent-plugins/src/main/java/org/apache/inlong/agent/plugin/utils/regex/Scanner.java
@@ -59,13 +59,13 @@ public class Scanner {
String strStartTime =
DateTransUtils.millSecConvertToTimeStr(range.startTime, cycleUnit);
String strEndTime =
DateTransUtils.millSecConvertToTimeStr(range.endTime, cycleUnit);
LOGGER.info("{} scan time is between {} and {}", originPattern,
strStartTime, strEndTime);
- List<Long> dateRegion = NewDateUtils.getDateRegion(range.startTime,
range.endTime, cycleUnit);
+ List<Long> dateRegion = DateUtils.getDateRegion(range.startTime,
range.endTime, cycleUnit);
List<FinalPatternInfo> finalPatternList = new ArrayList<>();
for (Long time : dateRegion) {
Calendar calendar = Calendar.getInstance();
calendar.setTimeInMillis(time);
FinalPatternInfo finalPatternInfo = new FinalPatternInfo(
- NewDateUtils.replaceDateExpression(calendar,
originPattern), time);
+ DateUtils.replaceDateExpression(calendar, originPattern),
time);
finalPatternList.add(finalPatternInfo);
}
return finalPatternList;
@@ -75,7 +75,7 @@ public class Scanner {
boolean isRetry) {
TimeRange range = getTimeRange(startTime, endTime, cycleUnit,
timeOffset, isRetry);
List<String> dataTimeList = new ArrayList<>();
- List<Long> dateRegion = NewDateUtils.getDateRegion(range.startTime,
range.endTime, cycleUnit);
+ List<Long> dateRegion = DateUtils.getDateRegion(range.startTime,
range.endTime, cycleUnit);
for (Long time : dateRegion) {
String dataTime = DateTransUtils.millSecConvertToTimeStr(time,
cycleUnit);
dataTimeList.add(dataTime);
diff --git
a/inlong-agent/agent-plugins/src/test/java/org/apache/inlong/agent/plugin/AgentBaseTestsHelper.java
b/inlong-agent/agent-plugins/src/test/java/org/apache/inlong/agent/plugin/AgentBaseTestsHelper.java
index 214a29b24e..2506191ca6 100755
---
a/inlong-agent/agent-plugins/src/test/java/org/apache/inlong/agent/plugin/AgentBaseTestsHelper.java
+++
b/inlong-agent/agent-plugins/src/test/java/org/apache/inlong/agent/plugin/AgentBaseTestsHelper.java
@@ -23,6 +23,7 @@ import org.apache.inlong.agent.constant.AgentConstants;
import org.apache.inlong.agent.constant.FetcherConstants;
import org.apache.inlong.agent.pojo.COSTask.COSTaskConfig;
import org.apache.inlong.agent.pojo.FileTask.FileTaskConfig;
+import org.apache.inlong.agent.pojo.SQLTask.SQLTaskConfig;
import org.apache.inlong.common.enums.TaskStateEnum;
import org.apache.inlong.common.enums.TaskTypeEnum;
import org.apache.inlong.common.pojo.agent.DataConfig;
@@ -164,4 +165,36 @@ public class AgentBaseTestsHelper {
return dataConfig;
}
+ public TaskProfile getSQLTaskProfile(int taskId, String sql, String
contentStyle, boolean retry,
+ String startTime, String endTime, TaskStateEnum state, String
cycleUnit, String timeZone) {
+ DataConfig dataConfig = getSQLDataConfig(taskId, sql, contentStyle,
retry, startTime, endTime,
+ state, cycleUnit, timeZone);
+ TaskProfile profile = TaskProfile.convertToTaskProfile(dataConfig);
+ return profile;
+ }
+
+ private DataConfig getSQLDataConfig(int taskId, String sql, String
contentStyle, boolean retry,
+ String startTime, String endTime, TaskStateEnum state, String
cycleUnit, String timeZone) {
+ DataConfig dataConfig = new DataConfig();
+ dataConfig.setInlongGroupId("testGroupId");
+ dataConfig.setInlongStreamId("testStreamId");
+ dataConfig.setDataReportType(1);
+ dataConfig.setTaskType(TaskTypeEnum.SQL.getType());
+ dataConfig.setTaskId(taskId);
+ dataConfig.setTimeZone(timeZone);
+ dataConfig.setState(state.ordinal());
+ SQLTaskConfig sqlTaskConfig = new SQLTaskConfig();
+ sqlTaskConfig.setUsername("testUserName");
+ sqlTaskConfig.setJdbcPassword("testPassword");
+ sqlTaskConfig.setSql(sql);
+ sqlTaskConfig.setTimeOffset("0d");
+ // time zone is taken from the timeZone argument (set on dataConfig above)
+ sqlTaskConfig.setMaxInstanceCount(100);
+ sqlTaskConfig.setCycleUnit(cycleUnit);
+ sqlTaskConfig.setRetry(retry);
+ sqlTaskConfig.setDataTimeFrom(startTime);
+ sqlTaskConfig.setDataTimeTo(endTime);
+ dataConfig.setExtParams(GSON.toJson(sqlTaskConfig));
+ return dataConfig;
+ }
}
diff --git
a/inlong-agent/agent-plugins/src/test/java/org/apache/inlong/agent/plugin/task/TestCOSTask.java
b/inlong-agent/agent-plugins/src/test/java/org/apache/inlong/agent/plugin/task/TestCOSTask.java
index 4d3bc0508d..8facaa32ec 100644
---
a/inlong-agent/agent-plugins/src/test/java/org/apache/inlong/agent/plugin/task/TestCOSTask.java
+++
b/inlong-agent/agent-plugins/src/test/java/org/apache/inlong/agent/plugin/task/TestCOSTask.java
@@ -30,6 +30,7 @@ import com.qcloud.cos.COSClient;
import com.qcloud.cos.model.COSObjectSummary;
import com.qcloud.cos.model.ListObjectsRequest;
import com.qcloud.cos.model.ObjectListing;
+import com.qcloud.cos.model.ObjectMetadata;
import org.junit.AfterClass;
import org.junit.Assert;
import org.junit.BeforeClass;
@@ -45,6 +46,7 @@ import org.slf4j.LoggerFactory;
import java.util.ArrayList;
import java.util.Arrays;
+import java.util.Date;
import java.util.List;
import java.util.concurrent.SynchronousQueue;
import java.util.concurrent.ThreadPoolExecutor;
@@ -74,6 +76,11 @@ public class TestCOSTask {
helper = new
AgentBaseTestsHelper(TestCOSTask.class.getName()).setupAgentHome();
manager = new TaskManager();
cosClient = Mockito.mock(COSClient.class);
+ when(cosClient.getObjectMetadata(Mockito.anyString(),
Mockito.anyString())).thenAnswer(mock -> {
+ ObjectMetadata metadata = new ObjectMetadata();
+ metadata.setLastModified(new Date());
+ return metadata;
+ });
PowerMockito.mockStatic(COSUtils.class);
Mockito.when(COSUtils.createCli(Mockito.anyString(),
Mockito.anyString(), Mockito.anyString()))
.thenReturn(cosClient);
@@ -88,7 +95,8 @@ public class TestCOSTask {
ObjectListing objectListing1_1 = Mockito.mock(ObjectListing.class);
when(objectListing1_1.getCommonPrefixes()).thenReturn(
Arrays.asList("some/20230928_0/", "some/20230928_1/",
"some/20230928_aaa/"));
-
when(objectListing1_1.getObjectSummaries()).thenReturn(getSummaries(Arrays.asList("some/20230928_test_0.txt")));
+ when(objectListing1_1.getObjectSummaries()).thenReturn(
+ getSummaries(Arrays.asList("some/20230928_test_0.txt")));
ObjectListing objectListing1_2 = Mockito.mock(ObjectListing.class);
when(objectListing1_2.getCommonPrefixes()).thenReturn(
@@ -185,13 +193,27 @@ public class TestCOSTask {
public void testScan() {
mockDay(cosClient);
doTest(1, "some/YYYYMMDD_[0-9]+/test_[0-9]+.txt", CycleUnitType.DAY,
- Arrays.asList("some/20230928_0/test_0.txt",
"some/20230928_0/test_1.txt", "some/20230929_1/test_0.txt",
+ Arrays.asList("some/20230928_0/test_0.txt",
"some/20230928_0/test_1.txt",
+ "some/20230929_1/test_0.txt",
+ "some/20230929_1/test_1.txt"),
+ Arrays.asList("20230928", "20230928", "20230929", "20230929"),
+ "20230928",
+ "20230930");
+ doTest(2, "some/yyyyMMdd_[0-9]+/test_[0-9]+.txt", CycleUnitType.DAY,
+ Arrays.asList("some/20230928_0/test_0.txt",
"some/20230928_0/test_1.txt",
+ "some/20230929_1/test_0.txt",
"some/20230929_1/test_1.txt"),
Arrays.asList("20230928", "20230928", "20230929", "20230929"),
"20230928",
"20230930");
mockHour(cosClient);
- doTest(2, "some/YYYYMMDDhh_[0-9]+/test_[0-9]+.txt", CycleUnitType.HOUR,
+ doTest(3, "some/YYYYMMDDHH_[0-9]+/test_[0-9]+.txt", CycleUnitType.HOUR,
+ Arrays.asList("some/2023092800_0/test_0.txt",
"some/2023092800_0/test_1.txt",
+ "some/2023092901_1/test_0.txt",
+ "some/2023092901_1/test_1.txt"),
+ Arrays.asList("2023092800", "2023092800", "2023092901",
"2023092901"), "2023092800",
+ "2023093023");
+ doTest(4, "some/yyyyMMddhh_[0-9]+/test_[0-9]+.txt", CycleUnitType.HOUR,
Arrays.asList("some/2023092800_0/test_0.txt",
"some/2023092800_0/test_1.txt",
"some/2023092901_1/test_0.txt",
"some/2023092901_1/test_1.txt"),
@@ -213,7 +235,8 @@ public class TestCOSTask {
fileName.add(invocation.getArgument(0));
dataTime.add(invocation.getArgument(1));
return null;
- }).when(fileTask, "addToEvenMap", Mockito.anyString(),
Mockito.anyString());
+ }).when(fileTask, "addToEvenMap", Mockito.anyString(),
Mockito.anyString(), Mockito.anyLong(),
+ Mockito.anyString());
Assert.assertTrue(fileTask.isProfileValid(taskProfile));
manager.getTaskStore().storeTask(taskProfile);
fileTask.init(manager, taskProfile,
manager.getInstanceBasicStore());
diff --git
a/inlong-agent/agent-plugins/src/test/java/org/apache/inlong/agent/plugin/task/TestLogFileTask.java
b/inlong-agent/agent-plugins/src/test/java/org/apache/inlong/agent/plugin/task/TestFileTask.java
similarity index 78%
rename from
inlong-agent/agent-plugins/src/test/java/org/apache/inlong/agent/plugin/task/TestLogFileTask.java
rename to
inlong-agent/agent-plugins/src/test/java/org/apache/inlong/agent/plugin/task/TestFileTask.java
index d2c8e8a342..f5eda88e10 100644
---
a/inlong-agent/agent-plugins/src/test/java/org/apache/inlong/agent/plugin/task/TestLogFileTask.java
+++
b/inlong-agent/agent-plugins/src/test/java/org/apache/inlong/agent/plugin/task/TestFileTask.java
@@ -51,10 +51,10 @@ import static org.awaitility.Awaitility.await;
@RunWith(PowerMockRunner.class)
@PrepareForTest(FileTask.class)
@PowerMockIgnore({"javax.management.*"})
-public class TestLogFileTask {
+public class TestFileTask {
- private static final Logger LOGGER =
LoggerFactory.getLogger(TestLogFileTask.class);
- private static final ClassLoader LOADER =
TestLogFileTask.class.getClassLoader();
+ private static final Logger LOGGER =
LoggerFactory.getLogger(TestFileTask.class);
+ private static final ClassLoader LOADER =
TestFileTask.class.getClassLoader();
private static AgentBaseTestsHelper helper;
private static TaskManager manager;
private static String resourceParentPath;
@@ -66,7 +66,7 @@ public class TestLogFileTask {
@BeforeClass
public static void setup() throws Exception {
- helper = new
AgentBaseTestsHelper(TestLogFileTask.class.getName()).setupAgentHome();
+ helper = new
AgentBaseTestsHelper(TestFileTask.class.getName()).setupAgentHome();
resourceParentPath = new
File(LOADER.getResource("testScan/temp.txt").getPath()).getParent();
manager = new TaskManager();
}
@@ -82,14 +82,31 @@ public class TestLogFileTask {
resourceParentPath + "/YYYYMMDD_[0-9]+/test_[0-9]+.txt",
CycleUnitType.DAY, Arrays.asList("20230928"),
"20230928", "20230930");
doTest(2, Arrays.asList("testScan/2023092810_1/test_1.txt"),
- resourceParentPath + "/YYYYMMDDhh_[0-9]+/test_[0-9]+.txt",
+ resourceParentPath + "/YYYYMMDDHH_[0-9]+/test_[0-9]+.txt",
CycleUnitType.HOUR, Arrays.asList("2023092810"), "2023092800",
"2023093023");
doTest(3, Arrays.asList("testScan/202309281030_1/test_1.txt",
"testScan/202309301059_1/test_1.txt"),
- resourceParentPath + "/YYYYMMDDhhmm_[0-9]+/test_[0-9]+.txt",
+ resourceParentPath + "/YYYYMMDDHHmm_[0-9]+/test_[0-9]+.txt",
CycleUnitType.MINUTE, Arrays.asList("202309281030",
"202309301059"), "202309280000",
"202309302300");
doTest(4, Arrays.asList("testScan/20241030/23/59.txt"),
- resourceParentPath + "/YYYYMMDD/hh/mm.txt",
+ resourceParentPath + "/YYYYMMDD/HH/mm.txt",
+ CycleUnitType.MINUTE, Arrays.asList("202410302359"),
"202410300000", "202410310000");
+ }
+
+ @Test
+ public void testScanLowercase() throws Exception {
+ doTest(1, Arrays.asList("testScan/20230928_1/test_1.txt"),
+ resourceParentPath + "/yyyyMMdd_[0-9]+/test_[0-9]+.txt",
CycleUnitType.DAY, Arrays.asList("20230928"),
+ "20230928", "20230930");
+ doTest(2, Arrays.asList("testScan/2023092810_1/test_1.txt"),
+ resourceParentPath + "/yyyyMMddhh_[0-9]+/test_[0-9]+.txt",
+ CycleUnitType.HOUR, Arrays.asList("2023092810"), "2023092800",
"2023093023");
+ doTest(3, Arrays.asList("testScan/202309281030_1/test_1.txt",
"testScan/202309301059_1/test_1.txt"),
+ resourceParentPath + "/yyyyMMddhhmm_[0-9]+/test_[0-9]+.txt",
+ CycleUnitType.MINUTE, Arrays.asList("202309281030",
"202309301059"), "202309280000",
+ "202309302300");
+ doTest(4, Arrays.asList("testScan/20241030/23/59.txt"),
+ resourceParentPath + "/yyyyMMdd/hh/mm.txt",
CycleUnitType.MINUTE, Arrays.asList("202410302359"),
"202410300000", "202410310000");
}
@@ -112,7 +129,8 @@ public class TestLogFileTask {
fileName.add(invocation.getArgument(0));
dataTime.add(invocation.getArgument(1));
return null;
- }).when(dayTask, "addToEvenMap", Mockito.anyString(),
Mockito.anyString());
+ }).when(dayTask, "addToEvenMap", Mockito.anyString(),
Mockito.anyString(), Mockito.anyLong(),
+ Mockito.anyString());
Assert.assertTrue(dayTask.isProfileValid(taskProfile));
manager.getTaskStore().storeTask(taskProfile);
dayTask.init(manager, taskProfile,
manager.getInstanceBasicStore());
diff --git
a/inlong-agent/agent-plugins/src/test/java/org/apache/inlong/agent/plugin/utils/TestUtils.java
b/inlong-agent/agent-plugins/src/test/java/org/apache/inlong/agent/plugin/utils/TestUtils.java
index a1f13122e3..9dd8ea88e0 100644
---
a/inlong-agent/agent-plugins/src/test/java/org/apache/inlong/agent/plugin/utils/TestUtils.java
+++
b/inlong-agent/agent-plugins/src/test/java/org/apache/inlong/agent/plugin/utils/TestUtils.java
@@ -17,7 +17,7 @@
package org.apache.inlong.agent.plugin.utils;
-import org.apache.inlong.agent.plugin.utils.regex.NewDateUtils;
+import org.apache.inlong.agent.plugin.utils.regex.DateUtils;
import org.apache.inlong.agent.plugin.utils.regex.PatternUtil;
import org.apache.inlong.agent.utils.DateTransUtils;
import org.apache.inlong.common.metric.MetricRegister;
@@ -125,7 +125,7 @@ public class TestUtils {
Calendar calendar = Calendar.getInstance();
Long dataTime = DateTransUtils.timeStrConvertToMillSec("202406251007",
"m");
calendar.setTimeInMillis(dataTime);
- Assert.assertEquals(NewDateUtils.replaceDateExpression(calendar, src),
dst);
+ Assert.assertEquals(DateUtils.replaceDateExpression(calendar, src),
dst);
}
private void testCutDirectoryByWildcard(String src, List<String> dst) {