This is an automated email from the ASF dual-hosted git repository.
wenweihuang pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/inlong.git
The following commit(s) were added to refs/heads/master by this push:
new 2f0a575dc3 [INLONG-11135][Agent] Support filtering capability when
supplementing data (#11137)
2f0a575dc3 is described below
commit 2f0a575dc3147ab287797ea92a253c2ead788933
Author: justinwwhuang <[email protected]>
AuthorDate: Thu Sep 19 16:06:05 2024 +0800
[INLONG-11135][Agent] Support filtering capability when supplementing data
(#11137)
* [INLONG-11135][Agent] Support filtering capability when supplementing data
* [INLONG-11135][Agent] Support filtering capability when supplementing data
* [INLONG-11135][Agent] Support filtering capability when supplementing data
---
.../inlong/agent/conf/AbstractConfiguration.java | 6 +--
.../inlong/agent/constant/TaskConstants.java | 3 +-
.../org/apache/inlong/agent/pojo/FileTask.java | 10 ++---
.../apache/inlong/agent/pojo/TaskProfileDto.java | 3 ++
.../agent/plugin/sources/file/AbstractSource.java | 43 ++++++++++++++++------
...dedHandler.java => DefaultExtendedHandler.java} | 11 ++++--
.../sources/file/extend/ExtendedHandler.java | 9 +++--
.../inlong/agent/plugin/AgentBaseTestsHelper.java | 18 ++++++---
.../agent/plugin/instance/TestInstanceManager.java | 4 +-
.../inlong/agent/plugin/sinks/KafkaSinkTest.java | 4 +-
.../inlong/agent/plugin/sinks/PulsarSinkTest.java | 4 +-
.../sinks/filecollect/TestSenderManager.java | 4 +-
.../agent/plugin/sources/TestLogFileSource.java | 5 ++-
.../agent/plugin/sources/TestSQLServerSource.java | 8 ++--
.../inlong/agent/plugin/task/TestLogFileTask.java | 5 ++-
.../inlong/agent/plugin/task/TestTaskManager.java | 12 +++---
.../src/test/resources/test/mix_20230928_1.txt | 3 ++
17 files changed, 97 insertions(+), 55 deletions(-)
diff --git
a/inlong-agent/agent-common/src/main/java/org/apache/inlong/agent/conf/AbstractConfiguration.java
b/inlong-agent/agent-common/src/main/java/org/apache/inlong/agent/conf/AbstractConfiguration.java
index f323386bfd..e8842d7d3a 100644
---
a/inlong-agent/agent-common/src/main/java/org/apache/inlong/agent/conf/AbstractConfiguration.java
+++
b/inlong-agent/agent-common/src/main/java/org/apache/inlong/agent/conf/AbstractConfiguration.java
@@ -45,8 +45,6 @@ import java.util.Properties;
public abstract class AbstractConfiguration {
private static final Logger LOGGER =
LoggerFactory.getLogger(AbstractConfiguration.class);
- private static final JsonParser JSON_PARSER = new JsonParser();
-
private final Map<String, JsonPrimitive> configStorage = new HashMap<>();
/**
@@ -81,7 +79,7 @@ public abstract class AbstractConfiguration {
if (inputStream != null) {
reader = new InputStreamReader(inputStream,
StandardCharsets.UTF_8);
if (isJson) {
- JsonElement tmpElement =
JSON_PARSER.parse(reader).getAsJsonObject();
+ JsonElement tmpElement =
JsonParser.parseReader(reader).getAsJsonObject();
updateConfig(new HashMap<>(10), 0, tmpElement);
} else {
Properties properties = new Properties();
@@ -103,7 +101,7 @@ public abstract class AbstractConfiguration {
* @param jsonStr json string
*/
public void loadJsonStrResource(String jsonStr) {
- JsonElement tmpElement = JSON_PARSER.parse(jsonStr);
+ JsonElement tmpElement = JsonParser.parseString(jsonStr);
updateConfig(new HashMap<>(10), 0, tmpElement);
}
diff --git
a/inlong-agent/agent-common/src/main/java/org/apache/inlong/agent/constant/TaskConstants.java
b/inlong-agent/agent-common/src/main/java/org/apache/inlong/agent/constant/TaskConstants.java
index d2642efb03..9398d70640 100755
---
a/inlong-agent/agent-common/src/main/java/org/apache/inlong/agent/constant/TaskConstants.java
+++
b/inlong-agent/agent-common/src/main/java/org/apache/inlong/agent/constant/TaskConstants.java
@@ -61,6 +61,7 @@ public class TaskConstants extends CommonConstants {
public static final String TASK_FILE_CONTENT_COLLECT_TYPE =
"task.fileTask.contentCollectType";
public static final String SOURCE_DATA_CONTENT_STYLE =
"task.fileTask.dataContentStyle";
public static final String SOURCE_DATA_SEPARATOR =
"task.fileTask.dataSeparator";
+ public static final String SOURCE_FILTER_STREAMS =
"task.fileTask.filterStreams";
public static final String TASK_RETRY = "task.fileTask.retry";
public static final String TASK_START_TIME = "task.fileTask.startTime";
public static final String TASK_END_TIME = "task.fileTask.endTime";
@@ -68,7 +69,7 @@ public class TaskConstants extends CommonConstants {
public static final String PREDEFINE_FIELDS = "task.predefinedFields";
public static final String FILE_SOURCE_EXTEND_CLASS =
"task.fileTask.extendedClass";
public static final String DEFAULT_FILE_SOURCE_EXTEND_CLASS =
-
"org.apache.inlong.agent.plugin.sources.file.extend.ExtendedHandler";
+
"org.apache.inlong.agent.plugin.sources.file.extend.DefaultExtendedHandler";
public static final String TASK_AUDIT_VERSION = "task.auditVersion";
// Kafka task
diff --git
a/inlong-agent/agent-common/src/main/java/org/apache/inlong/agent/pojo/FileTask.java
b/inlong-agent/agent-common/src/main/java/org/apache/inlong/agent/pojo/FileTask.java
index 6397ecf5db..57c294f7d4 100644
---
a/inlong-agent/agent-common/src/main/java/org/apache/inlong/agent/pojo/FileTask.java
+++
b/inlong-agent/agent-common/src/main/java/org/apache/inlong/agent/pojo/FileTask.java
@@ -19,7 +19,7 @@ package org.apache.inlong.agent.pojo;
import lombok.Data;
-import java.util.Map;
+import java.util.List;
@Data
public class FileTask {
@@ -46,8 +46,8 @@ public class FileTask {
private String dataSeparator;
- // JSON string, the content format is Map<String,Object>
- private String properties;
+ // The streamIds to be filtered out
+ private String filterStreams;
// Monitor interval for file
private Long monitorInterval;
@@ -121,8 +121,8 @@ public class FileTask {
// Column separator of data source
private String dataSeparator;
- // Properties for file
- private Map<String, Object> properties;
+ // The streamIds to be filtered out
+ private List<String> filterStreams;
// Monitor interval for file
private Long monitorInterval;
diff --git
a/inlong-agent/agent-common/src/main/java/org/apache/inlong/agent/pojo/TaskProfileDto.java
b/inlong-agent/agent-common/src/main/java/org/apache/inlong/agent/pojo/TaskProfileDto.java
index e340f88d8b..bb602e2c61 100644
---
a/inlong-agent/agent-common/src/main/java/org/apache/inlong/agent/pojo/TaskProfileDto.java
+++
b/inlong-agent/agent-common/src/main/java/org/apache/inlong/agent/pojo/TaskProfileDto.java
@@ -167,6 +167,9 @@ public class TaskProfileDto {
fileTask.setCycleUnit(taskConfig.getCycleUnit());
fileTask.setStartTime(taskConfig.getStartTime());
fileTask.setEndTime(taskConfig.getEndTime());
+ if (taskConfig.getFilterStreams() != null) {
+
fileTask.setFilterStreams(GSON.toJson(taskConfig.getFilterStreams()));
+ }
if (taskConfig.getTimeOffset() != null) {
fileTask.setTimeOffset(taskConfig.getTimeOffset());
} else {
diff --git
a/inlong-agent/agent-plugins/src/main/java/org/apache/inlong/agent/plugin/sources/file/AbstractSource.java
b/inlong-agent/agent-plugins/src/main/java/org/apache/inlong/agent/plugin/sources/file/AbstractSource.java
index 6ee2950d3c..f1fb8b5570 100644
---
a/inlong-agent/agent-plugins/src/main/java/org/apache/inlong/agent/plugin/sources/file/AbstractSource.java
+++
b/inlong-agent/agent-plugins/src/main/java/org/apache/inlong/agent/plugin/sources/file/AbstractSource.java
@@ -309,6 +309,37 @@ public abstract class AbstractSource implements Source {
@Override
public Message read() {
+ SourceData sourceData = readFromQueue();
+ while (sourceData != null) {
+ Message msg = createMessage(sourceData);
+ if (filterSourceData(msg)) {
+ long auditTime = 0;
+ if (isRealTime) {
+ auditTime = AgentUtils.getCurrentTime();
+ } else {
+ auditTime = profile.getSinkDataTime();
+ }
+ Map<String, String> header = msg.getHeader();
+ AuditUtils.add(AuditUtils.AUDIT_ID_AGENT_READ_SUCCESS,
inlongGroupId, header.get(PROXY_KEY_STREAM_ID),
+ auditTime, 1, sourceData.getData().length,
auditVersion);
+
AuditUtils.add(AuditUtils.AUDIT_ID_AGENT_READ_SUCCESS_REAL_TIME, inlongGroupId,
+ header.get(PROXY_KEY_STREAM_ID),
+ AgentUtils.getCurrentTime(), 1,
sourceData.getData().length, auditVersion);
+ return msg;
+ }
+ sourceData = readFromQueue();
+ }
+ return null;
+ }
+
+ private boolean filterSourceData(Message msg) {
+ if (extendedHandler != null) {
+ return extendedHandler.filterMessage(msg);
+ }
+ return true;
+ }
+
+ private SourceData readFromQueue() {
SourceData sourceData = null;
try {
sourceData = queue.poll(READ_WAIT_TIMEOUT_MS,
TimeUnit.MILLISECONDS);
@@ -321,7 +352,7 @@ public abstract class AbstractSource implements Source {
}
LOGGER.debug("Read from source queue {} {}", new
String(sourceData.getData()), inlongGroupId);
MemoryManager.getInstance().release(AGENT_GLOBAL_READER_QUEUE_PERMIT,
sourceData.getData().length);
- return createMessage(sourceData);
+ return sourceData;
}
private Message createMessage(SourceData sourceData) {
@@ -333,16 +364,6 @@ public abstract class AbstractSource implements Source {
if (extendedHandler != null) {
extendedHandler.dealWithHeader(header, sourceData.getData());
}
- long auditTime = 0;
- if (isRealTime) {
- auditTime = AgentUtils.getCurrentTime();
- } else {
- auditTime = profile.getSinkDataTime();
- }
- AuditUtils.add(AuditUtils.AUDIT_ID_AGENT_READ_SUCCESS, inlongGroupId,
header.get(PROXY_KEY_STREAM_ID),
- auditTime, 1, sourceData.getData().length, auditVersion);
- AuditUtils.add(AuditUtils.AUDIT_ID_AGENT_READ_SUCCESS_REAL_TIME,
inlongGroupId, header.get(PROXY_KEY_STREAM_ID),
- AgentUtils.getCurrentTime(), 1, sourceData.getData().length,
auditVersion);
Message finalMsg = new DefaultMessage(sourceData.getData(), header);
// if the message size is greater than max pack size,should drop it.
if (finalMsg.getBody().length > maxPackSize) {
diff --git
a/inlong-agent/agent-plugins/src/main/java/org/apache/inlong/agent/plugin/sources/file/extend/ExtendedHandler.java
b/inlong-agent/agent-plugins/src/main/java/org/apache/inlong/agent/plugin/sources/file/extend/DefaultExtendedHandler.java
similarity index 80%
copy from
inlong-agent/agent-plugins/src/main/java/org/apache/inlong/agent/plugin/sources/file/extend/ExtendedHandler.java
copy to
inlong-agent/agent-plugins/src/main/java/org/apache/inlong/agent/plugin/sources/file/extend/DefaultExtendedHandler.java
index 8cd2e76f48..18df71bf31 100644
---
a/inlong-agent/agent-plugins/src/main/java/org/apache/inlong/agent/plugin/sources/file/extend/ExtendedHandler.java
+++
b/inlong-agent/agent-plugins/src/main/java/org/apache/inlong/agent/plugin/sources/file/extend/DefaultExtendedHandler.java
@@ -18,19 +18,22 @@
package org.apache.inlong.agent.plugin.sources.file.extend;
import org.apache.inlong.agent.conf.InstanceProfile;
+import org.apache.inlong.agent.plugin.Message;
import java.util.Map;
-// For some private, customized extension processing
-public abstract class ExtendedHandler {
-
- public ExtendedHandler(InstanceProfile profile) {
+public class DefaultExtendedHandler extends ExtendedHandler {
+ public DefaultExtendedHandler(InstanceProfile profile) {
+ super(profile);
}
// Modify the header by the body
public void dealWithHeader(Map<String, String> header, byte[] body) {
+ }
+ public boolean filterMessage(Message msg) {
+ return true;
}
public static class Constants {
diff --git
a/inlong-agent/agent-plugins/src/main/java/org/apache/inlong/agent/plugin/sources/file/extend/ExtendedHandler.java
b/inlong-agent/agent-plugins/src/main/java/org/apache/inlong/agent/plugin/sources/file/extend/ExtendedHandler.java
index 8cd2e76f48..2412a3055c 100644
---
a/inlong-agent/agent-plugins/src/main/java/org/apache/inlong/agent/plugin/sources/file/extend/ExtendedHandler.java
+++
b/inlong-agent/agent-plugins/src/main/java/org/apache/inlong/agent/plugin/sources/file/extend/ExtendedHandler.java
@@ -18,20 +18,23 @@
package org.apache.inlong.agent.plugin.sources.file.extend;
import org.apache.inlong.agent.conf.InstanceProfile;
+import org.apache.inlong.agent.plugin.Message;
import java.util.Map;
// For some private, customized extension processing
public abstract class ExtendedHandler {
- public ExtendedHandler(InstanceProfile profile) {
+ protected InstanceProfile profile;
+ public ExtendedHandler(InstanceProfile profile) {
+ this.profile = profile;
}
// Modify the header by the body
- public void dealWithHeader(Map<String, String> header, byte[] body) {
+ abstract public void dealWithHeader(Map<String, String> header, byte[]
body);
- }
+ abstract public boolean filterMessage(Message msg);
public static class Constants {
diff --git
a/inlong-agent/agent-plugins/src/test/java/org/apache/inlong/agent/plugin/AgentBaseTestsHelper.java
b/inlong-agent/agent-plugins/src/test/java/org/apache/inlong/agent/plugin/AgentBaseTestsHelper.java
index 10e4532be2..a7693f7da4 100755
---
a/inlong-agent/agent-plugins/src/test/java/org/apache/inlong/agent/plugin/AgentBaseTestsHelper.java
+++
b/inlong-agent/agent-plugins/src/test/java/org/apache/inlong/agent/plugin/AgentBaseTestsHelper.java
@@ -33,6 +33,7 @@ import org.slf4j.LoggerFactory;
import java.nio.file.Path;
import java.nio.file.Paths;
+import java.util.List;
/**
* common environment setting up for test cases.
@@ -80,15 +81,19 @@ public class AgentBaseTestsHelper {
}
}
- public TaskProfile getTaskProfile(int taskId, String pattern, boolean
retry, Long startTime, Long endTime,
- TaskStateEnum state, String cycleUnit, String timeZone) {
- DataConfig dataConfig = getDataConfig(taskId, pattern, retry,
startTime, endTime, state, cycleUnit, timeZone);
+ public TaskProfile getTaskProfile(int taskId, String pattern, String
dataContentStyle, boolean retry,
+ Long startTime, Long endTime,
+ TaskStateEnum state, String cycleUnit, String timeZone,
List<String> filterStreams) {
+ DataConfig dataConfig = getDataConfig(taskId, pattern,
dataContentStyle, retry, startTime, endTime,
+ state, cycleUnit, timeZone,
+ filterStreams);
TaskProfile profile = TaskProfile.convertToTaskProfile(dataConfig);
return profile;
}
- private DataConfig getDataConfig(int taskId, String pattern, boolean
retry, Long startTime, Long endTime,
- TaskStateEnum state, String cycleUnit, String timeZone) {
+ private DataConfig getDataConfig(int taskId, String pattern, String
dataContentStyle, boolean retry, Long startTime,
+ Long endTime,
+ TaskStateEnum state, String cycleUnit, String timeZone,
List<String> filterStreams) {
DataConfig dataConfig = new DataConfig();
dataConfig.setInlongGroupId("testGroupId");
dataConfig.setInlongStreamId("testStreamId");
@@ -107,8 +112,9 @@ public class AgentBaseTestsHelper {
fileTaskConfig.setStartTime(startTime);
fileTaskConfig.setEndTime(endTime);
// mix: login|87601|968|67826|23579 or login|a=b&c=d&x=y&asdf
- fileTaskConfig.setDataContentStyle("mix");
+ fileTaskConfig.setDataContentStyle(dataContentStyle);
fileTaskConfig.setDataSeparator("|");
+ fileTaskConfig.setFilterStreams(filterStreams);
dataConfig.setExtParams(GSON.toJson(fileTaskConfig));
return dataConfig;
}
diff --git
a/inlong-agent/agent-plugins/src/test/java/org/apache/inlong/agent/plugin/instance/TestInstanceManager.java
b/inlong-agent/agent-plugins/src/test/java/org/apache/inlong/agent/plugin/instance/TestInstanceManager.java
index ce9a77acf6..f2c5f25ee3 100755
---
a/inlong-agent/agent-plugins/src/test/java/org/apache/inlong/agent/plugin/instance/TestInstanceManager.java
+++
b/inlong-agent/agent-plugins/src/test/java/org/apache/inlong/agent/plugin/instance/TestInstanceManager.java
@@ -59,8 +59,8 @@ public class TestInstanceManager {
helper = new
AgentBaseTestsHelper(TestInstanceManager.class.getName()).setupAgentHome();
String pattern = helper.getTestRootDir() + "/YYYYMMDDhh_[0-9]+.txt";
Store basicInstanceStore =
TaskManager.initStore(AgentConstants.AGENT_STORE_PATH_INSTANCE);
- taskProfile = helper.getTaskProfile(1, pattern, false, 0L, 0L,
TaskStateEnum.RUNNING, CycleUnitType.HOUR,
- "GMT+6:00");
+ taskProfile = helper.getTaskProfile(1, pattern, "csv", false, 0L, 0L,
TaskStateEnum.RUNNING, CycleUnitType.HOUR,
+ "GMT+6:00", null);
Store taskBasicStore =
TaskManager.initStore(AgentConstants.AGENT_STORE_PATH_TASK);
TaskStore taskStore = new TaskStore(taskBasicStore);
taskStore.storeTask(taskProfile);
diff --git
a/inlong-agent/agent-plugins/src/test/java/org/apache/inlong/agent/plugin/sinks/KafkaSinkTest.java
b/inlong-agent/agent-plugins/src/test/java/org/apache/inlong/agent/plugin/sinks/KafkaSinkTest.java
index 26e06c1326..5524c5e96e 100644
---
a/inlong-agent/agent-plugins/src/test/java/org/apache/inlong/agent/plugin/sinks/KafkaSinkTest.java
+++
b/inlong-agent/agent-plugins/src/test/java/org/apache/inlong/agent/plugin/sinks/KafkaSinkTest.java
@@ -47,8 +47,8 @@ public class KafkaSinkTest {
String fileName = LOADER.getResource("test/20230928_1.txt").getPath();
helper = new
AgentBaseTestsHelper(TestSenderManager.class.getName()).setupAgentHome();
String pattern = helper.getTestRootDir() + "/YYYYMMDD.log_[0-9]+";
- TaskProfile taskProfile = helper.getTaskProfile(1, pattern, false, 0L,
0L, TaskStateEnum.RUNNING, "D",
- "GMT+8:00");
+ TaskProfile taskProfile = helper.getTaskProfile(1, pattern, "csv",
false, 0L, 0L, TaskStateEnum.RUNNING, "D",
+ "GMT+8:00", null);
profile = taskProfile.createInstanceProfile("", fileName,
taskProfile.getCycleUnit(), "20230927",
AgentUtils.getCurrentTime());
kafkaSink = new MockSink();
diff --git
a/inlong-agent/agent-plugins/src/test/java/org/apache/inlong/agent/plugin/sinks/PulsarSinkTest.java
b/inlong-agent/agent-plugins/src/test/java/org/apache/inlong/agent/plugin/sinks/PulsarSinkTest.java
index 39f6ec8e71..93702fad16 100644
---
a/inlong-agent/agent-plugins/src/test/java/org/apache/inlong/agent/plugin/sinks/PulsarSinkTest.java
+++
b/inlong-agent/agent-plugins/src/test/java/org/apache/inlong/agent/plugin/sinks/PulsarSinkTest.java
@@ -47,8 +47,8 @@ public class PulsarSinkTest {
String fileName = LOADER.getResource("test/20230928_1.txt").getPath();
helper = new
AgentBaseTestsHelper(TestSenderManager.class.getName()).setupAgentHome();
String pattern = helper.getTestRootDir() + "/YYYYMMDD.log_[0-9]+";
- TaskProfile taskProfile = helper.getTaskProfile(1, pattern, false, 0L,
0L, TaskStateEnum.RUNNING, "D",
- "GMT+8:00");
+ TaskProfile taskProfile = helper.getTaskProfile(1, pattern, "csv",
false, 0L, 0L, TaskStateEnum.RUNNING, "D",
+ "GMT+8:00", null);
profile = taskProfile.createInstanceProfile("", fileName,
taskProfile.getCycleUnit(), "20230927",
AgentUtils.getCurrentTime());
pulsarSink = new MockSink();
diff --git
a/inlong-agent/agent-plugins/src/test/java/org/apache/inlong/agent/plugin/sinks/filecollect/TestSenderManager.java
b/inlong-agent/agent-plugins/src/test/java/org/apache/inlong/agent/plugin/sinks/filecollect/TestSenderManager.java
index 91b3c6c10a..afeb3565e2 100644
---
a/inlong-agent/agent-plugins/src/test/java/org/apache/inlong/agent/plugin/sinks/filecollect/TestSenderManager.java
+++
b/inlong-agent/agent-plugins/src/test/java/org/apache/inlong/agent/plugin/sinks/filecollect/TestSenderManager.java
@@ -70,8 +70,8 @@ public class TestSenderManager {
String fileName = LOADER.getResource("test/20230928_1.txt").getPath();
helper = new
AgentBaseTestsHelper(TestSenderManager.class.getName()).setupAgentHome();
String pattern = helper.getTestRootDir() + "/YYYYMMDD.log_[0-9]+";
- TaskProfile taskProfile = helper.getTaskProfile(1, pattern, false, 0L,
0L, TaskStateEnum.RUNNING, "D",
- "GMT+8:00");
+ TaskProfile taskProfile = helper.getTaskProfile(1, pattern, "csv",
false, 0L, 0L, TaskStateEnum.RUNNING, "D",
+ "GMT+8:00", null);
profile = taskProfile.createInstanceProfile("", fileName,
taskProfile.getCycleUnit(), "20230927",
AgentUtils.getCurrentTime());
}
diff --git
a/inlong-agent/agent-plugins/src/test/java/org/apache/inlong/agent/plugin/sources/TestLogFileSource.java
b/inlong-agent/agent-plugins/src/test/java/org/apache/inlong/agent/plugin/sources/TestLogFileSource.java
index 1049bebb1a..df70039459 100644
---
a/inlong-agent/agent-plugins/src/test/java/org/apache/inlong/agent/plugin/sources/TestLogFileSource.java
+++
b/inlong-agent/agent-plugins/src/test/java/org/apache/inlong/agent/plugin/sources/TestLogFileSource.java
@@ -77,8 +77,9 @@ public class TestLogFileSource {
private LogFileSource getSource(int taskId, long offset) {
try {
String pattern = helper.getTestRootDir() + "/YYYYMMDD.log_[0-9]+";
- TaskProfile taskProfile = helper.getTaskProfile(taskId, pattern,
false, 0L, 0L, TaskStateEnum.RUNNING, "D",
- "GMT+8:00");
+ TaskProfile taskProfile = helper.getTaskProfile(taskId, pattern,
"csv", false, 0L, 0L,
+ TaskStateEnum.RUNNING, "D",
+ "GMT+8:00", null);
String fileName =
LOADER.getResource("test/20230928_1.txt").getPath();
InstanceProfile instanceProfile =
taskProfile.createInstanceProfile("",
fileName, taskProfile.getCycleUnit(), "20230928",
AgentUtils.getCurrentTime());
diff --git
a/inlong-agent/agent-plugins/src/test/java/org/apache/inlong/agent/plugin/sources/TestSQLServerSource.java
b/inlong-agent/agent-plugins/src/test/java/org/apache/inlong/agent/plugin/sources/TestSQLServerSource.java
index 0dc8b71be4..2a90bdc37a 100644
---
a/inlong-agent/agent-plugins/src/test/java/org/apache/inlong/agent/plugin/sources/TestSQLServerSource.java
+++
b/inlong-agent/agent-plugins/src/test/java/org/apache/inlong/agent/plugin/sources/TestSQLServerSource.java
@@ -49,7 +49,9 @@ import java.util.concurrent.BlockingQueue;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
-import static org.junit.Assert.*;
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertFalse;
+import static org.junit.Assert.assertTrue;
import static org.mockito.ArgumentMatchers.any;
import static org.mockito.Mockito.doNothing;
import static org.powermock.api.mockito.PowerMockito.mockStatic;
@@ -134,8 +136,8 @@ public class TestSQLServerSource {
final String tableName = "test_source";
final String serverName = "server-01";
- TaskProfile taskProfile = helper.getTaskProfile(1, "", false, 0L, 0L,
TaskStateEnum.RUNNING, "D",
- "GMT+8:00");
+ TaskProfile taskProfile = helper.getTaskProfile(1, "", "csv", false,
0L, 0L, TaskStateEnum.RUNNING, "D",
+ "GMT+8:00", null);
instanceProfile = taskProfile.createInstanceProfile("",
"", taskProfile.getCycleUnit(), "20240725",
AgentUtils.getCurrentTime());
instanceProfile.set(CommonConstants.PROXY_INLONG_GROUP_ID, groupId);
diff --git
a/inlong-agent/agent-plugins/src/test/java/org/apache/inlong/agent/plugin/task/TestLogFileTask.java
b/inlong-agent/agent-plugins/src/test/java/org/apache/inlong/agent/plugin/task/TestLogFileTask.java
index 30abede6fc..1ef3b5db1e 100644
---
a/inlong-agent/agent-plugins/src/test/java/org/apache/inlong/agent/plugin/task/TestLogFileTask.java
+++
b/inlong-agent/agent-plugins/src/test/java/org/apache/inlong/agent/plugin/task/TestLogFileTask.java
@@ -103,8 +103,9 @@ public class TestLogFileTask {
for (int i = 0; i < resources.size(); i++) {
resourceName.add(LOADER.getResource(resources.get(i)).getPath());
}
- TaskProfile taskProfile = helper.getTaskProfile(taskId, pattern, true,
0L, 0L, TaskStateEnum.RUNNING, cycle,
- "GMT+8:00");
+ TaskProfile taskProfile = helper.getTaskProfile(taskId, pattern,
"csv", true, 0L, 0L, TaskStateEnum.RUNNING,
+ cycle,
+ "GMT+8:00", null);
LogFileTask dayTask = null;
final List<String> fileName = new ArrayList();
final List<String> dataTime = new ArrayList();
diff --git
a/inlong-agent/agent-plugins/src/test/java/org/apache/inlong/agent/plugin/task/TestTaskManager.java
b/inlong-agent/agent-plugins/src/test/java/org/apache/inlong/agent/plugin/task/TestTaskManager.java
index c5370fc69d..608d3adec6 100755
---
a/inlong-agent/agent-plugins/src/test/java/org/apache/inlong/agent/plugin/task/TestTaskManager.java
+++
b/inlong-agent/agent-plugins/src/test/java/org/apache/inlong/agent/plugin/task/TestTaskManager.java
@@ -58,8 +58,8 @@ public class TestTaskManager {
manager = new TaskManager();
TaskStore taskStore = manager.getTaskStore();
for (int i = 1; i <= 10; i++) {
- TaskProfile taskProfile = helper.getTaskProfile(i, pattern,
false, 0L, 0L, TaskStateEnum.RUNNING,
- "D", "GMT+8:00");
+ TaskProfile taskProfile = helper.getTaskProfile(i, pattern,
"csv", false, 0L, 0L, TaskStateEnum.RUNNING,
+ "D", "GMT+8:00", null);
taskProfile.setTaskClass(MockTask.class.getCanonicalName());
taskStore.storeTask(taskProfile);
}
@@ -74,8 +74,8 @@ public class TestTaskManager {
Assert.assertTrue("manager start error", false);
}
- TaskProfile taskProfile1 = helper.getTaskProfile(100, pattern, false,
0L, 0L, TaskStateEnum.RUNNING,
- "D", "GMT+8:00");
+ TaskProfile taskProfile1 = helper.getTaskProfile(100, pattern, "csv",
false, 0L, 0L, TaskStateEnum.RUNNING,
+ "D", "GMT+8:00", null);
String taskId1 = taskProfile1.getTaskId();
taskProfile1.setTaskClass(MockTask.class.getCanonicalName());
List<TaskProfile> taskProfiles1 = new ArrayList<>();
@@ -99,8 +99,8 @@ public class TestTaskManager {
Assert.assertTrue(manager.getTaskProfile(taskId1).getState() ==
TaskStateEnum.RUNNING);
// test delete
- TaskProfile taskProfile2 = helper.getTaskProfile(200, pattern, false,
0L, 0L, TaskStateEnum.RUNNING,
- "D", "GMT+8:00");
+ TaskProfile taskProfile2 = helper.getTaskProfile(200, pattern, "csv",
false, 0L, 0L, TaskStateEnum.RUNNING,
+ "D", "GMT+8:00", null);
taskProfile2.setTaskClass(MockTask.class.getCanonicalName());
List<TaskProfile> taskProfiles2 = new ArrayList<>();
taskProfiles2.add(taskProfile2);
diff --git
a/inlong-agent/agent-plugins/src/test/resources/test/mix_20230928_1.txt
b/inlong-agent/agent-plugins/src/test/resources/test/mix_20230928_1.txt
new file mode 100644
index 0000000000..0d136aa28e
--- /dev/null
+++ b/inlong-agent/agent-plugins/src/test/resources/test/mix_20230928_1.txt
@@ -0,0 +1,3 @@
+ok|hello line-end-symbol aa
+no|world line-end-symbol
+ok|agent line-end-symbol