This is an automated email from the ASF dual-hosted git repository.
dockerzhang pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/inlong.git
The following commit(s) were added to refs/heads/master by this push:
new e9256b583e [INLONG-9315][Agent] Convert data time from source time
zone to sink time zone (#9316)
e9256b583e is described below
commit e9256b583eae88c992cbec11b9df750641e090a5
Author: justinwwhuang <[email protected]>
AuthorDate: Wed Nov 22 14:47:54 2023 +0800
[INLONG-9315][Agent] Convert data time from source time zone to sink time
zone (#9316)
---
.../org/apache/inlong/agent/core/AgentBaseTestsHelper.java | 11 +++++------
.../inlong/agent/core/instance/TestInstanceManager.java | 13 ++++++++++++-
.../org/apache/inlong/agent/core/task/TestTaskManager.java | 4 ++--
.../agent/plugin/sinks/filecollect/SenderManager.java | 2 +-
.../apache/inlong/agent/plugin/sources/LogFileSource.java | 2 +-
5 files changed, 21 insertions(+), 11 deletions(-)
diff --git
a/inlong-agent/agent-core/src/test/java/org/apache/inlong/agent/core/AgentBaseTestsHelper.java
b/inlong-agent/agent-core/src/test/java/org/apache/inlong/agent/core/AgentBaseTestsHelper.java
index 73017c4215..8207dd9a99 100755
---
a/inlong-agent/agent-core/src/test/java/org/apache/inlong/agent/core/AgentBaseTestsHelper.java
+++
b/inlong-agent/agent-core/src/test/java/org/apache/inlong/agent/core/AgentBaseTestsHelper.java
@@ -73,14 +73,14 @@ public class AgentBaseTestsHelper {
}
public TaskProfile getTaskProfile(int taskId, String pattern, boolean
retry, Long startTime, Long endTime,
- TaskStateEnum state) {
- DataConfig dataConfig = getDataConfig(taskId, pattern, retry,
startTime, endTime, state);
+ TaskStateEnum state, String timeZone) {
+ DataConfig dataConfig = getDataConfig(taskId, pattern, retry,
startTime, endTime, state, timeZone);
TaskProfile profile = TaskProfile.convertToTaskProfile(dataConfig);
return profile;
}
private DataConfig getDataConfig(int taskId, String pattern, boolean
retry, Long startTime, Long endTime,
- TaskStateEnum state) {
+ TaskStateEnum state, String timeZone) {
DataConfig dataConfig = new DataConfig();
dataConfig.setInlongGroupId("testGroupId");
dataConfig.setInlongStreamId("testStreamId");
@@ -90,9 +90,8 @@ public class AgentBaseTestsHelper {
dataConfig.setState(state.ordinal());
FileTaskConfig fileTaskConfig = new FileTaskConfig();
fileTaskConfig.setPattern(pattern);
- fileTaskConfig.setTimeOffset("0d");
- // GMT-8:00 same with Asia/Shanghai
- fileTaskConfig.setTimeZone("GMT-8:00");
+ fileTaskConfig.setTimeOffset("0h");
+ fileTaskConfig.setTimeZone(timeZone);
fileTaskConfig.setMaxFileCount(100);
fileTaskConfig.setCycleUnit("h");
fileTaskConfig.setRetry(retry);
diff --git
a/inlong-agent/agent-core/src/test/java/org/apache/inlong/agent/core/instance/TestInstanceManager.java
b/inlong-agent/agent-core/src/test/java/org/apache/inlong/agent/core/instance/TestInstanceManager.java
index 0a87660c1f..910a32a46a 100755
---
a/inlong-agent/agent-core/src/test/java/org/apache/inlong/agent/core/instance/TestInstanceManager.java
+++
b/inlong-agent/agent-core/src/test/java/org/apache/inlong/agent/core/instance/TestInstanceManager.java
@@ -23,6 +23,7 @@ import org.apache.inlong.agent.core.AgentBaseTestsHelper;
import org.apache.inlong.agent.core.task.file.TaskManager;
import org.apache.inlong.agent.db.Db;
import org.apache.inlong.agent.utils.AgentUtils;
+import org.apache.inlong.agent.utils.DateTransUtils;
import org.apache.inlong.common.enums.InstanceStateEnum;
import org.apache.inlong.common.enums.TaskStateEnum;
@@ -33,6 +34,8 @@ import org.junit.Test;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
+import java.text.ParseException;
+import java.util.TimeZone;
import java.util.concurrent.TimeUnit;
import static org.awaitility.Awaitility.await;
@@ -49,7 +52,7 @@ public class TestInstanceManager {
helper = new
AgentBaseTestsHelper(TestInstanceManager.class.getName()).setupAgentHome();
String pattern = helper.getTestRootDir() + "/YYYYMMDD_[0-9]+.txt";
Db basicDb = TaskManager.initDb("/localdb");
- taskProfile = helper.getTaskProfile(1, pattern, false, 0L, 0L,
TaskStateEnum.RUNNING);
+ taskProfile = helper.getTaskProfile(1, pattern, false, 0L, 0L,
TaskStateEnum.RUNNING, "GMT+6:00");
manager = new InstanceManager("1", 2, basicDb);
manager.CORE_THREAD_SLEEP_TIME_MS = 100;
manager.start();
@@ -66,6 +69,14 @@ public class TestInstanceManager {
long timeBefore = AgentUtils.getCurrentTime();
InstanceProfile profile =
taskProfile.createInstanceProfile(MockInstance.class.getCanonicalName(),
helper.getTestRootDir() + "/2023092710_1.txt", "2023092710",
AgentUtils.getCurrentTime());
+ String sinkDataTime = String.valueOf(profile.getSinkDataTime());
+ try {
+ String add2TimeZone = String.valueOf(
+ DateTransUtils.timeStrConvertToMillSec("2023092712", "h",
TimeZone.getTimeZone("GMT+8:00")));
+ Assert.assertTrue(sinkDataTime, sinkDataTime.equals(add2TimeZone));
+ } catch (ParseException e) {
+ LOGGER.error("testInstanceManager error: ", e);
+ }
String instanceId = profile.getInstanceId();
InstanceAction action = new InstanceAction();
action.setActionType(ActionType.ADD);
diff --git
a/inlong-agent/agent-core/src/test/java/org/apache/inlong/agent/core/task/TestTaskManager.java
b/inlong-agent/agent-core/src/test/java/org/apache/inlong/agent/core/task/TestTaskManager.java
index bf9047c40a..4fff284a80 100755
---
a/inlong-agent/agent-core/src/test/java/org/apache/inlong/agent/core/task/TestTaskManager.java
+++
b/inlong-agent/agent-core/src/test/java/org/apache/inlong/agent/core/task/TestTaskManager.java
@@ -61,7 +61,7 @@ public class TestTaskManager {
@Test
public void testTaskManager() {
String pattern = helper.getTestRootDir() + "/YYYYMMDD.log_[0-9]+";
- TaskProfile taskProfile1 = helper.getTaskProfile(1, pattern, false,
0L, 0L, TaskStateEnum.RUNNING);
+ TaskProfile taskProfile1 = helper.getTaskProfile(1, pattern, false,
0L, 0L, TaskStateEnum.RUNNING, "GMT+8:00");
String taskId1 = taskProfile1.getTaskId();
taskProfile1.setTaskClass(MockTask.class.getCanonicalName());
List<TaskProfile> taskProfiles1 = new ArrayList<>();
@@ -83,7 +83,7 @@ public class TestTaskManager {
Assert.assertTrue(manager.getTaskProfile(taskId1).getState() ==
TaskStateEnum.RUNNING);
// test delete
- TaskProfile taskProfile2 = helper.getTaskProfile(2, pattern, false,
0L, 0L, TaskStateEnum.RUNNING);
+ TaskProfile taskProfile2 = helper.getTaskProfile(2, pattern, false,
0L, 0L, TaskStateEnum.RUNNING, "GMT+8:00");
taskProfile2.setTaskClass(MockTask.class.getCanonicalName());
List<TaskProfile> taskProfiles2 = new ArrayList<>();
taskProfiles2.add(taskProfile2);
diff --git
a/inlong-agent/agent-plugins/src/main/java/org/apache/inlong/agent/plugin/sinks/filecollect/SenderManager.java
b/inlong-agent/agent-plugins/src/main/java/org/apache/inlong/agent/plugin/sinks/filecollect/SenderManager.java
index 45fe9bc63e..807c6fbcde 100755
---
a/inlong-agent/agent-plugins/src/main/java/org/apache/inlong/agent/plugin/sinks/filecollect/SenderManager.java
+++
b/inlong-agent/agent-plugins/src/main/java/org/apache/inlong/agent/plugin/sinks/filecollect/SenderManager.java
@@ -345,7 +345,7 @@ public class SenderManager {
message.getOffsetAckList().forEach(ack -> ack.setHasAck(true));
getMetricItem(groupId,
streamId).pluginSendSuccessCount.addAndGet(msgCnt);
AuditUtils.add(AuditUtils.AUDIT_ID_AGENT_SEND_SUCCESS,
groupId, streamId,
- dataTime, message.getMsgCnt(), message.getTotalSize());
+ profile.getSinkDataTime(), message.getMsgCnt(),
message.getTotalSize());
} else {
LOGGER.warn("send groupId {}, streamId {}, taskId {},
instanceId {}, dataTime {} fail with times {}, "
+ "error {}", groupId, streamId, taskId, instanceId,
dataTime, retry, result);
diff --git
a/inlong-agent/agent-plugins/src/main/java/org/apache/inlong/agent/plugin/sources/LogFileSource.java
b/inlong-agent/agent-plugins/src/main/java/org/apache/inlong/agent/plugin/sources/LogFileSource.java
index d713fb4e8a..481b7efcf9 100755
---
a/inlong-agent/agent-plugins/src/main/java/org/apache/inlong/agent/plugin/sources/LogFileSource.java
+++
b/inlong-agent/agent-plugins/src/main/java/org/apache/inlong/agent/plugin/sources/LogFileSource.java
@@ -370,7 +370,7 @@ public class LogFileSource extends AbstractSource {
extendedHandler.dealWithHeader(header,
sourceData.getData().getBytes(StandardCharsets.UTF_8));
}
AuditUtils.add(AuditUtils.AUDIT_ID_AGENT_READ_SUCCESS, inlongGroupId,
header.get(PROXY_KEY_STREAM_ID),
- dataTime, 1, msgWithMetaData.length());
+ profile.getSinkDataTime(), 1, msgWithMetaData.length());
Message finalMsg = new
DefaultMessage(msgWithMetaData.getBytes(StandardCharsets.UTF_8), header);
// if the message size is greater than max pack size,should drop it.
if (finalMsg.getBody().length > maxPackSize) {