This is an automated email from the ASF dual-hosted git repository.
wenweihuang pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/inlong.git
The following commit(s) were added to refs/heads/master by this push:
new 4d597adee0 [INLONG-11591][Agent] Reduce duplicate code for log collection type tasks (#11592)
4d597adee0 is described below
commit 4d597adee042f3154f7ad416cb6af6a3f06020ce
Author: justinwwhuang <[email protected]>
AuthorDate: Tue Dec 10 18:08:08 2024 +0800
[INLONG-11591][Agent] Reduce duplicate code for log collection type tasks (#11592)
* [INLONG-11591][Agent] Reduce duplicate code for log collection type tasks
* Update inlong-agent/agent-core/src/main/java/org/apache/inlong/agent/core/instance/InstanceManager.java
Co-authored-by: AloysZhang <[email protected]>
* [INLONG-11591][Agent] Delete useless code
---------
Co-authored-by: AloysZhang <[email protected]>
---
.../apache/inlong/agent/conf/InstanceProfile.java | 49 +++++-
.../org/apache/inlong/agent/conf/TaskProfile.java | 58 +++++--
.../inlong/agent/constant/TaskConstants.java | 7 +-
.../apache/inlong/agent/pojo/TaskProfileDto.java | 23 +--
.../agent/core/instance/InstanceManager.java | 2 +-
.../inlong/agent/plugin/instance/FileInstance.java | 2 +-
.../inlong/agent/plugin/sources/LogFileSource.java | 2 +-
.../agent/plugin/task/FormatDateLogFileTask.java | 27 ---
.../apache/inlong/agent/plugin/task/KafkaTask.java | 5 +-
.../inlong/agent/plugin/task/MongoDBTask.java | 5 +-
.../apache/inlong/agent/plugin/task/MqttTask.java | 6 +-
.../inlong/agent/plugin/task/OracleTask.java | 9 +-
.../inlong/agent/plugin/task/PostgreSQLTask.java | 5 +-
.../inlong/agent/plugin/task/PulsarTask.java | 5 +-
.../apache/inlong/agent/plugin/task/RedisTask.java | 5 +-
.../inlong/agent/plugin/task/SQLServerTask.java | 5 +-
.../plugin/task/logcollection/LogAbstractTask.java | 185 +++++++++++++++++++++
.../task/{ => logcollection}/cos/COSTask.java | 166 ++----------------
.../task/{ => logcollection}/cos/FileScanner.java | 2 +-
.../local}/FileDataUtils.java | 2 +-
.../{file => logcollection/local}/FileScanner.java | 2 +-
.../local/FileTask.java} | 164 ++----------------
.../local}/FileTimeComparator.java | 2 +-
.../task/{file => logcollection/local}/Files.java | 2 +-
.../{file => logcollection/local}/WatchEntity.java | 2 +-
.../agent/plugin/instance/TestInstanceManager.java | 33 +++-
.../inlong/agent/plugin/sinks/KafkaSinkTest.java | 4 +-
.../inlong/agent/plugin/sinks/PulsarSinkTest.java | 4 +-
.../sinks/filecollect/TestSenderManager.java | 6 +-
.../agent/plugin/sources/TestLogFileSource.java | 6 +-
.../agent/plugin/sources/TestRedisSource.java | 4 +-
.../agent/plugin/sources/TestSQLServerSource.java | 4 +-
.../inlong/agent/plugin/task/TestCOSTask.java | 16 +-
.../inlong/agent/plugin/task/TestLogFileTask.java | 8 +-
.../inlong/agent/plugin/task/TestTaskManager.java | 8 +-
35 files changed, 389 insertions(+), 446 deletions(-)
diff --git
a/inlong-agent/agent-common/src/main/java/org/apache/inlong/agent/conf/InstanceProfile.java
b/inlong-agent/agent-common/src/main/java/org/apache/inlong/agent/conf/InstanceProfile.java
index 9e85872ff5..c3b20a0581 100644
---
a/inlong-agent/agent-common/src/main/java/org/apache/inlong/agent/conf/InstanceProfile.java
+++
b/inlong-agent/agent-common/src/main/java/org/apache/inlong/agent/conf/InstanceProfile.java
@@ -20,6 +20,7 @@ package org.apache.inlong.agent.conf;
import org.apache.inlong.agent.constant.TaskConstants;
import org.apache.inlong.agent.utils.file.FileUtils;
import org.apache.inlong.common.enums.InstanceStateEnum;
+import org.apache.inlong.common.enums.TaskTypeEnum;
import org.apache.inlong.common.pojo.dataproxy.DataProxyTopicInfo;
import org.apache.inlong.common.pojo.dataproxy.MQClusterInfo;
@@ -40,12 +41,24 @@ import static
org.apache.inlong.agent.constant.TaskConstants.INSTANCE_STATE;
import static org.apache.inlong.agent.constant.TaskConstants.TASK_MQ_CLUSTERS;
import static org.apache.inlong.agent.constant.TaskConstants.TASK_MQ_TOPIC;
import static org.apache.inlong.agent.constant.TaskConstants.TASK_RETRY;
+import static org.apache.inlong.agent.constant.TaskConstants.TASK_TYPE;
/**
* job profile which contains details describing properties of one job.
*/
public class InstanceProfile extends AbstractConfiguration implements
Comparable<InstanceProfile> {
+ public static final String DEFAULT_FILE_INSTANCE =
"org.apache.inlong.agent.plugin.instance.FileInstance";
+ public static final String DEFAULT_COS_INSTANCE =
"org.apache.inlong.agent.plugin.instance.COSInstance";
+ public static final String DEFAULT_KAFKA_INSTANCE =
"org.apache.inlong.agent.plugin.instance.KafkaInstance";
+ public static final String DEFAULT_MONGODB_INSTANCE =
"org.apache.inlong.agent.plugin.instance.MongoDBInstance";
+ public static final String DEFAULT_MQTT_INSTANCE =
"org.apache.inlong.agent.plugin.instance.MqttInstance";
+ public static final String DEFAULT_ORACLE_INSTANCE =
"org.apache.inlong.agent.plugin.instance.OracleInstance";
+ public static final String DEFAULT_POSTGRES_INSTANCE =
"org.apache.inlong.agent.plugin.instance.PostgreSQLInstance";
+ public static final String DEFAULT_PULSAR_INSTANCE =
"org.apache.inlong.agent.plugin.instance.PulsarInstance";
+ public static final String DEFAULT_REDIS_INSTANCE =
"org.apache.inlong.agent.plugin.instance.RedisInstance";
+ public static final String DEFAULT_SQLSERVER_INSTANCE =
"org.apache.inlong.agent.plugin.instance.SQLServerInstance";
+
private static final Logger LOGGER =
LoggerFactory.getLogger(InstanceProfile.class);
private static final Gson GSON = new Gson();
@@ -64,12 +77,40 @@ public class InstanceProfile extends AbstractConfiguration
implements Comparable
return GSON.toJson(getConfigStorage());
}
- public void setInstanceClass(String className) {
- set(TaskConstants.INSTANCE_CLASS, className);
+ public String getInstanceClass() {
+ TaskTypeEnum taskType = TaskTypeEnum.getTaskType(getInt(TASK_TYPE,
TaskTypeEnum.FILE.getType()));
+ return getInstanceClassByTaskType(taskType);
}
- public String getInstanceClass() {
- return get(TaskConstants.INSTANCE_CLASS);
+ public static String getInstanceClassByTaskType(TaskTypeEnum taskType) {
+ if (taskType == null) {
+ return null;
+ }
+ switch (taskType) {
+ case FILE:
+ return DEFAULT_FILE_INSTANCE;
+ case KAFKA:
+ return DEFAULT_KAFKA_INSTANCE;
+ case PULSAR:
+ return DEFAULT_PULSAR_INSTANCE;
+ case POSTGRES:
+ return DEFAULT_POSTGRES_INSTANCE;
+ case ORACLE:
+ return DEFAULT_ORACLE_INSTANCE;
+ case SQLSERVER:
+ return DEFAULT_SQLSERVER_INSTANCE;
+ case MONGODB:
+ return DEFAULT_MONGODB_INSTANCE;
+ case REDIS:
+ return DEFAULT_REDIS_INSTANCE;
+ case MQTT:
+ return DEFAULT_MQTT_INSTANCE;
+ case COS:
+ return DEFAULT_COS_INSTANCE;
+ default:
+ LOGGER.error("invalid task type {}", taskType);
+ return null;
+ }
}
public String getTaskId() {
diff --git
a/inlong-agent/agent-common/src/main/java/org/apache/inlong/agent/conf/TaskProfile.java
b/inlong-agent/agent-common/src/main/java/org/apache/inlong/agent/conf/TaskProfile.java
index 8c509e4240..c2a60a0598 100644
---
a/inlong-agent/agent-common/src/main/java/org/apache/inlong/agent/conf/TaskProfile.java
+++
b/inlong-agent/agent-common/src/main/java/org/apache/inlong/agent/conf/TaskProfile.java
@@ -23,6 +23,7 @@ import org.apache.inlong.agent.utils.AgentUtils;
import org.apache.inlong.agent.utils.DateTransUtils;
import org.apache.inlong.common.enums.InstanceStateEnum;
import org.apache.inlong.common.enums.TaskStateEnum;
+import org.apache.inlong.common.enums.TaskTypeEnum;
import org.apache.inlong.common.pojo.agent.DataConfig;
import com.google.gson.Gson;
@@ -32,18 +33,32 @@ import org.slf4j.LoggerFactory;
import java.text.ParseException;
import java.util.TimeZone;
+import static java.util.Objects.requireNonNull;
import static
org.apache.inlong.agent.constant.CommonConstants.DEFAULT_PROXY_INLONG_GROUP_ID;
import static
org.apache.inlong.agent.constant.CommonConstants.DEFAULT_PROXY_INLONG_STREAM_ID;
import static
org.apache.inlong.agent.constant.CommonConstants.PROXY_INLONG_GROUP_ID;
import static
org.apache.inlong.agent.constant.CommonConstants.PROXY_INLONG_STREAM_ID;
import static org.apache.inlong.agent.constant.TaskConstants.TASK_RETRY;
import static org.apache.inlong.agent.constant.TaskConstants.TASK_STATE;
+import static org.apache.inlong.agent.constant.TaskConstants.TASK_TYPE;
/**
* job profile which contains details describing properties of one job.
*/
public class TaskProfile extends AbstractConfiguration {
+ public static final String DEFAULT_FILE_TASK =
"org.apache.inlong.agent.plugin.task.logcollection.local.FileTask";
+ public static final String DEFAULT_COS_TASK =
"org.apache.inlong.agent.plugin.task.logcollection.cos.COSTask";
+ public static final String DEFAULT_KAFKA_TASK =
"org.apache.inlong.agent.plugin.task.KafkaTask";
+ public static final String DEFAULT_PULSAR_TASK =
"org.apache.inlong.agent.plugin.task.PulsarTask";
+ public static final String DEFAULT_MONGODB_TASK =
"org.apache.inlong.agent.plugin.task.MongoDBTask";
+ public static final String DEFAULT_ORACLE_TASK =
"org.apache.inlong.agent.plugin.task.OracleTask";
+ public static final String DEFAULT_REDIS_TASK =
"org.apache.inlong.agent.plugin.task.RedisTask";
+ public static final String DEFAULT_POSTGRESQL_TASK =
"org.apache.inlong.agent.plugin.task.PostgreSQLTask";
+ public static final String DEFAULT_MQTT_TASK =
"org.apache.inlong.agent.plugin.task.MqttTask";
+ public static final String DEFAULT_SQLSERVER_TASK =
"org.apache.inlong.agent.plugin.task.SQLServerTask";
+ public static final String DEFAULT_MOCK_TASK =
"org.apache.inlong.agent.plugin.task.MockTask";
+
private static final Gson GSON = new Gson();
private static final Logger logger =
LoggerFactory.getLogger(TaskProfile.class);
@@ -57,6 +72,37 @@ public class TaskProfile extends AbstractConfiguration {
return TaskProfileDto.convertToTaskProfile(dataConfig);
}
+ public String getTaskClass() {
+ TaskTypeEnum taskType = TaskTypeEnum.getTaskType(getInt(TASK_TYPE,
TaskTypeEnum.FILE.getType()));
+ switch (requireNonNull(taskType)) {
+ case FILE:
+ return DEFAULT_FILE_TASK;
+ case KAFKA:
+ return DEFAULT_KAFKA_TASK;
+ case PULSAR:
+ return DEFAULT_PULSAR_TASK;
+ case POSTGRES:
+ return DEFAULT_POSTGRESQL_TASK;
+ case ORACLE:
+ return DEFAULT_ORACLE_TASK;
+ case SQLSERVER:
+ return DEFAULT_SQLSERVER_TASK;
+ case MONGODB:
+ return DEFAULT_MONGODB_TASK;
+ case REDIS:
+ return DEFAULT_REDIS_TASK;
+ case MQTT:
+ return DEFAULT_MQTT_TASK;
+ case COS:
+ return DEFAULT_COS_TASK;
+ case MOCK:
+ return DEFAULT_MOCK_TASK;
+ default:
+ logger.error("invalid task type {}", taskType);
+ return null;
+ }
+ }
+
public String getTaskId() {
return get(TaskConstants.TASK_ID);
}
@@ -81,14 +127,6 @@ public class TaskProfile extends AbstractConfiguration {
return getBoolean(TASK_RETRY, false);
}
- public String getTaskClass() {
- return get(TaskConstants.TASK_CLASS);
- }
-
- public void setTaskClass(String className) {
- set(TaskConstants.TASK_CLASS, className);
- }
-
public String getInlongGroupId() {
return get(PROXY_INLONG_GROUP_ID, DEFAULT_PROXY_INLONG_GROUP_ID);
}
@@ -124,11 +162,9 @@ public class TaskProfile extends AbstractConfiguration {
return GSON.toJson(getConfigStorage());
}
- public InstanceProfile createInstanceProfile(String instanceClass, String
fileName, String cycleUnit,
- String dataTime,
+ public InstanceProfile createInstanceProfile(String fileName, String
cycleUnit, String dataTime,
long fileUpdateTime) {
InstanceProfile instanceProfile =
InstanceProfile.parseJsonStr(toJsonStr());
- instanceProfile.setInstanceClass(instanceClass);
instanceProfile.setInstanceId(fileName);
instanceProfile.setSourceDataTime(dataTime);
Long sinkDataTime = 0L;
diff --git
a/inlong-agent/agent-common/src/main/java/org/apache/inlong/agent/constant/TaskConstants.java
b/inlong-agent/agent-common/src/main/java/org/apache/inlong/agent/constant/TaskConstants.java
index a4f9156577..201ec24713 100755
---
a/inlong-agent/agent-common/src/main/java/org/apache/inlong/agent/constant/TaskConstants.java
+++
b/inlong-agent/agent-common/src/main/java/org/apache/inlong/agent/constant/TaskConstants.java
@@ -22,11 +22,8 @@ package org.apache.inlong.agent.constant;
*/
public class TaskConstants extends CommonConstants {
- // job id
- // public static final String JOB_ID = "job.id";
public static final String TASK_ID = "task.id";
public static final String INSTANCE_ID = "instance.id";
- public static final String JOB_INSTANCE_ID = "job.instance.id";
public static final String INSTANCE_CREATE_TIME = "instance.createTime";
public static final String INSTANCE_MODIFY_TIME = "instance.modifyTime";
public static final String TASK_GROUP_ID = "task.groupId";
@@ -36,9 +33,7 @@ public class TaskConstants extends CommonConstants {
public static final String TASK_SOURCE = "task.source";
public static final String TASK_CHANNEL = "task.channel";
-
- public static final String TASK_CLASS = "task.taskClass";
- public static final String INSTANCE_CLASS = "task.instance.class";
+ public static final String TASK_TYPE = "task.taskType";
public static final String TASK_FILE_TRIGGER = "task.fileTask.trigger";
// sink config
diff --git
a/inlong-agent/agent-common/src/main/java/org/apache/inlong/agent/pojo/TaskProfileDto.java
b/inlong-agent/agent-common/src/main/java/org/apache/inlong/agent/pojo/TaskProfileDto.java
index a9134de3a8..a30bf44ecc 100644
---
a/inlong-agent/agent-common/src/main/java/org/apache/inlong/agent/pojo/TaskProfileDto.java
+++
b/inlong-agent/agent-common/src/main/java/org/apache/inlong/agent/pojo/TaskProfileDto.java
@@ -52,22 +52,11 @@ import static
org.apache.inlong.common.enums.DataReportTypeEnum.NORMAL_SEND_TO_D
public class TaskProfileDto {
private static final Logger logger =
LoggerFactory.getLogger(TaskProfileDto.class);
-
- public static final String DEFAULT_FILE_TASK =
"org.apache.inlong.agent.plugin.task.file.LogFileTask";
- public static final String DEFAULT_KAFKA_TASK =
"org.apache.inlong.agent.plugin.task.KafkaTask";
- public static final String DEFAULT_PULSAR_TASK =
"org.apache.inlong.agent.plugin.task.PulsarTask";
- public static final String DEFAULT_MONGODB_TASK =
"org.apache.inlong.agent.plugin.task.MongoDBTask";
- public static final String DEFAULT_ORACLE_TASK =
"org.apache.inlong.agent.plugin.task.OracleTask";
- public static final String DEFAULT_REDIS_TASK =
"org.apache.inlong.agent.plugin.task.RedisTask";
- public static final String DEFAULT_POSTGRESQL_TASK =
"org.apache.inlong.agent.plugin.task.PostgreSQLTask";
- public static final String DEFAULT_MQTT_TASK =
"org.apache.inlong.agent.plugin.task.MqttTask";
- public static final String DEFAULT_SQLSERVER_TASK =
"org.apache.inlong.agent.plugin.task.SQLServerTask";
public static final String DEFAULT_CHANNEL =
"org.apache.inlong.agent.plugin.channel.MemoryChannel";
public static final String MANAGER_JOB = "MANAGER_JOB";
public static final String DEFAULT_DATA_PROXY_SINK =
"org.apache.inlong.agent.plugin.sinks.ProxySink";
public static final String PULSAR_SINK =
"org.apache.inlong.agent.plugin.sinks.PulsarSink";
public static final String KAFKA_SINK =
"org.apache.inlong.agent.plugin.sinks.KafkaSink";
- public static final String DEFAULT_COS_TASK =
"org.apache.inlong.agent.plugin.task.cos.COSTask";
/**
* file source
*/
@@ -470,6 +459,7 @@ public class TaskProfileDto {
// common attribute
task.setId(String.valueOf(dataConfig.getTaskId()));
+ task.setTaskType(dataConfig.getTaskType());
task.setGroupId(dataConfig.getInlongGroupId());
task.setStreamId(dataConfig.getInlongStreamId());
task.setChannel(DEFAULT_CHANNEL);
@@ -517,7 +507,6 @@ public class TaskProfileDto {
profileDto.setTask(task);
break;
case FILE:
- task.setTaskClass(DEFAULT_FILE_TASK);
FileTask fileTask = getFileTask(dataConfig);
task.setCycleUnit(fileTask.getCycleUnit());
task.setFileTask(fileTask);
@@ -526,56 +515,48 @@ public class TaskProfileDto {
profileDto.setTask(task);
break;
case KAFKA:
- task.setTaskClass(DEFAULT_KAFKA_TASK);
KafkaTask kafkaTask = getKafkaTask(dataConfig);
task.setKafkaTask(kafkaTask);
task.setSource(KAFKA_SOURCE);
profileDto.setTask(task);
break;
case PULSAR:
- task.setTaskClass(DEFAULT_PULSAR_TASK);
PulsarTask pulsarTask = getPulsarTask(dataConfig);
task.setPulsarTask(pulsarTask);
task.setSource(PULSAR_SOURCE);
profileDto.setTask(task);
break;
case POSTGRES:
- task.setTaskClass(DEFAULT_POSTGRESQL_TASK);
PostgreSQLTask postgreSQLTask = getPostgresTask(dataConfig);
task.setPostgreSQLTask(postgreSQLTask);
task.setSource(POSTGRESQL_SOURCE);
profileDto.setTask(task);
break;
case ORACLE:
- task.setTaskClass(DEFAULT_ORACLE_TASK);
OracleTask oracleTask = getOracleTask(dataConfig);
task.setOracleTask(oracleTask);
task.setSource(ORACLE_SOURCE);
profileDto.setTask(task);
break;
case SQLSERVER:
- task.setTaskClass(DEFAULT_SQLSERVER_TASK);
SqlServerTask sqlserverTask = getSqlServerTask(dataConfig);
task.setSqlserverTask(sqlserverTask);
task.setSource(SQLSERVER_SOURCE);
profileDto.setTask(task);
break;
case MONGODB:
- task.setTaskClass(DEFAULT_MONGODB_TASK);
MongoTask mongoTask = getMongoTask(dataConfig);
task.setMongoTask(mongoTask);
task.setSource(MONGO_SOURCE);
profileDto.setTask(task);
break;
case REDIS:
- task.setTaskClass(DEFAULT_REDIS_TASK);
RedisTask redisTask = getRedisTask(dataConfig);
task.setRedisTask(redisTask);
task.setSource(REDIS_SOURCE);
profileDto.setTask(task);
break;
case MQTT:
- task.setTaskClass(DEFAULT_MQTT_TASK);
MqttTask mqttTask = getMqttTask(dataConfig);
task.setMqttTask(mqttTask);
task.setSource(MQTT_SOURCE);
@@ -585,7 +566,6 @@ public class TaskProfileDto {
profileDto.setTask(task);
break;
case COS:
- task.setTaskClass(DEFAULT_COS_TASK);
COSTask cosTask = getCOSTask(dataConfig);
task.setCycleUnit(cosTask.getCycleUnit());
task.setCosTask(cosTask);
@@ -619,6 +599,7 @@ public class TaskProfileDto {
private String mqClusters;
private String topicInfo;
private String taskClass;
+ private Integer taskType;
private String predefinedFields;
private Integer state;
private String cycleUnit;
diff --git
a/inlong-agent/agent-core/src/main/java/org/apache/inlong/agent/core/instance/InstanceManager.java
b/inlong-agent/agent-core/src/main/java/org/apache/inlong/agent/core/instance/InstanceManager.java
index 1b9dba8191..3396c3a591 100644
---
a/inlong-agent/agent-core/src/main/java/org/apache/inlong/agent/core/instance/InstanceManager.java
+++
b/inlong-agent/agent-core/src/main/java/org/apache/inlong/agent/core/instance/InstanceManager.java
@@ -453,7 +453,7 @@ public class InstanceManager extends AbstractDaemon {
instanceProfile.getSinkDataTime(), 1, 1, auditVersion);
}
} catch (Throwable t) {
- LOGGER.error("add instance error {}", t.getMessage());
+ LOGGER.error("add instance error.", t);
}
}
diff --git
a/inlong-agent/agent-plugins/src/main/java/org/apache/inlong/agent/plugin/instance/FileInstance.java
b/inlong-agent/agent-plugins/src/main/java/org/apache/inlong/agent/plugin/instance/FileInstance.java
index 38cb969376..3a4225abdf 100644
---
a/inlong-agent/agent-plugins/src/main/java/org/apache/inlong/agent/plugin/instance/FileInstance.java
+++
b/inlong-agent/agent-plugins/src/main/java/org/apache/inlong/agent/plugin/instance/FileInstance.java
@@ -19,7 +19,7 @@ package org.apache.inlong.agent.plugin.instance;
import org.apache.inlong.agent.conf.InstanceProfile;
import org.apache.inlong.agent.constant.TaskConstants;
-import org.apache.inlong.agent.plugin.task.file.FileDataUtils;
+import org.apache.inlong.agent.plugin.task.logcollection.local.FileDataUtils;
import java.io.IOException;
diff --git
a/inlong-agent/agent-plugins/src/main/java/org/apache/inlong/agent/plugin/sources/LogFileSource.java
b/inlong-agent/agent-plugins/src/main/java/org/apache/inlong/agent/plugin/sources/LogFileSource.java
index ecaa53196b..6774751a34 100755
---
a/inlong-agent/agent-plugins/src/main/java/org/apache/inlong/agent/plugin/sources/LogFileSource.java
+++
b/inlong-agent/agent-plugins/src/main/java/org/apache/inlong/agent/plugin/sources/LogFileSource.java
@@ -28,7 +28,7 @@ import org.apache.inlong.agent.except.FileException;
import org.apache.inlong.agent.metrics.audit.AuditUtils;
import org.apache.inlong.agent.plugin.sources.extend.DefaultExtendedHandler;
import org.apache.inlong.agent.plugin.sources.file.AbstractSource;
-import org.apache.inlong.agent.plugin.task.file.FileDataUtils;
+import org.apache.inlong.agent.plugin.task.logcollection.local.FileDataUtils;
import org.apache.inlong.agent.utils.AgentUtils;
import lombok.AllArgsConstructor;
diff --git
a/inlong-agent/agent-plugins/src/main/java/org/apache/inlong/agent/plugin/task/FormatDateLogFileTask.java
b/inlong-agent/agent-plugins/src/main/java/org/apache/inlong/agent/plugin/task/FormatDateLogFileTask.java
deleted file mode 100644
index 697fad2861..0000000000
---
a/inlong-agent/agent-plugins/src/main/java/org/apache/inlong/agent/plugin/task/FormatDateLogFileTask.java
+++ /dev/null
@@ -1,27 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.inlong.agent.plugin.task;
-
-import org.apache.inlong.agent.plugin.task.file.LogFileTask;
-
-/**
- * Directory trigger with format date.
- */
-public class FormatDateLogFileTask extends LogFileTask {
-
-}
diff --git
a/inlong-agent/agent-plugins/src/main/java/org/apache/inlong/agent/plugin/task/KafkaTask.java
b/inlong-agent/agent-plugins/src/main/java/org/apache/inlong/agent/plugin/task/KafkaTask.java
index 86b06079ab..79d5534e5c 100644
---
a/inlong-agent/agent-plugins/src/main/java/org/apache/inlong/agent/plugin/task/KafkaTask.java
+++
b/inlong-agent/agent-plugins/src/main/java/org/apache/inlong/agent/plugin/task/KafkaTask.java
@@ -35,7 +35,6 @@ import static
org.apache.inlong.agent.constant.TaskConstants.TASK_KAFKA_TOPIC;
public class KafkaTask extends AbstractTask {
private static final Logger LOGGER =
LoggerFactory.getLogger(KafkaTask.class);
- public static final String DEFAULT_KAFKA_INSTANCE =
"org.apache.inlong.agent.plugin.instance.KafkaInstance";
private boolean isAdded = false;
private String topic;
private final DateTimeFormatter dateTimeFormatter =
DateTimeFormatter.ofPattern("yyyyMMddHH");
@@ -58,8 +57,8 @@ public class KafkaTask extends AbstractTask {
return list;
}
String dataTime = LocalDateTime.now().format(dateTimeFormatter);
- InstanceProfile instanceProfile =
taskProfile.createInstanceProfile(DEFAULT_KAFKA_INSTANCE, topic,
- CycleUnitType.HOUR, dataTime, AgentUtils.getCurrentTime());
+ InstanceProfile instanceProfile =
taskProfile.createInstanceProfile(topic, CycleUnitType.HOUR, dataTime,
+ AgentUtils.getCurrentTime());
LOGGER.info("taskProfile.createInstanceProfile: {}",
instanceProfile.toJsonStr());
list.add(instanceProfile);
this.isAdded = true;
diff --git
a/inlong-agent/agent-plugins/src/main/java/org/apache/inlong/agent/plugin/task/MongoDBTask.java
b/inlong-agent/agent-plugins/src/main/java/org/apache/inlong/agent/plugin/task/MongoDBTask.java
index 4a3ef07e26..bc1bcdc649 100644
---
a/inlong-agent/agent-plugins/src/main/java/org/apache/inlong/agent/plugin/task/MongoDBTask.java
+++
b/inlong-agent/agent-plugins/src/main/java/org/apache/inlong/agent/plugin/task/MongoDBTask.java
@@ -34,7 +34,6 @@ import java.util.List;
public class MongoDBTask extends AbstractTask {
private static final Logger LOGGER =
LoggerFactory.getLogger(MongoDBTask.class);
- public static final String DEFAULT_MONGODB_INSTANCE =
"org.apache.inlong.agent.plugin.instance.MongoDBInstance";
private boolean isAdded = false;
private String collection;
private final DateTimeFormatter dateTimeFormatter =
DateTimeFormatter.ofPattern("yyyyMMddHH");
@@ -66,8 +65,8 @@ public class MongoDBTask extends AbstractTask {
return list;
}
String dataTime = LocalDateTime.now().format(dateTimeFormatter);
- InstanceProfile instanceProfile =
taskProfile.createInstanceProfile(DEFAULT_MONGODB_INSTANCE, collection,
- CycleUnitType.HOUR, dataTime, AgentUtils.getCurrentTime());
+ InstanceProfile instanceProfile =
taskProfile.createInstanceProfile(collection, CycleUnitType.HOUR, dataTime,
+ AgentUtils.getCurrentTime());
LOGGER.info("taskProfile.createInstanceProfile: {}",
instanceProfile.toJsonStr());
list.add(instanceProfile);
this.isAdded = true;
diff --git
a/inlong-agent/agent-plugins/src/main/java/org/apache/inlong/agent/plugin/task/MqttTask.java
b/inlong-agent/agent-plugins/src/main/java/org/apache/inlong/agent/plugin/task/MqttTask.java
index 1d7d9a3dc2..50de90a315 100644
---
a/inlong-agent/agent-plugins/src/main/java/org/apache/inlong/agent/plugin/task/MqttTask.java
+++
b/inlong-agent/agent-plugins/src/main/java/org/apache/inlong/agent/plugin/task/MqttTask.java
@@ -42,8 +42,6 @@ public class MqttTask extends AbstractTask {
private AtomicBoolean isAdded = new AtomicBoolean(false);
- public static final String DEFAULT_MQTT_INSTANCE =
"org.apache.inlong.agent.plugin.instance.MqttInstance";
-
private final DateTimeFormatter dateTimeFormatter =
DateTimeFormatter.ofPattern("yyyyMMddHH");
@Override
@@ -93,8 +91,8 @@ public class MqttTask extends AbstractTask {
return list;
}
String dataTime = LocalDateTime.now().format(dateTimeFormatter);
- InstanceProfile instanceProfile =
taskProfile.createInstanceProfile(DEFAULT_MQTT_INSTANCE, topic,
- CycleUnitType.HOUR, dataTime, AgentUtils.getCurrentTime());
+ InstanceProfile instanceProfile =
taskProfile.createInstanceProfile(topic, CycleUnitType.HOUR, dataTime,
+ AgentUtils.getCurrentTime());
LOGGER.info("taskProfile.createInstanceProfile(mqtt): {}",
instanceProfile.toJsonStr());
list.add(instanceProfile);
isAdded.set(true);
diff --git
a/inlong-agent/agent-plugins/src/main/java/org/apache/inlong/agent/plugin/task/OracleTask.java
b/inlong-agent/agent-plugins/src/main/java/org/apache/inlong/agent/plugin/task/OracleTask.java
index 34b064d48d..4db4c0da21 100644
---
a/inlong-agent/agent-plugins/src/main/java/org/apache/inlong/agent/plugin/task/OracleTask.java
+++
b/inlong-agent/agent-plugins/src/main/java/org/apache/inlong/agent/plugin/task/OracleTask.java
@@ -32,13 +32,13 @@ import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.atomic.AtomicBoolean;
-import static org.apache.inlong.agent.constant.TaskConstants.*;
+import static
org.apache.inlong.agent.constant.TaskConstants.TASK_ORACLE_DBNAME;
+import static
org.apache.inlong.agent.constant.TaskConstants.TASK_ORACLE_TABLE_INCLUDE_LIST;
public class OracleTask extends AbstractTask {
private static final Logger LOGGER =
LoggerFactory.getLogger(OracleTask.class);
- public static final String DEFAULT_ORACLE_INSTANCE =
"org.apache.inlong.agent.plugin.instance.OracleInstance";
private AtomicBoolean isAdded = new AtomicBoolean(false);
private final DateTimeFormatter dateTimeFormatter =
DateTimeFormatter.ofPattern("yyyyMMddHH");
@@ -103,9 +103,8 @@ public class OracleTask extends AbstractTask {
return list;
}
String dataTime = LocalDateTime.now().format(dateTimeFormatter);
- InstanceProfile instanceProfile =
- taskProfile.createInstanceProfile(DEFAULT_ORACLE_INSTANCE,
instanceId,
- CycleUnitType.HOUR, dataTime,
AgentUtils.getCurrentTime());
+ InstanceProfile instanceProfile =
taskProfile.createInstanceProfile(instanceId, CycleUnitType.HOUR, dataTime,
+ AgentUtils.getCurrentTime());
LOGGER.info("taskProfile.createInstanceProfile: {}",
instanceProfile.toJsonStr());
list.add(instanceProfile);
this.isAdded.set(true);
diff --git
a/inlong-agent/agent-plugins/src/main/java/org/apache/inlong/agent/plugin/task/PostgreSQLTask.java
b/inlong-agent/agent-plugins/src/main/java/org/apache/inlong/agent/plugin/task/PostgreSQLTask.java
index 7cf382fbd6..db675c7e9f 100644
---
a/inlong-agent/agent-plugins/src/main/java/org/apache/inlong/agent/plugin/task/PostgreSQLTask.java
+++
b/inlong-agent/agent-plugins/src/main/java/org/apache/inlong/agent/plugin/task/PostgreSQLTask.java
@@ -41,7 +41,6 @@ import static
org.apache.inlong.agent.constant.TaskConstants.TASK_POSTGRES_USER;
public class PostgreSQLTask extends AbstractTask {
private static final Logger LOGGER =
LoggerFactory.getLogger(PostgreSQLTask.class);
- public static final String DEFAULT_KAFKA_INSTANCE =
"org.apache.inlong.agent.plugin.instance.KafkaInstance";
private final DateTimeFormatter dateTimeFormatter =
DateTimeFormatter.ofPattern("yyyyMMddHH");
private boolean isAdded = false;
public static final int DEFAULT_INSTANCE_LIMIT = 1;
@@ -98,8 +97,8 @@ public class PostgreSQLTask extends AbstractTask {
return list;
}
String dataTime = LocalDateTime.now().format(dateTimeFormatter);
- InstanceProfile instanceProfile =
taskProfile.createInstanceProfile(DEFAULT_KAFKA_INSTANCE, instanceId,
- CycleUnitType.HOUR, dataTime, AgentUtils.getCurrentTime());
+ InstanceProfile instanceProfile =
taskProfile.createInstanceProfile(instanceId, CycleUnitType.HOUR, dataTime,
+ AgentUtils.getCurrentTime());
list.add(instanceProfile);
this.isAdded = true;
return list;
diff --git
a/inlong-agent/agent-plugins/src/main/java/org/apache/inlong/agent/plugin/task/PulsarTask.java
b/inlong-agent/agent-plugins/src/main/java/org/apache/inlong/agent/plugin/task/PulsarTask.java
index 4586da7bb1..89da06bd1b 100644
---
a/inlong-agent/agent-plugins/src/main/java/org/apache/inlong/agent/plugin/task/PulsarTask.java
+++
b/inlong-agent/agent-plugins/src/main/java/org/apache/inlong/agent/plugin/task/PulsarTask.java
@@ -37,7 +37,6 @@ import static
org.apache.inlong.agent.constant.TaskConstants.TASK_PULSAR_TOPIC;
public class PulsarTask extends AbstractTask {
private static final Logger LOGGER =
LoggerFactory.getLogger(PulsarTask.class);
- public static final String DEFAULT_PULSAR_INSTANCE =
"org.apache.inlong.agent.plugin.instance.PulsarInstance";
private boolean isAdded = false;
private String tenant;
private String namespace;
@@ -75,8 +74,8 @@ public class PulsarTask extends AbstractTask {
return list;
}
String dataTime = LocalDateTime.now().format(dateTimeFormatter);
- InstanceProfile instanceProfile =
taskProfile.createInstanceProfile(DEFAULT_PULSAR_INSTANCE, instanceId,
- CycleUnitType.HOUR, dataTime, AgentUtils.getCurrentTime());
+ InstanceProfile instanceProfile =
taskProfile.createInstanceProfile(instanceId, CycleUnitType.HOUR, dataTime,
+ AgentUtils.getCurrentTime());
LOGGER.info("taskProfile.createInstanceProfile: {}",
instanceProfile.toJsonStr());
list.add(instanceProfile);
this.isAdded = true;
diff --git
a/inlong-agent/agent-plugins/src/main/java/org/apache/inlong/agent/plugin/task/RedisTask.java
b/inlong-agent/agent-plugins/src/main/java/org/apache/inlong/agent/plugin/task/RedisTask.java
index b9f7449ecd..e36d5b02ee 100644
---
a/inlong-agent/agent-plugins/src/main/java/org/apache/inlong/agent/plugin/task/RedisTask.java
+++
b/inlong-agent/agent-plugins/src/main/java/org/apache/inlong/agent/plugin/task/RedisTask.java
@@ -34,7 +34,6 @@ import java.util.List;
public class RedisTask extends AbstractTask {
private static final Logger LOGGER =
LoggerFactory.getLogger(RedisTask.class);
- public static final String DEFAULT_REDIS_INSTANCE =
"org.apache.inlong.agent.plugin.instance.RedisInstance";
private boolean isAdded = false;
private String taskId;
@@ -68,8 +67,8 @@ public class RedisTask extends AbstractTask {
return list;
}
String dataTime = LocalDateTime.now().format(dateTimeFormatter);
- InstanceProfile instanceProfile =
taskProfile.createInstanceProfile(DEFAULT_REDIS_INSTANCE, taskId,
- CycleUnitType.HOUR, dataTime, AgentUtils.getCurrentTime());
+ InstanceProfile instanceProfile =
taskProfile.createInstanceProfile(taskId, CycleUnitType.HOUR, dataTime,
+ AgentUtils.getCurrentTime());
LOGGER.info("taskProfile.createInstanceProfile: {}",
instanceProfile.toJsonStr());
list.add(instanceProfile);
this.isAdded = true;
diff --git
a/inlong-agent/agent-plugins/src/main/java/org/apache/inlong/agent/plugin/task/SQLServerTask.java
b/inlong-agent/agent-plugins/src/main/java/org/apache/inlong/agent/plugin/task/SQLServerTask.java
index dd1446b301..f892c80e33 100644
---
a/inlong-agent/agent-plugins/src/main/java/org/apache/inlong/agent/plugin/task/SQLServerTask.java
+++
b/inlong-agent/agent-plugins/src/main/java/org/apache/inlong/agent/plugin/task/SQLServerTask.java
@@ -34,7 +34,6 @@ import java.util.List;
public class SQLServerTask extends AbstractTask {
private static final Logger LOGGER =
LoggerFactory.getLogger(SQLServerTask.class);
- public static final String DEFAULT_SQLSERVER_INSTANCE =
"org.apache.inlong.agent.plugin.instance.SQLServerInstance";
private boolean isAdded = false;
private String dbName;
@@ -102,8 +101,8 @@ public class SQLServerTask extends AbstractTask {
return list;
}
String dataTime = LocalDateTime.now().format(dateTimeFormatter);
- InstanceProfile instanceProfile =
taskProfile.createInstanceProfile(DEFAULT_SQLSERVER_INSTANCE, instanceId,
- CycleUnitType.HOUR, dataTime, AgentUtils.getCurrentTime());
+ InstanceProfile instanceProfile =
taskProfile.createInstanceProfile(instanceId, CycleUnitType.HOUR, dataTime,
+ AgentUtils.getCurrentTime());
list.add(instanceProfile);
this.isAdded = true;
return list;
diff --git
a/inlong-agent/agent-plugins/src/main/java/org/apache/inlong/agent/plugin/task/logcollection/LogAbstractTask.java
b/inlong-agent/agent-plugins/src/main/java/org/apache/inlong/agent/plugin/task/logcollection/LogAbstractTask.java
new file mode 100644
index 0000000000..e234042524
--- /dev/null
+++
b/inlong-agent/agent-plugins/src/main/java/org/apache/inlong/agent/plugin/task/logcollection/LogAbstractTask.java
@@ -0,0 +1,185 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.inlong.agent.plugin.task.logcollection;
+
+import org.apache.inlong.agent.conf.InstanceProfile;
+import org.apache.inlong.agent.core.task.TaskAction;
+import org.apache.inlong.agent.plugin.task.AbstractTask;
+import org.apache.inlong.agent.plugin.utils.regex.NewDateUtils;
+import org.apache.inlong.agent.plugin.utils.regex.Scanner;
+import org.apache.inlong.agent.state.State;
+
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.text.SimpleDateFormat;
+import java.util.ArrayList;
+import java.util.Comparator;
+import java.util.Date;
+import java.util.HashSet;
+import java.util.List;
+import java.util.Map;
+import java.util.Set;
+import java.util.TimeZone;
+import java.util.TreeSet;
+import java.util.concurrent.BlockingQueue;
+import java.util.concurrent.ConcurrentHashMap;
+import java.util.concurrent.LinkedBlockingQueue;
+
+public abstract class LogAbstractTask extends AbstractTask {
+ /*
+  * Shared base for the log-collection tasks (COSTask and FileTask extend it).
+  * It owns the pending-event map and the bounded instance queue, and drives
+  * the retry/normal split in getNewInstanceList(); subclasses provide the
+  * scanning via runForNormal(), scanExistingFile() and dealWithEventMap().
+  */
+ private static final int INSTANCE_QUEUE_CAPACITY = 10; // instanceQueue capacity; also the per-call cap in getNewInstanceList()
+ public static final long DAY_TIMEOUT_INTERVAL = 2 * 24 * 3600 * 1000; // two days in milliseconds; age limit used by removeTimeoutEvent()
+ private static final Logger LOGGER =
LoggerFactory.getLogger(LogAbstractTask.class);
+ protected boolean retry; // true: getNewInstanceList() runs runForRetry(); false: runForNormal()
+ protected BlockingQueue<InstanceProfile> instanceQueue; // created in initTask(); filled by dealEventMapByDataTime(), emptied by getNewInstanceList()
+ private volatile boolean runAtLeastOneTime = false; // set once runForRetry() has done its initial scanExistingFile()
+ protected long startTime; // lower bound handed to Scanner.getDataTimeList() (presumably epoch millis - TODO confirm)
+ protected long endTime; // upper bound handed to Scanner.getDataTimeList() (presumably epoch millis - TODO confirm)
+ protected String timeOffset = ""; // passed to Scanner.getDataTimeList() and NewDateUtils.getShouldStartTime()
+ protected final Map<String/* dataTime */, Map<String/* fileName */,
InstanceProfile>> eventMap =
+ new ConcurrentHashMap<>(); // pending events, grouped by data time and then keyed by file name
+
+ /**
+  * Creates the bounded queue through which pending instance profiles are
+  * handed to {@link #getNewInstanceList()}. COSTask and FileTask call this
+  * via {@code super.initTask()} before reading their own settings.
+  */
+ @Override
+ protected void initTask() {
+     this.instanceQueue = new LinkedBlockingQueue<>(INSTANCE_QUEUE_CAPACITY);
+ }
+
+ /**
+  * Runs one round of the task (retry or normal flavour, depending on
+  * {@code retry}) and then takes the queued instance profiles.
+  *
+  * @return the profiles taken from the instance queue in this round, at most
+  *         {@code INSTANCE_QUEUE_CAPACITY} of them; empty when nothing is queued
+  */
+ @Override
+ protected List<InstanceProfile> getNewInstanceList() {
+     if (!retry) {
+         runForNormal();
+     } else {
+         runForRetry();
+     }
+     List<InstanceProfile> drained = new ArrayList<>();
+     while (drained.size() < INSTANCE_QUEUE_CAPACITY) {
+         InstanceProfile next = instanceQueue.poll();
+         if (next == null) {
+             // poll() returns null once the queue has nothing left
+             break;
+         }
+         drained.add(next);
+     }
+     return drained;
+ }
+
+ /** Runs one round of a non-retry task; called by getNewInstanceList() when {@code retry} is false. */
+ protected abstract void runForNormal();
+
+ /** Processes the pending events in {@code eventMap}; called by runForRetry() on every round. */
+ protected abstract void dealWithEventMap();
+
+ /** Scans for already existing files; called by runForRetry() once, before its first dealWithEventMap(). */
+ protected abstract void scanExistingFile();
+
+ /**
+  * One round of a retry task: scan existing files on the very first round,
+  * process the event map, and once all instances have finished submit a
+  * FINISH action to the task manager and mark the task as succeeded.
+  */
+ private void runForRetry() {
+     if (!runAtLeastOneTime) {
+         scanExistingFile();
+         runAtLeastOneTime = true;
+     }
+     dealWithEventMap();
+     if (!allInstanceFinished()) {
+         return;
+     }
+     LOGGER.info("retry task finished, send action to task manager, taskId {}", getTaskId());
+     taskManager.submitAction(
+             new TaskAction(org.apache.inlong.agent.core.task.ActionType.FINISH, taskProfile));
+     doChangeState(State.SUCCEEDED);
+ }
+
+ /**
+  * Hands pending events to the instance queue, one data time at a time.
+  *
+  * <p>The data times to visit come from {@code Scanner.getDataTimeList} for
+  * the {@code startTime}/{@code endTime} window. For a non-retry task the
+  * last entry of that list is handled first and flagged as the current data
+  * time; the remaining entries follow in list order. Finally every data time
+  * that is present in {@code eventMap} but was not in the list is handled.
+  * As soon as {@link #dealEventMapByDataTime} returns {@code false} for a
+  * listed data time, the method returns.
+  */
+ protected void dealWithEventMapWithCycle() {
+     List<String> dataTimeList = Scanner.getDataTimeList(startTime, endTime, taskProfile.getCycleUnit(),
+             timeOffset, retry);
+     if (dataTimeList.isEmpty()) {
+         LOGGER.error("getDataTimeList get empty list");
+         return;
+     }
+     Set<String> dealtDataTime = new HashSet<>();
+     // normal task first handle current data time
+     if (!retry) {
+         String current = dataTimeList.remove(dataTimeList.size() - 1);
+         dealtDataTime.add(current);
+         if (!dealEventMapByDataTime(current, true)) {
+             return;
+         }
+     }
+     // A plain loop is required here: inside a forEach lambda "return" only
+     // ends the current callback, so the iteration would carry on even after
+     // dealEventMapByDataTime reported that nothing more can be queued.
+     for (String dataTime : dataTimeList) {
+         dealtDataTime.add(dataTime);
+         if (!dealEventMapByDataTime(dataTime, false)) {
+             return;
+         }
+     }
+     for (String dataTime : eventMap.keySet()) {
+         if (!dealtDataTime.contains(dataTime)) {
+             dealEventMapByDataTime(dataTime, false);
+         }
+     }
+ }
+
+ /**
+  * Moves the pending events of one data time into the instance queue, in
+  * ascending instance-id order, removing each event from the map once queued.
+  *
+  * <p>NOTE(review): the LogFileTask method this was lifted from tested
+  * {@code realTime || shouldStartNow(dataTime)}; the real-time short-circuit
+  * is not present here - confirm real-time file tasks do not depend on it.
+  *
+  * @param dataTime key of the bucket in {@code eventMap} to process
+  * @param isCurrentDataTime when {@code true} the {@code isFull()} check is skipped
+  * @return {@code false} if processing stopped early because {@code isFull()}
+  *         held or the instance queue rejected a profile; {@code true}
+  *         otherwise, including when the bucket is missing, empty or not yet due
+  */
+ protected boolean dealEventMapByDataTime(String dataTime, boolean isCurrentDataTime) {
+     Map<String, InstanceProfile> pending = eventMap.get(dataTime);
+     if (pending == null || pending.isEmpty()) {
+         return true;
+     }
+     if (!shouldStartNow(dataTime)) {
+         return true;
+     }
+     Set<InstanceProfile> ordered = new TreeSet<>(Comparator.comparing(InstanceProfile::getInstanceId));
+     ordered.addAll(pending.values());
+     for (InstanceProfile candidate : ordered) {
+         String fileName = candidate.getInstanceId();
+         InstanceProfile profile = pending.get(fileName);
+         boolean blocked = !isCurrentDataTime && isFull();
+         if (blocked || !instanceQueue.offer(profile)) {
+             return false;
+         }
+         pending.remove(fileName);
+     }
+     return true;
+ }
+
+ /**
+  * Decides whether events of the given data time are due for processing:
+  * the start time derived from the data time, the task's cycle unit and the
+  * time offset is compared, as a string, with the formatted current time.
+  *
+  * @param dataTime data time of the events in question
+  * @return {@code true} when the current time string is not before the start time string
+  */
+ private boolean shouldStartNow(String dataTime) {
+     String earliestStart = NewDateUtils.getShouldStartTime(dataTime, taskProfile.getCycleUnit(), timeOffset);
+     return getCurrentTime().compareTo(earliestStart) >= 0;
+ }
+
+ /**
+  * Formats the current instant using {@code NewDateUtils.DEFAULT_FORMAT} in
+  * the {@code NewDateUtils.DEFAULT_TIME_ZONE} time zone.
+  */
+ private String getCurrentTime() {
+     // A fresh formatter per call: SimpleDateFormat instances are not shared.
+     SimpleDateFormat formatter = new SimpleDateFormat(NewDateUtils.DEFAULT_FORMAT);
+     formatter.setTimeZone(TimeZone.getTimeZone(NewDateUtils.DEFAULT_TIME_ZONE));
+     return formatter.format(new Date());
+ }
+
+ /**
+  * Drops every data-time bucket whose data time is rejected by
+  * {@code NewDateUtils.isValidCreationTime} for {@code DAY_TIMEOUT_INTERVAL}.
+  * Retry tasks are left untouched.
+  *
+  * <p>NOTE(review): the LogFileTask version this was extracted from also
+  * returned early when its {@code realTime} flag was set; that case is not
+  * handled here - confirm real-time file tasks do not rely on it.
+  *
+  * @param eventMap map from data time to the events of that data time; modified in place
+  * @param isRetry whether the task is a retry task
+  */
+ protected void removeTimeoutEvent(Map<String, Map<String, InstanceProfile>> eventMap, boolean isRetry) {
+     if (isRetry) {
+         return;
+     }
+     // Remove through the key-set view instead of calling Map.remove() while
+     // iterating entrySet(): the latter is only safe for concurrent maps,
+     // whereas this works for whatever Map implementation the caller passes.
+     eventMap.keySet().removeIf(dataTime -> {
+         /* If the data time of the event is within 2 days before (after) the current time, it is valid */
+         boolean expired = !NewDateUtils.isValidCreationTime(dataTime, DAY_TIMEOUT_INTERVAL);
+         if (expired) {
+             LOGGER.warn("remove too old event from event map. dataTime {}", dataTime);
+         }
+         return expired;
+     });
+ }
+}
diff --git
a/inlong-agent/agent-plugins/src/main/java/org/apache/inlong/agent/plugin/task/cos/COSTask.java
b/inlong-agent/agent-plugins/src/main/java/org/apache/inlong/agent/plugin/task/logcollection/cos/COSTask.java
similarity index 54%
rename from
inlong-agent/agent-plugins/src/main/java/org/apache/inlong/agent/plugin/task/cos/COSTask.java
rename to
inlong-agent/agent-plugins/src/main/java/org/apache/inlong/agent/plugin/task/logcollection/cos/COSTask.java
index de2beb7cc0..86210157ed 100644
---
a/inlong-agent/agent-plugins/src/main/java/org/apache/inlong/agent/plugin/task/cos/COSTask.java
+++
b/inlong-agent/agent-plugins/src/main/java/org/apache/inlong/agent/plugin/task/logcollection/cos/COSTask.java
@@ -15,18 +15,14 @@
* limitations under the License.
*/
-package org.apache.inlong.agent.plugin.task.cos;
+package org.apache.inlong.agent.plugin.task.logcollection.cos;
import org.apache.inlong.agent.conf.InstanceProfile;
import org.apache.inlong.agent.conf.TaskProfile;
import org.apache.inlong.agent.constant.TaskConstants;
-import org.apache.inlong.agent.core.task.TaskAction;
-import org.apache.inlong.agent.plugin.task.AbstractTask;
-import org.apache.inlong.agent.plugin.task.cos.FileScanner.BasicFileInfo;
+import org.apache.inlong.agent.plugin.task.logcollection.LogAbstractTask;
+import
org.apache.inlong.agent.plugin.task.logcollection.cos.FileScanner.BasicFileInfo;
import org.apache.inlong.agent.plugin.utils.cos.COSUtils;
-import org.apache.inlong.agent.plugin.utils.regex.NewDateUtils;
-import org.apache.inlong.agent.plugin.utils.regex.Scanner;
-import org.apache.inlong.agent.state.State;
import org.apache.inlong.agent.utils.AgentUtils;
import org.apache.inlong.agent.utils.DateTransUtils;
@@ -36,45 +32,24 @@ import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import java.text.ParseException;
-import java.text.SimpleDateFormat;
-import java.util.ArrayList;
-import java.util.Comparator;
-import java.util.Date;
-import java.util.HashSet;
import java.util.List;
import java.util.Map;
-import java.util.Set;
-import java.util.TimeZone;
-import java.util.TreeSet;
-import java.util.concurrent.BlockingQueue;
import java.util.concurrent.ConcurrentHashMap;
-import java.util.concurrent.LinkedBlockingQueue;
/**
* Watch directory, if new valid files are created, create instance
correspondingly.
*/
-public class COSTask extends AbstractTask {
+public class COSTask extends LogAbstractTask {
private static final Logger LOGGER =
LoggerFactory.getLogger(COSTask.class);
- public static final String DEFAULT_COS_INSTANCE =
"org.apache.inlong.agent.plugin.instance.COSInstance";
- private static final int INSTANCE_QUEUE_CAPACITY = 10;
- private final Map<String/* dataTime */, Map<String/* fileName */,
InstanceProfile>> eventMap =
- new ConcurrentHashMap<>();
- public static final long DAY_TIMEOUT_INTERVAL = 2 * 24 * 3600 * 1000;
- private boolean retry;
- private long startTime;
- private long endTime;
private String originPattern;
private long lastScanTime = 0;
public final long SCAN_INTERVAL = 1 * 60 * 1000;
- private volatile boolean runAtLeastOneTime = false;
- private BlockingQueue<InstanceProfile> instanceQueue;
private COSClient cosClient;
private String bucketName;
private String secretId;
private String secretKey;
private String strRegion;
- private String timeOffset = "";
@Override
protected int getInstanceLimit() {
@@ -83,8 +58,8 @@ public class COSTask extends AbstractTask {
@Override
protected void initTask() {
+ super.initTask();
timeOffset = taskProfile.get(TaskConstants.TASK_COS_TIME_OFFSET, "");
- instanceQueue = new LinkedBlockingQueue<>(INSTANCE_QUEUE_CAPACITY);
retry = taskProfile.getBoolean(TaskConstants.COS_TASK_RETRY, false);
originPattern = taskProfile.get(TaskConstants.COS_TASK_PATTERN);
bucketName = taskProfile.get(TaskConstants.COS_TASK_BUCKET_NAME);
@@ -110,23 +85,6 @@ public class COSTask extends AbstractTask {
return true;
}
- @Override
- protected List<InstanceProfile> getNewInstanceList() {
- if (retry) {
- runForRetry();
- } else {
- runForNormal();
- }
- List<InstanceProfile> list = new ArrayList<>();
- while (list.size() < INSTANCE_QUEUE_CAPACITY &&
!instanceQueue.isEmpty()) {
- InstanceProfile profile = instanceQueue.poll();
- if (profile != null) {
- list.add(profile);
- }
- }
- return list;
- }
-
@Override
public boolean isProfileValid(TaskProfile profile) {
if (!profile.allRequiredKeyExist()) {
@@ -173,21 +131,8 @@ public class COSTask extends AbstractTask {
cosClient.shutdown();
}
- private void runForRetry() {
- if (!runAtLeastOneTime) {
- scanExistingFile();
- runAtLeastOneTime = true;
- }
- dealWithEventMap();
- if (allInstanceFinished()) {
- LOGGER.info("retry task finished, send action to task manager,
taskId {}", getTaskId());
- TaskAction action = new
TaskAction(org.apache.inlong.agent.core.task.ActionType.FINISH, taskProfile);
- taskManager.submitAction(action);
- doChangeState(State.SUCCEEDED);
- }
- }
-
- private void runForNormal() {
+ @Override
+ protected void runForNormal() {
if (AgentUtils.getCurrentTime() - lastScanTime > SCAN_INTERVAL) {
scanExistingFile();
lastScanTime = AgentUtils.getCurrentTime();
@@ -195,7 +140,8 @@ public class COSTask extends AbstractTask {
dealWithEventMap();
}
- private void scanExistingFile() {
+ @Override
+ protected void scanExistingFile() {
List<BasicFileInfo> fileInfos =
FileScanner.scanTaskBetweenTimes(cosClient, bucketName, originPattern,
taskProfile.getCycleUnit(), timeOffset, startTime, endTime,
retry);
LOGGER.info("taskId {} scan {} get file count {}", getTaskId(),
originPattern, fileInfos.size());
@@ -215,98 +161,12 @@ public class COSTask extends AbstractTask {
return fileToProfile.get(fileName) != null;
}
- private void dealWithEventMap() {
+ @Override
+ protected void dealWithEventMap() {
removeTimeoutEvent(eventMap, retry);
dealWithEventMapWithCycle();
}
- private void dealWithEventMapWithCycle() {
- long startScanTime = startTime;
- long endScanTime = endTime;
- List<String> dataTimeList = Scanner.getDataTimeList(startScanTime,
endScanTime, taskProfile.getCycleUnit(),
- timeOffset, retry);
- if (dataTimeList.isEmpty()) {
- LOGGER.error("get dataTimeList return empty list");
- return;
- }
- Set<String> dealtDataTime = new HashSet<>();
- // normal task first handle current data time
- if (!retry) {
- String current = dataTimeList.remove(dataTimeList.size() - 1);
- dealtDataTime.add(current);
- if (!dealEventMapByDataTime(current, true)) {
- return;
- }
- }
- dataTimeList.forEach(dataTime -> {
- dealtDataTime.add(dataTime);
- if (!dealEventMapByDataTime(dataTime, false)) {
- return;
- }
- });
- for (String dataTime : eventMap.keySet()) {
- if (!dealtDataTime.contains(dataTime)) {
- dealEventMapByDataTime(dataTime, false);
- }
- }
- }
-
- private boolean dealEventMapByDataTime(String dataTime, boolean
isCurrentDataTime) {
- Map<String, InstanceProfile> sameDataTimeEvents =
eventMap.get(dataTime);
- if (sameDataTimeEvents == null || sameDataTimeEvents.isEmpty()) {
- return true;
- }
- if (shouldStartNow(dataTime)) {
- Set<InstanceProfile> sortedEvents = new
TreeSet<>(Comparator.comparing(InstanceProfile::getInstanceId));
- sortedEvents.addAll(sameDataTimeEvents.values());
- for (InstanceProfile sortEvent : sortedEvents) {
- String fileName = sortEvent.getInstanceId();
- InstanceProfile profile = sameDataTimeEvents.get(fileName);
- if (!isCurrentDataTime && isFull()) {
- return false;
- }
- if (!instanceQueue.offer(profile)) {
- return false;
- }
- sameDataTimeEvents.remove(fileName);
- }
- }
- return true;
- }
-
- /*
- * Calculate whether the event needs to be processed at the current time
based on its data time, business cycle, and
- * offset
- */
- private boolean shouldStartNow(String dataTime) {
- String shouldStartTime =
- NewDateUtils.getShouldStartTime(dataTime,
taskProfile.getCycleUnit(), timeOffset);
- String currentTime = getCurrentTime();
- return currentTime.compareTo(shouldStartTime) >= 0;
- }
-
- private void removeTimeoutEvent(Map<String, Map<String, InstanceProfile>>
eventMap, boolean isRetry) {
- if (isRetry) {
- return;
- }
- for (Map.Entry<String, Map<String, InstanceProfile>> entry :
eventMap.entrySet()) {
- /* If the data time of the event is within 2 days before (after)
the current time, it is valid */
- String dataTime = entry.getKey();
- if (!NewDateUtils.isValidCreationTime(dataTime,
DAY_TIMEOUT_INTERVAL)) {
- /* Remove it from memory map. */
- eventMap.remove(dataTime);
- LOGGER.warn("remove too old event from event map. dataTime
{}", dataTime);
- }
- }
- }
-
- private String getCurrentTime() {
- SimpleDateFormat dateFormat = new
SimpleDateFormat(NewDateUtils.DEFAULT_FORMAT);
- TimeZone timeZone =
TimeZone.getTimeZone(NewDateUtils.DEFAULT_TIME_ZONE);
- dateFormat.setTimeZone(timeZone);
- return dateFormat.format(new Date(System.currentTimeMillis()));
- }
-
private void addToEvenMap(String fileName, String dataTime) {
if (isInEventMap(fileName, dataTime)) {
LOGGER.info("add to evenMap isInEventMap returns true skip taskId
{} dataTime {} fileName {}",
@@ -328,8 +188,8 @@ public class COSTask extends AbstractTask {
return;
}
String cycleUnit = taskProfile.getCycleUnit();
- InstanceProfile instanceProfile =
taskProfile.createInstanceProfile(DEFAULT_COS_INSTANCE,
- fileName, cycleUnit, dataTime, fileUpdateTime);
+ InstanceProfile instanceProfile =
taskProfile.createInstanceProfile(fileName, cycleUnit, dataTime,
+ fileUpdateTime);
sameDataTimeEvents.put(fileName, instanceProfile);
LOGGER.info("add to eventMap taskId {} dataTime {} fileName {}",
taskProfile.getTaskId(), dataTime, fileName);
}
diff --git
a/inlong-agent/agent-plugins/src/main/java/org/apache/inlong/agent/plugin/task/cos/FileScanner.java
b/inlong-agent/agent-plugins/src/main/java/org/apache/inlong/agent/plugin/task/logcollection/cos/FileScanner.java
similarity index 99%
rename from
inlong-agent/agent-plugins/src/main/java/org/apache/inlong/agent/plugin/task/cos/FileScanner.java
rename to
inlong-agent/agent-plugins/src/main/java/org/apache/inlong/agent/plugin/task/logcollection/cos/FileScanner.java
index 1019d34135..e3185722dd 100644
---
a/inlong-agent/agent-plugins/src/main/java/org/apache/inlong/agent/plugin/task/cos/FileScanner.java
+++
b/inlong-agent/agent-plugins/src/main/java/org/apache/inlong/agent/plugin/task/logcollection/cos/FileScanner.java
@@ -15,7 +15,7 @@
* limitations under the License.
*/
-package org.apache.inlong.agent.plugin.task.cos;
+package org.apache.inlong.agent.plugin.task.logcollection.cos;
import org.apache.inlong.agent.plugin.utils.regex.PatternUtil;
import org.apache.inlong.agent.plugin.utils.regex.Scanner;
diff --git
a/inlong-agent/agent-plugins/src/main/java/org/apache/inlong/agent/plugin/task/file/FileDataUtils.java
b/inlong-agent/agent-plugins/src/main/java/org/apache/inlong/agent/plugin/task/logcollection/local/FileDataUtils.java
similarity index 95%
rename from
inlong-agent/agent-plugins/src/main/java/org/apache/inlong/agent/plugin/task/file/FileDataUtils.java
rename to
inlong-agent/agent-plugins/src/main/java/org/apache/inlong/agent/plugin/task/logcollection/local/FileDataUtils.java
index d65960ab17..5848ad563d 100644
---
a/inlong-agent/agent-plugins/src/main/java/org/apache/inlong/agent/plugin/task/file/FileDataUtils.java
+++
b/inlong-agent/agent-plugins/src/main/java/org/apache/inlong/agent/plugin/task/logcollection/local/FileDataUtils.java
@@ -15,7 +15,7 @@
* limitations under the License.
*/
-package org.apache.inlong.agent.plugin.task.file;
+package org.apache.inlong.agent.plugin.task.logcollection.local;
import java.io.IOException;
import java.nio.file.Files;
diff --git
a/inlong-agent/agent-plugins/src/main/java/org/apache/inlong/agent/plugin/task/file/FileScanner.java
b/inlong-agent/agent-plugins/src/main/java/org/apache/inlong/agent/plugin/task/logcollection/local/FileScanner.java
similarity index 98%
rename from
inlong-agent/agent-plugins/src/main/java/org/apache/inlong/agent/plugin/task/file/FileScanner.java
rename to
inlong-agent/agent-plugins/src/main/java/org/apache/inlong/agent/plugin/task/logcollection/local/FileScanner.java
index 7fa759a9d2..70003d65ab 100644
---
a/inlong-agent/agent-plugins/src/main/java/org/apache/inlong/agent/plugin/task/file/FileScanner.java
+++
b/inlong-agent/agent-plugins/src/main/java/org/apache/inlong/agent/plugin/task/logcollection/local/FileScanner.java
@@ -15,7 +15,7 @@
* limitations under the License.
*/
-package org.apache.inlong.agent.plugin.task.file;
+package org.apache.inlong.agent.plugin.task.logcollection.local;
import org.apache.inlong.agent.plugin.utils.regex.PatternUtil;
import org.apache.inlong.agent.plugin.utils.regex.Scanner;
diff --git
a/inlong-agent/agent-plugins/src/main/java/org/apache/inlong/agent/plugin/task/file/LogFileTask.java
b/inlong-agent/agent-plugins/src/main/java/org/apache/inlong/agent/plugin/task/logcollection/local/FileTask.java
similarity index 73%
rename from
inlong-agent/agent-plugins/src/main/java/org/apache/inlong/agent/plugin/task/file/LogFileTask.java
rename to
inlong-agent/agent-plugins/src/main/java/org/apache/inlong/agent/plugin/task/logcollection/local/FileTask.java
index 4eb3b69525..ef50d1fc62 100644
---
a/inlong-agent/agent-plugins/src/main/java/org/apache/inlong/agent/plugin/task/file/LogFileTask.java
+++
b/inlong-agent/agent-plugins/src/main/java/org/apache/inlong/agent/plugin/task/logcollection/local/FileTask.java
@@ -15,20 +15,17 @@
* limitations under the License.
*/
-package org.apache.inlong.agent.plugin.task.file;
+package org.apache.inlong.agent.plugin.task.logcollection.local;
import org.apache.inlong.agent.conf.InstanceProfile;
import org.apache.inlong.agent.conf.TaskProfile;
import org.apache.inlong.agent.constant.CycleUnitType;
import org.apache.inlong.agent.constant.TaskConstants;
-import org.apache.inlong.agent.core.task.TaskAction;
-import org.apache.inlong.agent.plugin.task.AbstractTask;
-import org.apache.inlong.agent.plugin.task.file.FileScanner.BasicFileInfo;
+import org.apache.inlong.agent.plugin.task.logcollection.LogAbstractTask;
+import
org.apache.inlong.agent.plugin.task.logcollection.local.FileScanner.BasicFileInfo;
import org.apache.inlong.agent.plugin.utils.regex.NewDateUtils;
import org.apache.inlong.agent.plugin.utils.regex.PathDateExpression;
import org.apache.inlong.agent.plugin.utils.regex.PatternUtil;
-import org.apache.inlong.agent.plugin.utils.regex.Scanner;
-import org.apache.inlong.agent.state.State;
import org.apache.inlong.agent.utils.AgentUtils;
import org.apache.inlong.agent.utils.DateTransUtils;
import org.apache.inlong.agent.utils.file.FileUtils;
@@ -47,19 +44,12 @@ import java.nio.file.WatchEvent.Kind;
import java.nio.file.WatchKey;
import java.nio.file.WatchService;
import java.text.ParseException;
-import java.text.SimpleDateFormat;
import java.util.ArrayList;
-import java.util.Comparator;
-import java.util.Date;
import java.util.HashSet;
import java.util.List;
import java.util.Map;
import java.util.Set;
-import java.util.TimeZone;
-import java.util.TreeSet;
-import java.util.concurrent.BlockingQueue;
import java.util.concurrent.ConcurrentHashMap;
-import java.util.concurrent.LinkedBlockingQueue;
import java.util.regex.Matcher;
import java.util.stream.Collectors;
import java.util.stream.Stream;
@@ -67,28 +57,17 @@ import java.util.stream.Stream;
/**
* Watch directory, if new valid files are created, create instance
correspondingly.
*/
-public class LogFileTask extends AbstractTask {
+public class FileTask extends LogAbstractTask {
- private static final Logger LOGGER =
LoggerFactory.getLogger(LogFileTask.class);
- public static final String DEFAULT_FILE_INSTANCE =
"org.apache.inlong.agent.plugin.instance.FileInstance";
- private static final int INSTANCE_QUEUE_CAPACITY = 10;
+ private static final Logger LOGGER =
LoggerFactory.getLogger(FileTask.class);
private final Map<String, WatchEntity> watchers = new
ConcurrentHashMap<>();
private final Set<String> watchFailedDirs = new HashSet<>();
- private final Map<String/* dataTime */, Map<String/* fileName */,
InstanceProfile>> eventMap =
- new ConcurrentHashMap<>();
- public static final long DAY_TIMEOUT_INTERVAL = 2 * 24 * 3600 * 1000;
public static final int CORE_THREAD_MAX_GAP_TIME_MS = 60 * 1000;
- private boolean retry;
- private volatile long startTime;
- private volatile long endTime;
private boolean realTime = false;
private Set<String> originPatterns;
private long lastScanTime = 0;
public final long SCAN_INTERVAL = 1 * 60 * 1000;
- private volatile boolean runAtLeastOneTime = false;
private volatile long coreThreadUpdateTime = 0;
- private String timeOffset = "";
- private BlockingQueue<InstanceProfile> instanceQueue;
@Override
protected int getInstanceLimit() {
@@ -97,8 +76,8 @@ public class LogFileTask extends AbstractTask {
@Override
protected void initTask() {
+ super.initTask();
timeOffset = taskProfile.get(TaskConstants.TASK_FILE_TIME_OFFSET, "");
- instanceQueue = new LinkedBlockingQueue<>(INSTANCE_QUEUE_CAPACITY);
retry = taskProfile.isRetry();
originPatterns =
Stream.of(taskProfile.get(TaskConstants.FILE_DIR_FILTER_PATTERNS).split(","))
.collect(Collectors.toSet());
@@ -125,23 +104,6 @@ public class LogFileTask extends AbstractTask {
return true;
}
- @Override
- protected List<InstanceProfile> getNewInstanceList() {
- if (retry) {
- runForRetry();
- } else {
- runForNormal();
- }
- List<InstanceProfile> list = new ArrayList<>();
- while (list.size() < INSTANCE_QUEUE_CAPACITY &&
!instanceQueue.isEmpty()) {
- InstanceProfile profile = instanceQueue.poll();
- if (profile != null) {
- list.add(profile);
- }
- }
- return list;
- }
-
@Override
public boolean isProfileValid(TaskProfile profile) {
if (!profile.allRequiredKeyExist()) {
@@ -244,21 +206,8 @@ public class LogFileTask extends AbstractTask {
});
}
- private void runForRetry() {
- if (!runAtLeastOneTime) {
- scanExistingFile();
- runAtLeastOneTime = true;
- }
- dealWithEventMap();
- if (allInstanceFinished()) {
- LOGGER.info("retry task finished, send action to task manager,
taskId {}", getTaskId());
- TaskAction action = new
TaskAction(org.apache.inlong.agent.core.task.ActionType.FINISH, taskProfile);
- taskManager.submitAction(action);
- doChangeState(State.SUCCEEDED);
- }
- }
-
- private void runForNormal() {
+ @Override
+ protected void runForNormal() {
if (AgentUtils.getCurrentTime() - lastScanTime > SCAN_INTERVAL) {
scanExistingFile();
lastScanTime = AgentUtils.getCurrentTime();
@@ -267,7 +216,8 @@ public class LogFileTask extends AbstractTask {
dealWithEventMap();
}
- private void scanExistingFile() {
+ @Override
+ protected void scanExistingFile() {
originPatterns.forEach((originPattern) -> {
List<BasicFileInfo> fileInfos =
scanExistingFileByPattern(originPattern);
LOGGER.info("taskId {} scan {} get file count {}", getTaskId(),
originPattern, fileInfos.size());
@@ -316,7 +266,8 @@ public class LogFileTask extends AbstractTask {
}
}
- private void dealWithEventMap() {
+ @Override
+ protected void dealWithEventMap() {
removeTimeoutEvent(eventMap, retry);
if (realTime) {
dealWithEventMapRealTime();
@@ -325,99 +276,12 @@ public class LogFileTask extends AbstractTask {
}
}
- private void dealWithEventMapWithCycle() {
- long startScanTime = startTime;
- long endScanTime = endTime;
- List<String> dataTimeList = Scanner.getDataTimeList(startScanTime,
endScanTime, taskProfile.getCycleUnit(),
- timeOffset, retry);
- if (dataTimeList.isEmpty()) {
- LOGGER.error("getDataTimeList get empty list");
- return;
- }
- Set<String> dealtDataTime = new HashSet<>();
- // normal task first handle current data time
- if (!retry) {
- String current = dataTimeList.remove(dataTimeList.size() - 1);
- dealtDataTime.add(current);
- if (!dealEventMapByDataTime(current, true)) {
- return;
- }
- }
- dataTimeList.forEach(dataTime -> {
- dealtDataTime.add(dataTime);
- if (!dealEventMapByDataTime(dataTime, false)) {
- return;
- }
- });
- for (String dataTime : eventMap.keySet()) {
- if (!dealtDataTime.contains(dataTime)) {
- dealEventMapByDataTime(dataTime, false);
- }
- }
- }
-
private void dealWithEventMapRealTime() {
for (String dataTime : eventMap.keySet()) {
dealEventMapByDataTime(dataTime, true);
}
}
- private boolean dealEventMapByDataTime(String dataTime, boolean
isCurrentDataTime) {
- Map<String, InstanceProfile> sameDataTimeEvents =
eventMap.get(dataTime);
- if (sameDataTimeEvents == null || sameDataTimeEvents.isEmpty()) {
- return true;
- }
- if (realTime || shouldStartNow(dataTime)) {
- Set<InstanceProfile> sortedEvents = new
TreeSet<>(Comparator.comparing(InstanceProfile::getInstanceId));
- sortedEvents.addAll(sameDataTimeEvents.values());
- for (InstanceProfile sortEvent : sortedEvents) {
- String fileName = sortEvent.getInstanceId();
- InstanceProfile profile = sameDataTimeEvents.get(fileName);
- if (!isCurrentDataTime && isFull()) {
- return false;
- }
- if (!instanceQueue.offer(profile)) {
- return false;
- }
- sameDataTimeEvents.remove(fileName);
- }
- }
- return true;
- }
-
- /*
- * Calculate whether the event needs to be processed at the current time
based on its data time, business cycle, and
- * offset
- */
- private boolean shouldStartNow(String dataTime) {
- String shouldStartTime =
- NewDateUtils.getShouldStartTime(dataTime,
taskProfile.getCycleUnit(), timeOffset);
- String currentTime = getCurrentTime();
- return currentTime.compareTo(shouldStartTime) >= 0;
- }
-
- private void removeTimeoutEvent(Map<String, Map<String, InstanceProfile>>
eventMap, boolean isRetry) {
- if (isRetry || realTime) {
- return;
- }
- for (Map.Entry<String, Map<String, InstanceProfile>> entry :
eventMap.entrySet()) {
- /* If the data time of the event is within 2 days before (after)
the current time, it is valid */
- String dataTime = entry.getKey();
- if (!NewDateUtils.isValidCreationTime(dataTime,
DAY_TIMEOUT_INTERVAL)) {
- /* Remove it from memory map. */
- eventMap.remove(dataTime);
- LOGGER.warn("remove too old event from event map. dataTime
{}", dataTime);
- }
- }
- }
-
- private String getCurrentTime() {
- SimpleDateFormat dateFormat = new
SimpleDateFormat(NewDateUtils.DEFAULT_FORMAT);
- TimeZone timeZone =
TimeZone.getTimeZone(NewDateUtils.DEFAULT_TIME_ZONE);
- dateFormat.setTimeZone(timeZone);
- return dateFormat.format(new Date(System.currentTimeMillis()));
- }
-
public synchronized void dealWithWatchEntity(String originPattern) {
WatchEntity entity = watchers.get(originPattern);
if (entity == null) {
@@ -520,8 +384,8 @@ public class LogFileTask extends AbstractTask {
} else {
cycleUnit = taskProfile.getCycleUnit();
}
- InstanceProfile instanceProfile =
taskProfile.createInstanceProfile(DEFAULT_FILE_INSTANCE,
- fileName, cycleUnit, dataTime, fileUpdateTime);
+ InstanceProfile instanceProfile =
taskProfile.createInstanceProfile(fileName, cycleUnit, dataTime,
+ fileUpdateTime);
sameDataTimeEvents.put(fileName, instanceProfile);
LOGGER.info("add to eventMap taskId {} dataTime {} fileName {}",
taskProfile.getTaskId(), dataTime, fileName);
}
diff --git
a/inlong-agent/agent-plugins/src/main/java/org/apache/inlong/agent/plugin/task/file/FileTimeComparator.java
b/inlong-agent/agent-plugins/src/main/java/org/apache/inlong/agent/plugin/task/logcollection/local/FileTimeComparator.java
similarity index 94%
rename from
inlong-agent/agent-plugins/src/main/java/org/apache/inlong/agent/plugin/task/file/FileTimeComparator.java
rename to
inlong-agent/agent-plugins/src/main/java/org/apache/inlong/agent/plugin/task/logcollection/local/FileTimeComparator.java
index 1fbbde3b56..aef1c9fd9e 100644
---
a/inlong-agent/agent-plugins/src/main/java/org/apache/inlong/agent/plugin/task/file/FileTimeComparator.java
+++
b/inlong-agent/agent-plugins/src/main/java/org/apache/inlong/agent/plugin/task/logcollection/local/FileTimeComparator.java
@@ -15,7 +15,7 @@
* limitations under the License.
*/
-package org.apache.inlong.agent.plugin.task.file;
+package org.apache.inlong.agent.plugin.task.logcollection.local;
import java.io.File;
import java.util.Comparator;
diff --git
a/inlong-agent/agent-plugins/src/main/java/org/apache/inlong/agent/plugin/task/file/Files.java
b/inlong-agent/agent-plugins/src/main/java/org/apache/inlong/agent/plugin/task/logcollection/local/Files.java
similarity index 97%
rename from
inlong-agent/agent-plugins/src/main/java/org/apache/inlong/agent/plugin/task/file/Files.java
rename to
inlong-agent/agent-plugins/src/main/java/org/apache/inlong/agent/plugin/task/logcollection/local/Files.java
index 3c3bca9d59..12b085226d 100644
---
a/inlong-agent/agent-plugins/src/main/java/org/apache/inlong/agent/plugin/task/file/Files.java
+++
b/inlong-agent/agent-plugins/src/main/java/org/apache/inlong/agent/plugin/task/logcollection/local/Files.java
@@ -15,7 +15,7 @@
* limitations under the License.
*/
-package org.apache.inlong.agent.plugin.task.file;
+package org.apache.inlong.agent.plugin.task.logcollection.local;
import org.apache.inlong.agent.utils.file.FileFinder;
diff --git
a/inlong-agent/agent-plugins/src/main/java/org/apache/inlong/agent/plugin/task/file/WatchEntity.java
b/inlong-agent/agent-plugins/src/main/java/org/apache/inlong/agent/plugin/task/logcollection/local/WatchEntity.java
similarity index 99%
rename from
inlong-agent/agent-plugins/src/main/java/org/apache/inlong/agent/plugin/task/file/WatchEntity.java
rename to
inlong-agent/agent-plugins/src/main/java/org/apache/inlong/agent/plugin/task/logcollection/local/WatchEntity.java
index b33eaff818..47d052e81b 100644
---
a/inlong-agent/agent-plugins/src/main/java/org/apache/inlong/agent/plugin/task/file/WatchEntity.java
+++
b/inlong-agent/agent-plugins/src/main/java/org/apache/inlong/agent/plugin/task/logcollection/local/WatchEntity.java
@@ -15,7 +15,7 @@
* limitations under the License.
*/
-package org.apache.inlong.agent.plugin.task.file;
+package org.apache.inlong.agent.plugin.task.logcollection.local;
import org.apache.inlong.agent.plugin.utils.regex.DateUtils;
import org.apache.inlong.agent.plugin.utils.regex.NewDateUtils;
diff --git
a/inlong-agent/agent-plugins/src/test/java/org/apache/inlong/agent/plugin/instance/TestInstanceManager.java
b/inlong-agent/agent-plugins/src/test/java/org/apache/inlong/agent/plugin/instance/TestInstanceManager.java
index 9901f29023..4340bd6176 100755
---
a/inlong-agent/agent-plugins/src/test/java/org/apache/inlong/agent/plugin/instance/TestInstanceManager.java
+++
b/inlong-agent/agent-plugins/src/test/java/org/apache/inlong/agent/plugin/instance/TestInstanceManager.java
@@ -38,6 +38,12 @@ import org.junit.AfterClass;
import org.junit.Assert;
import org.junit.BeforeClass;
import org.junit.Test;
+import org.junit.runner.RunWith;
+import org.mockito.Mockito;
+import org.powermock.api.mockito.PowerMockito;
+import org.powermock.core.classloader.annotations.PowerMockIgnore;
+import org.powermock.core.classloader.annotations.PrepareForTest;
+import org.powermock.modules.junit4.PowerMockRunner;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@@ -47,6 +53,9 @@ import java.util.concurrent.TimeUnit;
import static org.awaitility.Awaitility.await;
+@RunWith(PowerMockRunner.class)
+@PrepareForTest(InstanceProfile.class)
+@PowerMockIgnore({"javax.management.*"})
public class TestInstanceManager {
private static final Logger LOGGER =
LoggerFactory.getLogger(TestInstanceManager.class);
@@ -56,6 +65,15 @@ public class TestInstanceManager {
@BeforeClass
public static void setup() {
+ PowerMockito.mockStatic(InstanceProfile.class);
+ Mockito.when(InstanceProfile.getInstanceClassByTaskType(Mockito.any()))
+ .thenAnswer(mock -> MockInstance.class.getCanonicalName());
+
Mockito.when(InstanceProfile.parseJsonStr(Mockito.anyString())).thenAnswer(mock
-> {
+ String jsonStr = mock.getArgument(0);
+ InstanceProfile conf = new InstanceProfile();
+ conf.loadJsonStrResource(jsonStr);
+ return conf;
+ });
helper = new
AgentBaseTestsHelper(TestInstanceManager.class.getName()).setupAgentHome();
String pattern = helper.getTestRootDir() + "/YYYYMMDDhh_[0-9]+.txt";
Store basicInstanceStore =
TaskManager.initStore(AgentConstants.AGENT_STORE_PATH_INSTANCE);
@@ -79,9 +97,8 @@ public class TestInstanceManager {
public void testInstanceManager() {
InstanceStore instanceStore = manager.getInstanceStore();
for (int i = 1; i <= 10; i++) {
- InstanceProfile profile =
taskProfile.createInstanceProfile(MockInstance.class.getCanonicalName(),
- String.valueOf(i), taskProfile.getCycleUnit(),
"2023092710",
- AgentUtils.getCurrentTime());
+ InstanceProfile profile =
taskProfile.createInstanceProfile(String.valueOf(i), taskProfile.getCycleUnit(),
+ "2023092710", AgentUtils.getCurrentTime());
instanceStore.storeInstance(profile);
}
manager.start();
@@ -91,9 +108,8 @@ public class TestInstanceManager {
Assert.assertTrue(manager.getInstanceProfile(instanceId).getState() ==
InstanceStateEnum.DEFAULT);
}
long timeBefore = AgentUtils.getCurrentTime();
- InstanceProfile profile =
taskProfile.createInstanceProfile(MockInstance.class.getCanonicalName(),
- helper.getTestRootDir() + "/2023092710_1.txt",
taskProfile.getCycleUnit(), "2023092710",
- AgentUtils.getCurrentTime());
+ InstanceProfile profile =
taskProfile.createInstanceProfile(helper.getTestRootDir() + "/2023092710_1.txt",
+ taskProfile.getCycleUnit(), "2023092710",
AgentUtils.getCurrentTime());
String sinkDataTime = String.valueOf(profile.getSinkDataTime());
try {
String add2TimeZone = String.valueOf(
@@ -122,9 +138,8 @@ public class TestInstanceManager {
Assert.assertTrue(manager.shouldAddAgain(profile.getInstanceId(),
AgentUtils.getCurrentTime()));
// test continue
- profile =
taskProfile.createInstanceProfile(MockInstance.class.getCanonicalName(),
- helper.getTestRootDir() + "/2023092710_1.txt",
taskProfile.getCycleUnit(), "2023092710",
- AgentUtils.getCurrentTime());
+ profile = taskProfile.createInstanceProfile(helper.getTestRootDir() +
"/2023092710_1.txt",
+ taskProfile.getCycleUnit(), "2023092710",
AgentUtils.getCurrentTime());
action = new InstanceAction();
action.setActionType(ActionType.ADD);
action.setProfile(profile);
diff --git
a/inlong-agent/agent-plugins/src/test/java/org/apache/inlong/agent/plugin/sinks/KafkaSinkTest.java
b/inlong-agent/agent-plugins/src/test/java/org/apache/inlong/agent/plugin/sinks/KafkaSinkTest.java
index b8703c7056..56f46d74a0 100644
---
a/inlong-agent/agent-plugins/src/test/java/org/apache/inlong/agent/plugin/sinks/KafkaSinkTest.java
+++
b/inlong-agent/agent-plugins/src/test/java/org/apache/inlong/agent/plugin/sinks/KafkaSinkTest.java
@@ -50,8 +50,8 @@ public class KafkaSinkTest {
TaskProfile taskProfile =
helper.getFileTaskProfile(1, pattern, "csv", false, "", "",
TaskStateEnum.RUNNING, "D",
"GMT+8:00", null);
- profile = taskProfile.createInstanceProfile("", fileName,
- taskProfile.getCycleUnit(), "20230927",
AgentUtils.getCurrentTime());
+ profile = taskProfile.createInstanceProfile(fileName,
taskProfile.getCycleUnit(), "20230927",
+ AgentUtils.getCurrentTime());
kafkaSink = new MockSink();
kafkaSink.init(profile);
}
diff --git
a/inlong-agent/agent-plugins/src/test/java/org/apache/inlong/agent/plugin/sinks/PulsarSinkTest.java
b/inlong-agent/agent-plugins/src/test/java/org/apache/inlong/agent/plugin/sinks/PulsarSinkTest.java
index 43e3115dcb..c8cf365850 100644
---
a/inlong-agent/agent-plugins/src/test/java/org/apache/inlong/agent/plugin/sinks/PulsarSinkTest.java
+++
b/inlong-agent/agent-plugins/src/test/java/org/apache/inlong/agent/plugin/sinks/PulsarSinkTest.java
@@ -50,8 +50,8 @@ public class PulsarSinkTest {
TaskProfile taskProfile =
helper.getFileTaskProfile(1, pattern, "csv", false, "", "",
TaskStateEnum.RUNNING, "D",
"GMT+8:00", null);
- profile = taskProfile.createInstanceProfile("", fileName,
- taskProfile.getCycleUnit(), "20230927",
AgentUtils.getCurrentTime());
+ profile = taskProfile.createInstanceProfile(fileName,
taskProfile.getCycleUnit(), "20230927",
+ AgentUtils.getCurrentTime());
pulsarSink = new MockSink();
pulsarSink.init(profile);
}
diff --git
a/inlong-agent/agent-plugins/src/test/java/org/apache/inlong/agent/plugin/sinks/filecollect/TestSenderManager.java
b/inlong-agent/agent-plugins/src/test/java/org/apache/inlong/agent/plugin/sinks/filecollect/TestSenderManager.java
index 1c9e623b9b..9655e757ef 100644
---
a/inlong-agent/agent-plugins/src/test/java/org/apache/inlong/agent/plugin/sinks/filecollect/TestSenderManager.java
+++
b/inlong-agent/agent-plugins/src/test/java/org/apache/inlong/agent/plugin/sinks/filecollect/TestSenderManager.java
@@ -24,7 +24,7 @@ import org.apache.inlong.agent.constant.TaskConstants;
import org.apache.inlong.agent.message.file.OffsetAckInfo;
import org.apache.inlong.agent.message.file.SenderMessage;
import org.apache.inlong.agent.plugin.AgentBaseTestsHelper;
-import org.apache.inlong.agent.plugin.task.file.FileDataUtils;
+import org.apache.inlong.agent.plugin.task.logcollection.local.FileDataUtils;
import org.apache.inlong.agent.utils.AgentUtils;
import org.apache.inlong.common.enums.TaskStateEnum;
import org.apache.inlong.sdk.dataproxy.common.SendMessageCallback;
@@ -73,8 +73,8 @@ public class TestSenderManager {
TaskProfile taskProfile =
helper.getFileTaskProfile(1, pattern, "csv", false, "", "",
TaskStateEnum.RUNNING, "D",
"GMT+8:00", null);
- profile = taskProfile.createInstanceProfile("", fileName,
- taskProfile.getCycleUnit(), "20230927",
AgentUtils.getCurrentTime());
+ profile = taskProfile.createInstanceProfile(fileName,
taskProfile.getCycleUnit(), "20230927",
+ AgentUtils.getCurrentTime());
}
@AfterClass
diff --git
a/inlong-agent/agent-plugins/src/test/java/org/apache/inlong/agent/plugin/sources/TestLogFileSource.java
b/inlong-agent/agent-plugins/src/test/java/org/apache/inlong/agent/plugin/sources/TestLogFileSource.java
index d7a93a0df8..2bbe41859a 100644
---
a/inlong-agent/agent-plugins/src/test/java/org/apache/inlong/agent/plugin/sources/TestLogFileSource.java
+++
b/inlong-agent/agent-plugins/src/test/java/org/apache/inlong/agent/plugin/sources/TestLogFileSource.java
@@ -27,7 +27,7 @@ import org.apache.inlong.agent.core.task.OffsetManager;
import org.apache.inlong.agent.core.task.TaskManager;
import org.apache.inlong.agent.plugin.AgentBaseTestsHelper;
import org.apache.inlong.agent.plugin.Message;
-import org.apache.inlong.agent.plugin.task.file.FileDataUtils;
+import org.apache.inlong.agent.plugin.task.logcollection.local.FileDataUtils;
import org.apache.inlong.agent.store.Store;
import org.apache.inlong.agent.utils.AgentUtils;
import org.apache.inlong.common.enums.TaskStateEnum;
@@ -87,8 +87,8 @@ public class TestLogFileSource {
TaskProfile taskProfile = helper.getFileTaskProfile(taskId,
pattern, dataContentStyle, retry, "", "",
TaskStateEnum.RUNNING, "D",
"GMT+8:00", Arrays.asList("ok"));
- InstanceProfile instanceProfile =
taskProfile.createInstanceProfile("",
- fileName, taskProfile.getCycleUnit(), "20230928",
AgentUtils.getCurrentTime());
+ InstanceProfile instanceProfile =
taskProfile.createInstanceProfile(fileName, taskProfile.getCycleUnit(),
+ "20230928", AgentUtils.getCurrentTime());
instanceProfile.set(TaskConstants.INODE_INFO,
FileDataUtils.getInodeInfo(instanceProfile.getInstanceId()));
LogFileSource source = new LogFileSource();
Whitebox.setInternalState(source, "BATCH_READ_LINE_COUNT", 1);
diff --git
a/inlong-agent/agent-plugins/src/test/java/org/apache/inlong/agent/plugin/sources/TestRedisSource.java
b/inlong-agent/agent-plugins/src/test/java/org/apache/inlong/agent/plugin/sources/TestRedisSource.java
index 14518078f2..8fb551a083 100644
---
a/inlong-agent/agent-plugins/src/test/java/org/apache/inlong/agent/plugin/sources/TestRedisSource.java
+++
b/inlong-agent/agent-plugins/src/test/java/org/apache/inlong/agent/plugin/sources/TestRedisSource.java
@@ -123,8 +123,8 @@ public class TestRedisSource {
TaskProfile taskProfile = helper.getFileTaskProfile(1, "", "csv",
false, "", "", TaskStateEnum.RUNNING, "D",
"GMT+8:00", null);
- profile = taskProfile.createInstanceProfile("",
- "", taskProfile.getCycleUnit(), "20240725",
AgentUtils.getCurrentTime());
+ profile = taskProfile.createInstanceProfile("",
taskProfile.getCycleUnit(), "20240725",
+ AgentUtils.getCurrentTime());
profile.set(CommonConstants.PROXY_INLONG_GROUP_ID, groupId);
profile.set(CommonConstants.PROXY_INLONG_STREAM_ID, streamId);
profile.set(TaskConstants.TASK_REDIS_AUTHUSER, username);
diff --git
a/inlong-agent/agent-plugins/src/test/java/org/apache/inlong/agent/plugin/sources/TestSQLServerSource.java
b/inlong-agent/agent-plugins/src/test/java/org/apache/inlong/agent/plugin/sources/TestSQLServerSource.java
index 410acd8e77..960f2a5144 100644
---
a/inlong-agent/agent-plugins/src/test/java/org/apache/inlong/agent/plugin/sources/TestSQLServerSource.java
+++
b/inlong-agent/agent-plugins/src/test/java/org/apache/inlong/agent/plugin/sources/TestSQLServerSource.java
@@ -138,8 +138,8 @@ public class TestSQLServerSource {
TaskProfile taskProfile = helper.getFileTaskProfile(1, "", "csv",
false, "", "", TaskStateEnum.RUNNING, "D",
"GMT+8:00", null);
- instanceProfile = taskProfile.createInstanceProfile("",
- "", taskProfile.getCycleUnit(), "20240725",
AgentUtils.getCurrentTime());
+ instanceProfile = taskProfile.createInstanceProfile("",
taskProfile.getCycleUnit(), "20240725",
+ AgentUtils.getCurrentTime());
instanceProfile.set(CommonConstants.PROXY_INLONG_GROUP_ID, groupId);
instanceProfile.set(CommonConstants.PROXY_INLONG_STREAM_ID, streamId);
instanceProfile.set(TaskConstants.TASK_SQLSERVER_USER, username);
diff --git
a/inlong-agent/agent-plugins/src/test/java/org/apache/inlong/agent/plugin/task/TestCOSTask.java
b/inlong-agent/agent-plugins/src/test/java/org/apache/inlong/agent/plugin/task/TestCOSTask.java
index f66e4845b1..4d3bc0508d 100644
---
a/inlong-agent/agent-plugins/src/test/java/org/apache/inlong/agent/plugin/task/TestCOSTask.java
+++
b/inlong-agent/agent-plugins/src/test/java/org/apache/inlong/agent/plugin/task/TestCOSTask.java
@@ -22,7 +22,7 @@ import org.apache.inlong.agent.conf.TaskProfile;
import org.apache.inlong.agent.constant.CycleUnitType;
import org.apache.inlong.agent.core.task.TaskManager;
import org.apache.inlong.agent.plugin.AgentBaseTestsHelper;
-import org.apache.inlong.agent.plugin.task.cos.COSTask;
+import org.apache.inlong.agent.plugin.task.logcollection.cos.COSTask;
import org.apache.inlong.agent.plugin.utils.cos.COSUtils;
import org.apache.inlong.common.enums.TaskStateEnum;
@@ -204,20 +204,20 @@ public class TestCOSTask {
TaskProfile taskProfile = helper.getCOSTaskProfile(taskId, pattern,
"csv", true, startTime, endTime,
TaskStateEnum.RUNNING,
cycle, "GMT+8:00", null);
- COSTask task = null;
+ COSTask fileTask = null;
final List<String> fileName = new ArrayList();
final List<String> dataTime = new ArrayList();
try {
- task = PowerMockito.spy(new COSTask());
+ fileTask = PowerMockito.spy(new COSTask());
PowerMockito.doAnswer(invocation -> {
fileName.add(invocation.getArgument(0));
dataTime.add(invocation.getArgument(1));
return null;
- }).when(task, "addToEvenMap", Mockito.anyString(),
Mockito.anyString());
- Assert.assertTrue(task.isProfileValid(taskProfile));
+ }).when(fileTask, "addToEvenMap", Mockito.anyString(),
Mockito.anyString());
+ Assert.assertTrue(fileTask.isProfileValid(taskProfile));
manager.getTaskStore().storeTask(taskProfile);
- task.init(manager, taskProfile, manager.getInstanceBasicStore());
- EXECUTOR_SERVICE.submit(task);
+ fileTask.init(manager, taskProfile,
manager.getInstanceBasicStore());
+ EXECUTOR_SERVICE.submit(fileTask);
} catch (Exception e) {
LOGGER.error("source init error", e);
Assert.assertTrue("source init error", false);
@@ -228,6 +228,6 @@ public class TestCOSTask {
Assert.assertEquals(0, fileName.get(i).compareTo(srcKeys.get(i)));
Assert.assertEquals(0,
dataTime.get(i).compareTo(srcDataTimes.get(i)));
}
- task.destroy();
+ fileTask.destroy();
}
}
\ No newline at end of file
diff --git
a/inlong-agent/agent-plugins/src/test/java/org/apache/inlong/agent/plugin/task/TestLogFileTask.java
b/inlong-agent/agent-plugins/src/test/java/org/apache/inlong/agent/plugin/task/TestLogFileTask.java
index 7d1962c314..d2c8e8a342 100644
---
a/inlong-agent/agent-plugins/src/test/java/org/apache/inlong/agent/plugin/task/TestLogFileTask.java
+++
b/inlong-agent/agent-plugins/src/test/java/org/apache/inlong/agent/plugin/task/TestLogFileTask.java
@@ -22,7 +22,7 @@ import org.apache.inlong.agent.conf.TaskProfile;
import org.apache.inlong.agent.constant.CycleUnitType;
import org.apache.inlong.agent.core.task.TaskManager;
import org.apache.inlong.agent.plugin.AgentBaseTestsHelper;
-import org.apache.inlong.agent.plugin.task.file.LogFileTask;
+import org.apache.inlong.agent.plugin.task.logcollection.local.FileTask;
import org.apache.inlong.common.enums.TaskStateEnum;
import org.junit.AfterClass;
@@ -49,7 +49,7 @@ import java.util.concurrent.TimeUnit;
import static org.awaitility.Awaitility.await;
@RunWith(PowerMockRunner.class)
-@PrepareForTest(LogFileTask.class)
+@PrepareForTest(FileTask.class)
@PowerMockIgnore({"javax.management.*"})
public class TestLogFileTask {
@@ -103,11 +103,11 @@ public class TestLogFileTask {
TaskProfile taskProfile =
helper.getFileTaskProfile(taskId, pattern, "csv", true,
startTime, endTime, TaskStateEnum.RUNNING,
cycle, "GMT+8:00", null);
- LogFileTask dayTask = null;
+ FileTask dayTask = null;
final List<String> fileName = new ArrayList();
final List<String> dataTime = new ArrayList();
try {
- dayTask = PowerMockito.spy(new LogFileTask());
+ dayTask = PowerMockito.spy(new FileTask());
PowerMockito.doAnswer(invocation -> {
fileName.add(invocation.getArgument(0));
dataTime.add(invocation.getArgument(1));
diff --git
a/inlong-agent/agent-plugins/src/test/java/org/apache/inlong/agent/plugin/task/TestTaskManager.java
b/inlong-agent/agent-plugins/src/test/java/org/apache/inlong/agent/plugin/task/TestTaskManager.java
index 4cab9fa8f0..c6ba4a449b 100755
---
a/inlong-agent/agent-plugins/src/test/java/org/apache/inlong/agent/plugin/task/TestTaskManager.java
+++
b/inlong-agent/agent-plugins/src/test/java/org/apache/inlong/agent/plugin/task/TestTaskManager.java
@@ -22,6 +22,7 @@ import org.apache.inlong.agent.core.task.TaskManager;
import org.apache.inlong.agent.plugin.AgentBaseTestsHelper;
import org.apache.inlong.agent.store.TaskStore;
import org.apache.inlong.common.enums.TaskStateEnum;
+import org.apache.inlong.common.enums.TaskTypeEnum;
import org.junit.AfterClass;
import org.junit.Assert;
@@ -34,6 +35,7 @@ import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.TimeUnit;
+import static org.apache.inlong.agent.constant.TaskConstants.TASK_TYPE;
import static org.awaitility.Awaitility.await;
public class TestTaskManager {
@@ -61,7 +63,7 @@ public class TestTaskManager {
TaskProfile taskProfile =
helper.getFileTaskProfile(i, pattern, "csv", false,
"", "", TaskStateEnum.RUNNING,
"D", "GMT+8:00", null);
- taskProfile.setTaskClass(MockTask.class.getCanonicalName());
+ taskProfile.setInt(TASK_TYPE, TaskTypeEnum.MOCK.getType());
taskStore.storeTask(taskProfile);
}
manager.start();
@@ -78,7 +80,7 @@ public class TestTaskManager {
TaskProfile taskProfile1 = helper.getFileTaskProfile(100, pattern,
"csv", false, "", "", TaskStateEnum.RUNNING,
"D", "GMT+8:00", null);
String taskId1 = taskProfile1.getTaskId();
- taskProfile1.setTaskClass(MockTask.class.getCanonicalName());
+ taskProfile1.setInt(TASK_TYPE, TaskTypeEnum.MOCK.getType());
List<TaskProfile> taskProfiles1 = new ArrayList<>();
taskProfiles1.add(taskProfile1);
// test add
@@ -102,7 +104,7 @@ public class TestTaskManager {
// test delete
TaskProfile taskProfile2 = helper.getFileTaskProfile(200, pattern,
"csv", false, "", "", TaskStateEnum.RUNNING,
"D", "GMT+8:00", null);
- taskProfile2.setTaskClass(MockTask.class.getCanonicalName());
+ taskProfile2.setInt(TASK_TYPE, TaskTypeEnum.MOCK.getType());
List<TaskProfile> taskProfiles2 = new ArrayList<>();
taskProfiles2.add(taskProfile2);
manager.submitTaskProfiles(taskProfiles2);