This is an automated email from the ASF dual-hosted git repository.
wenweihuang pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/inlong.git
The following commit(s) were added to refs/heads/master by this push:
new 1f8a7fffb3 [INLONG-11569][Agent] Add COS Task (#11570)
1f8a7fffb3 is described below
commit 1f8a7fffb35bbfb53d1ada9ee4a1ea7005e1d1cd
Author: justinwwhuang <[email protected]>
AuthorDate: Mon Dec 2 17:00:23 2024 +0800
[INLONG-11569][Agent] Add COS Task (#11570)
* [INLONG-11569][Agent] Add COS Task
* [INLONG-11569][Agent] Modify code based on comments
---
.../apache/inlong/agent/conf/InstanceProfile.java | 4 +-
.../org/apache/inlong/agent/conf/TaskProfile.java | 8 +--
.../inlong/agent/constant/TaskConstants.java | 24 +++++--
.../java/org/apache/inlong/agent/pojo/COSTask.java | 73 ++++++++++++++++++++++
.../apache/inlong/agent/pojo/TaskProfileDto.java | 54 +++++++++++++++-
.../inlong/agent/plugin/sources/LogFileSource.java | 9 ++-
.../inlong/agent/plugin/task/file/LogFileTask.java | 18 +++---
.../apache/inlong/common/enums/TaskTypeEnum.java | 6 +-
8 files changed, 165 insertions(+), 31 deletions(-)
diff --git a/inlong-agent/agent-common/src/main/java/org/apache/inlong/agent/conf/InstanceProfile.java b/inlong-agent/agent-common/src/main/java/org/apache/inlong/agent/conf/InstanceProfile.java
index c9a3d6a022..9e85872ff5 100644
--- a/inlong-agent/agent-common/src/main/java/org/apache/inlong/agent/conf/InstanceProfile.java
+++ b/inlong-agent/agent-common/src/main/java/org/apache/inlong/agent/conf/InstanceProfile.java
@@ -36,10 +36,10 @@ import static
org.apache.inlong.agent.constant.CommonConstants.DEFAULT_PROXY_INL
import static
org.apache.inlong.agent.constant.CommonConstants.DEFAULT_PROXY_INLONG_STREAM_ID;
import static
org.apache.inlong.agent.constant.CommonConstants.PROXY_INLONG_GROUP_ID;
import static
org.apache.inlong.agent.constant.CommonConstants.PROXY_INLONG_STREAM_ID;
-import static org.apache.inlong.agent.constant.TaskConstants.FILE_TASK_RETRY;
import static org.apache.inlong.agent.constant.TaskConstants.INSTANCE_STATE;
import static org.apache.inlong.agent.constant.TaskConstants.TASK_MQ_CLUSTERS;
import static org.apache.inlong.agent.constant.TaskConstants.TASK_MQ_TOPIC;
+import static org.apache.inlong.agent.constant.TaskConstants.TASK_RETRY;
/**
* job profile which contains details describing properties of one job.
@@ -200,6 +200,6 @@ public class InstanceProfile extends AbstractConfiguration
implements Comparable
}
public boolean isRetry() {
- return getBoolean(FILE_TASK_RETRY, false);
+ return getBoolean(TASK_RETRY, false);
}
}
diff --git a/inlong-agent/agent-common/src/main/java/org/apache/inlong/agent/conf/TaskProfile.java b/inlong-agent/agent-common/src/main/java/org/apache/inlong/agent/conf/TaskProfile.java
index 32450735e4..8c509e4240 100644
--- a/inlong-agent/agent-common/src/main/java/org/apache/inlong/agent/conf/TaskProfile.java
+++ b/inlong-agent/agent-common/src/main/java/org/apache/inlong/agent/conf/TaskProfile.java
@@ -36,7 +36,7 @@ import static
org.apache.inlong.agent.constant.CommonConstants.DEFAULT_PROXY_INL
import static
org.apache.inlong.agent.constant.CommonConstants.DEFAULT_PROXY_INLONG_STREAM_ID;
import static
org.apache.inlong.agent.constant.CommonConstants.PROXY_INLONG_GROUP_ID;
import static
org.apache.inlong.agent.constant.CommonConstants.PROXY_INLONG_STREAM_ID;
-import static org.apache.inlong.agent.constant.TaskConstants.FILE_TASK_RETRY;
+import static org.apache.inlong.agent.constant.TaskConstants.TASK_RETRY;
import static org.apache.inlong.agent.constant.TaskConstants.TASK_STATE;
/**
@@ -65,10 +65,6 @@ public class TaskProfile extends AbstractConfiguration {
return get(TaskConstants.TASK_CYCLE_UNIT);
}
- public String getTimeOffset() {
- return get(TaskConstants.TASK_FILE_TIME_OFFSET, "");
- }
-
public String getTimeZone() {
return get(TaskConstants.TASK_TIME_ZONE);
}
@@ -82,7 +78,7 @@ public class TaskProfile extends AbstractConfiguration {
}
public boolean isRetry() {
- return getBoolean(FILE_TASK_RETRY, false);
+ return getBoolean(TASK_RETRY, false);
}
public String getTaskClass() {
diff --git a/inlong-agent/agent-common/src/main/java/org/apache/inlong/agent/constant/TaskConstants.java b/inlong-agent/agent-common/src/main/java/org/apache/inlong/agent/constant/TaskConstants.java
index 22fb87e6e5..a4f9156577 100755
--- a/inlong-agent/agent-common/src/main/java/org/apache/inlong/agent/constant/TaskConstants.java
+++ b/inlong-agent/agent-common/src/main/java/org/apache/inlong/agent/constant/TaskConstants.java
@@ -59,10 +59,10 @@ public class TaskConstants extends CommonConstants {
public static final String TASK_CYCLE_UNIT = "task.cycleUnit";
public static final String FILE_TASK_CYCLE_UNIT =
"task.fileTask.cycleUnit";
public static final String TASK_FILE_CONTENT_COLLECT_TYPE =
"task.fileTask.contentCollectType";
- public static final String SOURCE_DATA_CONTENT_STYLE =
"task.fileTask.dataContentStyle";
- public static final String SOURCE_DATA_SEPARATOR =
"task.fileTask.dataSeparator";
- public static final String SOURCE_FILTER_STREAMS =
"task.fileTask.filterStreams";
- public static final String FILE_TASK_RETRY = "task.fileTask.retry";
+ public static final String FILE_CONTENT_STYLE =
"task.fileTask.dataContentStyle";
+ public static final String FILE_DATA_SEPARATOR =
"task.fileTask.dataSeparator";
+ public static final String FILE_FILTER_STREAMS =
"task.fileTask.filterStreams";
+ public static final String TASK_RETRY = "task.retry";
public static final String FILE_TASK_TIME_FROM =
"task.fileTask.dataTimeFrom";
public static final String FILE_TASK_TIME_TO = "task.fileTask.dataTimeTo";
public static final String FILE_MAX_NUM = "task.fileTask.maxFileCount";
@@ -75,6 +75,22 @@ public class TaskConstants extends CommonConstants {
public static final String TASK_KAFKA_OFFSET =
"task.kafkaTask.partition.offset";
public static final String TASK_KAFKA_AUTO_COMMIT_OFFSET_RESET =
"task.kafkaTask.autoOffsetReset";
+ // COS task
+ public static final String COS_TASK_CYCLE_UNIT = "task.cosTask.cycleUnit";
+ public static final String COS_CONTENT_STYLE = "task.cosTask.contentStyle";
+ public static final String COS_MAX_NUM = "task.cosTask.maxFileCount";
+ public static final String COS_TASK_PATTERN = "task.cosTask.pattern";
+ public static final String TASK_COS_TIME_OFFSET =
"task.cosTask.timeOffset";
+ public static final String COS_TASK_RETRY = "task.cosTask.retry";
+ public static final String COS_TASK_TIME_FROM =
"task.cosTask.dataTimeFrom";
+ public static final String COS_TASK_TIME_TO = "task.cosTask.dataTimeTo";
+ public static final String COS_TASK_BUCKET_NAME =
"task.cosTask.bucketName";
+ public static final String COS_TASK_SECRET_ID = "task.cosTask.secretId";
+ public static final String COS_TASK_SECRET_KEY = "task.cosTask.secretKey";
+ public static final String COS_TASK_REGION = "task.cosTask.region";
+ public static final String COS_DATA_SEPARATOR =
"task.cosTask.dataSeparator";
+ public static final String COS_FILTER_STREAMS =
"task.cosTask.filterStreams";
+
/**
* delimiter to split offset for different task
*/
diff --git a/inlong-agent/agent-common/src/main/java/org/apache/inlong/agent/pojo/COSTask.java b/inlong-agent/agent-common/src/main/java/org/apache/inlong/agent/pojo/COSTask.java
new file mode 100644
index 0000000000..83d43c6af3
--- /dev/null
+++ b/inlong-agent/agent-common/src/main/java/org/apache/inlong/agent/pojo/COSTask.java
@@ -0,0 +1,73 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.inlong.agent.pojo;
+
+import lombok.Data;
+
+import java.util.List;
+
+@Data
+public class COSTask {
+
+ private Integer id;
+ private String pattern;
+ private String cycleUnit;
+ private Boolean retry;
+ private String dataTimeFrom;
+ private String dataTimeTo;
+ private String timeOffset;
+ private Integer maxFileCount;
+ private String collectType;
+ private String contentStyle;
+ private String dataSeparator;
+ private String filterStreams;
+ private String bucketName;
+ private String secretId;
+ private String secretKey;
+ private String region;
+
+ @Data
+ public static class COSTaskConfig {
+
+ private String pattern;
+ private String cycleUnit;
+ private Boolean retry;
+ private String dataTimeFrom;
+ private String dataTimeTo;
+ // '1m' means one minute after, '-1m' means one minute before
+ // '1h' means one hour after, '-1h' means one hour before
+ // '1d' means one day after, '-1d' means one day before
+ // Null means from current timestamp
+ private String timeOffset;
+ private Integer maxFileCount;
+ // Collect type, for example: FULL, INCREMENT
+ private String collectType;
+ // Type of data result for column separator
+ // CSV format, set this parameter to a custom separator: , | :
+ // Json format, set this parameter to json
+ private String contentStyle;
+ // Column separator of data source
+ private String dataSeparator;
+ // The streamIds to be filtered out
+ private List<String> filterStreams;
+ private String bucketName;
+ private String credentialsId;
+ private String credentialsKey;
+ private String region;
+ }
+}
diff --git a/inlong-agent/agent-common/src/main/java/org/apache/inlong/agent/pojo/TaskProfileDto.java b/inlong-agent/agent-common/src/main/java/org/apache/inlong/agent/pojo/TaskProfileDto.java
index 2d4a6a32ae..a9134de3a8 100644
--- a/inlong-agent/agent-common/src/main/java/org/apache/inlong/agent/pojo/TaskProfileDto.java
+++ b/inlong-agent/agent-common/src/main/java/org/apache/inlong/agent/pojo/TaskProfileDto.java
@@ -21,6 +21,7 @@ import org.apache.inlong.agent.conf.AgentConfiguration;
import org.apache.inlong.agent.conf.TaskProfile;
import org.apache.inlong.agent.constant.CycleUnitType;
import org.apache.inlong.agent.pojo.BinlogTask.BinlogTaskConfig;
+import org.apache.inlong.agent.pojo.COSTask.COSTaskConfig;
import org.apache.inlong.agent.pojo.FileTask.FileTaskConfig;
import org.apache.inlong.agent.pojo.FileTask.Line;
import org.apache.inlong.agent.pojo.KafkaTask.KafkaTaskConfig;
@@ -37,6 +38,8 @@ import org.apache.inlong.common.pojo.agent.DataConfig;
import com.google.gson.Gson;
import lombok.Data;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
import java.util.stream.Collectors;
@@ -48,6 +51,8 @@ import static
org.apache.inlong.common.enums.DataReportTypeEnum.NORMAL_SEND_TO_D
@Data
public class TaskProfileDto {
+ private static final Logger logger =
LoggerFactory.getLogger(TaskProfileDto.class);
+
public static final String DEFAULT_FILE_TASK =
"org.apache.inlong.agent.plugin.task.file.LogFileTask";
public static final String DEFAULT_KAFKA_TASK =
"org.apache.inlong.agent.plugin.task.KafkaTask";
public static final String DEFAULT_PULSAR_TASK =
"org.apache.inlong.agent.plugin.task.PulsarTask";
@@ -62,7 +67,7 @@ public class TaskProfileDto {
public static final String DEFAULT_DATA_PROXY_SINK =
"org.apache.inlong.agent.plugin.sinks.ProxySink";
public static final String PULSAR_SINK =
"org.apache.inlong.agent.plugin.sinks.PulsarSink";
public static final String KAFKA_SINK =
"org.apache.inlong.agent.plugin.sinks.KafkaSink";
-
+ public static final String DEFAULT_COS_TASK =
"org.apache.inlong.agent.plugin.task.cos.COSTask";
/**
* file source
*/
@@ -101,6 +106,10 @@ public class TaskProfileDto {
* sqlserver source
*/
public static final String SQLSERVER_SOURCE =
"org.apache.inlong.agent.plugin.sources.SQLServerSource";
+ /**
+ * cos source
+ */
+ public static final String COS_SOURCE =
"org.apache.inlong.agent.plugin.sources.COSSource";
private static final Gson GSON = new Gson();
@@ -197,6 +206,35 @@ public class TaskProfileDto {
return fileTask;
}
+ private static COSTask getCOSTask(DataConfig dataConfig) {
+ COSTask cosTask = new COSTask();
+ cosTask.setId(dataConfig.getTaskId());
+ COSTaskConfig taskConfig = GSON.fromJson(dataConfig.getExtParams(),
+ COSTaskConfig.class);
+ cosTask.setPattern(taskConfig.getPattern());
+ cosTask.setCollectType(taskConfig.getCollectType());
+ cosTask.setContentStyle(taskConfig.getContentStyle());
+ cosTask.setDataSeparator(taskConfig.getDataSeparator());
+ cosTask.setMaxFileCount(taskConfig.getMaxFileCount());
+ cosTask.setRetry(taskConfig.getRetry());
+ cosTask.setCycleUnit(taskConfig.getCycleUnit());
+ cosTask.setDataTimeFrom(taskConfig.getDataTimeFrom());
+ cosTask.setDataTimeTo(taskConfig.getDataTimeTo());
+ cosTask.setBucketName(taskConfig.getBucketName());
+ cosTask.setSecretId(taskConfig.getCredentialsId());
+ cosTask.setSecretKey(taskConfig.getCredentialsKey());
+ cosTask.setRegion(taskConfig.getRegion());
+ if (taskConfig.getFilterStreams() != null) {
+ cosTask.setFilterStreams(GSON.toJson(taskConfig.getFilterStreams()));
+ }
+ if (taskConfig.getTimeOffset() != null) {
+ cosTask.setTimeOffset(taskConfig.getTimeOffset());
+ } else {
+ cosTask.setTimeOffset(deafult_time_offset + cosTask.getCycleUnit());
+ }
+ return cosTask;
+ }
+
private static KafkaTask getKafkaTask(DataConfig dataConfigs) {
KafkaTaskConfig kafkaTaskConfig =
GSON.fromJson(dataConfigs.getExtParams(),
@@ -468,6 +506,7 @@ public class TaskProfileDto {
throw new IllegalArgumentException("invalid mq type " + mqType
+ " please check");
}
}
+ task.setRetry(false);
TaskTypeEnum taskType =
TaskTypeEnum.getTaskType(dataConfig.getTaskType());
switch (requireNonNull(taskType)) {
case SQL:
@@ -483,6 +522,7 @@ public class TaskProfileDto {
task.setCycleUnit(fileTask.getCycleUnit());
task.setFileTask(fileTask);
task.setSource(DEFAULT_SOURCE);
+ task.setRetry(fileTask.getRetry());
profileDto.setTask(task);
break;
case KAFKA:
@@ -544,7 +584,17 @@ public class TaskProfileDto {
case MOCK:
profileDto.setTask(task);
break;
+ case COS:
+ task.setTaskClass(DEFAULT_COS_TASK);
+ COSTask cosTask = getCOSTask(dataConfig);
+ task.setCycleUnit(cosTask.getCycleUnit());
+ task.setCosTask(cosTask);
+ task.setSource(COS_SOURCE);
+ task.setRetry(cosTask.getRetry());
+ profileDto.setTask(task);
+ break;
default:
+ logger.error("invalid task type {}", taskType);
}
return TaskProfile.parseJsonStr(GSON.toJson(profileDto));
}
@@ -574,6 +624,7 @@ public class TaskProfileDto {
private String cycleUnit;
private String timeZone;
private String auditVersion;
+ private boolean retry;
private FileTask fileTask;
private BinlogTask binlogTask;
@@ -585,6 +636,7 @@ public class TaskProfileDto {
private RedisTask redisTask;
private MqttTask mqttTask;
private SqlServerTask sqlserverTask;
+ private COSTask cosTask;
}
@Data
diff --git a/inlong-agent/agent-plugins/src/main/java/org/apache/inlong/agent/plugin/sources/LogFileSource.java b/inlong-agent/agent-plugins/src/main/java/org/apache/inlong/agent/plugin/sources/LogFileSource.java
index bd53b4530f..300c16168f 100755
--- a/inlong-agent/agent-plugins/src/main/java/org/apache/inlong/agent/plugin/sources/LogFileSource.java
+++ b/inlong-agent/agent-plugins/src/main/java/org/apache/inlong/agent/plugin/sources/LogFileSource.java
@@ -48,7 +48,7 @@ import java.text.SimpleDateFormat;
import java.util.ArrayList;
import java.util.List;
-import static org.apache.inlong.agent.constant.TaskConstants.SOURCE_DATA_CONTENT_STYLE;
+import static org.apache.inlong.agent.constant.TaskConstants.FILE_CONTENT_STYLE;
/**
* Read text files
@@ -355,7 +355,7 @@ public class LogFileSource extends AbstractSource {
FileStatic data = new FileStatic();
data.setTaskId(taskId);
data.setRetry(String.valueOf(profile.isRetry()));
- data.setContentType(profile.get(SOURCE_DATA_CONTENT_STYLE));
+ data.setContentType(profile.get(FILE_CONTENT_STYLE));
data.setGroupId(profile.getInlongGroupId());
data.setStreamId(profile.getInlongStreamId());
data.setDataTime(format.format(profile.getSinkDataTime()));
@@ -364,10 +364,9 @@ public class LogFileSource extends AbstractSource {
data.setReadBytes(String.valueOf(bytePosition));
data.setReadLines(String.valueOf(linePosition));
OffsetProfile offsetProfile =
OffsetManager.getInstance().getOffset(taskId, instanceId);
- if (offsetProfile == null) {
- return;
+ if (offsetProfile != null) {
+ data.setSendLines(offsetProfile.getOffset());
}
- data.setSendLines(offsetProfile.getOffset());
FileStaticManager.putStaticMsg(data);
randomAccessFile.close();
} catch (IOException e) {
diff --git a/inlong-agent/agent-plugins/src/main/java/org/apache/inlong/agent/plugin/task/file/LogFileTask.java b/inlong-agent/agent-plugins/src/main/java/org/apache/inlong/agent/plugin/task/file/LogFileTask.java
index 0c104956d7..4eb3b69525 100644
--- a/inlong-agent/agent-plugins/src/main/java/org/apache/inlong/agent/plugin/task/file/LogFileTask.java
+++ b/inlong-agent/agent-plugins/src/main/java/org/apache/inlong/agent/plugin/task/file/LogFileTask.java
@@ -87,6 +87,7 @@ public class LogFileTask extends AbstractTask {
public final long SCAN_INTERVAL = 1 * 60 * 1000;
private volatile boolean runAtLeastOneTime = false;
private volatile long coreThreadUpdateTime = 0;
+ private String timeOffset = "";
private BlockingQueue<InstanceProfile> instanceQueue;
@Override
@@ -96,8 +97,9 @@ public class LogFileTask extends AbstractTask {
@Override
protected void initTask() {
+ timeOffset = taskProfile.get(TaskConstants.TASK_FILE_TIME_OFFSET, "");
instanceQueue = new LinkedBlockingQueue<>(INSTANCE_QUEUE_CAPACITY);
- retry = taskProfile.getBoolean(TaskConstants.FILE_TASK_RETRY, false);
+ retry = taskProfile.isRetry();
originPatterns =
Stream.of(taskProfile.get(TaskConstants.FILE_DIR_FILTER_PATTERNS).split(","))
.collect(Collectors.toSet());
if
(taskProfile.getCycleUnit().compareToIgnoreCase(CycleUnitType.REAL_TIME) == 0) {
@@ -134,7 +136,6 @@ public class LogFileTask extends AbstractTask {
while (list.size() < INSTANCE_QUEUE_CAPACITY &&
!instanceQueue.isEmpty()) {
InstanceProfile profile = instanceQueue.poll();
if (profile != null) {
- LOGGER.info("test123 2 taskid {} {}", getTaskId(),
profile.getInstanceId());
list.add(profile);
}
}
@@ -175,7 +176,7 @@ public class LogFileTask extends AbstractTask {
LOGGER.error("task profile needs time offset");
return false;
}
- if (profile.getBoolean(TaskConstants.FILE_TASK_RETRY, false)) {
+ if (profile.isRetry()) {
if (!initRetryTask(profile)) {
return false;
}
@@ -292,11 +293,11 @@ public class LogFileTask extends AbstractTask {
private List<BasicFileInfo> scanExistingFileByPattern(String
originPattern) {
if (realTime) {
- return FileScanner.scanTaskBetweenTimes(originPattern,
CycleUnitType.HOUR, taskProfile.getTimeOffset(),
+ return FileScanner.scanTaskBetweenTimes(originPattern,
CycleUnitType.HOUR, timeOffset,
startTime, endTime, retry);
} else {
return FileScanner.scanTaskBetweenTimes(originPattern,
taskProfile.getCycleUnit(),
- taskProfile.getTimeOffset(), startTime, endTime, retry);
+ timeOffset, startTime, endTime, retry);
}
}
@@ -328,7 +329,7 @@ public class LogFileTask extends AbstractTask {
long startScanTime = startTime;
long endScanTime = endTime;
List<String> dataTimeList = Scanner.getDataTimeList(startScanTime,
endScanTime, taskProfile.getCycleUnit(),
- taskProfile.getTimeOffset(), retry);
+ timeOffset, retry);
if (dataTimeList.isEmpty()) {
LOGGER.error("getDataTimeList get empty list");
return;
@@ -390,7 +391,7 @@ public class LogFileTask extends AbstractTask {
*/
private boolean shouldStartNow(String dataTime) {
String shouldStartTime =
- NewDateUtils.getShouldStartTime(dataTime,
taskProfile.getCycleUnit(), taskProfile.getTimeOffset());
+ NewDateUtils.getShouldStartTime(dataTime,
taskProfile.getCycleUnit(), timeOffset);
String currentTime = getCurrentTime();
return currentTime.compareTo(shouldStartTime) >= 0;
}
@@ -531,8 +532,7 @@ public class LogFileTask extends AbstractTask {
if (dateExpression.getLongestDatePattern().length() != 0) {
String dataTime = getDataTimeFromFileName(newFileName,
entity.getOriginPattern(), dateExpression);
LOGGER.info("file {}, fileTime {}", newFileName, dataTime);
- if (!NewDateUtils.isValidCreationTime(dataTime,
entity.getCycleUnit(),
- taskProfile.getTimeOffset())) {
+ if (!NewDateUtils.isValidCreationTime(dataTime,
entity.getCycleUnit(), timeOffset)) {
return false;
}
}
diff --git a/inlong-common/src/main/java/org/apache/inlong/common/enums/TaskTypeEnum.java b/inlong-common/src/main/java/org/apache/inlong/common/enums/TaskTypeEnum.java
index c84ea142db..3a032e6511 100644
--- a/inlong-common/src/main/java/org/apache/inlong/common/enums/TaskTypeEnum.java
+++ b/inlong-common/src/main/java/org/apache/inlong/common/enums/TaskTypeEnum.java
@@ -44,11 +44,9 @@ public enum TaskTypeEnum {
REDIS(11),
MQTT(12),
HUDI(13),
-
+ COS(14),
// only used for unit test
- MOCK(201)
-
- ;
+ MOCK(201);
private static final Map<Integer, TaskTypeEnum> TASK_TYPE_ENUM_MAP =
Maps.newHashMap();