This is an automated email from the ASF dual-hosted git repository.
zirui pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/inlong.git
The following commit(s) were added to refs/heads/master by this push:
new 11a3a5de5b [INLONG-9572][Agent] Set data time of message cache by sink
data time (#9574)
11a3a5de5b is described below
commit 11a3a5de5be2ac2daeb3894496ba4a85915c4417
Author: justinwwhuang <[email protected]>
AuthorDate: Mon Jan 15 14:22:32 2024 +0800
[INLONG-9572][Agent] Set data time of message cache by sink data time
(#9574)
---
.../agent/message/filecollect/ProxyMessageCache.java | 15 +--------------
1 file changed, 1 insertion(+), 14 deletions(-)
diff --git
a/inlong-agent/agent-common/src/main/java/org/apache/inlong/agent/message/filecollect/ProxyMessageCache.java
b/inlong-agent/agent-common/src/main/java/org/apache/inlong/agent/message/filecollect/ProxyMessageCache.java
index 3c7742365e..c9b292817d 100644
---
a/inlong-agent/agent-common/src/main/java/org/apache/inlong/agent/message/filecollect/ProxyMessageCache.java
+++
b/inlong-agent/agent-common/src/main/java/org/apache/inlong/agent/message/filecollect/ProxyMessageCache.java
@@ -18,15 +18,12 @@
package org.apache.inlong.agent.message.filecollect;
import org.apache.inlong.agent.conf.InstanceProfile;
-import org.apache.inlong.agent.constant.CycleUnitType;
import org.apache.inlong.agent.utils.AgentUtils;
-import org.apache.inlong.agent.utils.DateTransUtils;
import org.apache.inlong.common.msg.AttributeConstants;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
-import java.text.ParseException;
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
@@ -42,7 +39,6 @@ import static
org.apache.inlong.agent.constant.CommonConstants.DEFAULT_PROXY_PAC
import static
org.apache.inlong.agent.constant.CommonConstants.PROXY_INLONG_STREAM_ID_QUEUE_MAX_NUMBER;
import static
org.apache.inlong.agent.constant.CommonConstants.PROXY_PACKAGE_MAX_SIZE;
import static
org.apache.inlong.agent.constant.CommonConstants.PROXY_PACKAGE_MAX_TIMEOUT_MS;
-import static org.apache.inlong.agent.constant.TaskConstants.TASK_CYCLE_UNIT;
/**
* Handle List of Proxy Message, which belong to the same stream id.
@@ -78,16 +74,7 @@ public class ProxyMessageCache {
DEFAULT_PROXY_INLONG_STREAM_ID_QUEUE_MAX_NUMBER);
this.cacheTimeout =
instanceProfile.getInt(PROXY_PACKAGE_MAX_TIMEOUT_MS,
DEFAULT_PROXY_PACKAGE_MAX_TIMEOUT_MS);
messageQueueMap = new ConcurrentHashMap<>();
- try {
- String cycleUnit = instanceProfile.get(TASK_CYCLE_UNIT);
- if (cycleUnit.compareToIgnoreCase(CycleUnitType.REAL_TIME) == 0) {
- isRealTime = true;
- cycleUnit = CycleUnitType.HOUR;
- }
- dataTime =
DateTransUtils.timeStrConvertToMillSec(instanceProfile.getSourceDataTime(),
cycleUnit);
- } catch (ParseException e) {
- LOGGER.info("trans dataTime error", e);
- }
+ dataTime = instanceProfile.getSinkDataTime();
extraMap.put(AttributeConstants.MESSAGE_SYNC_SEND, "false");
extraMap.putAll(AgentUtils.parseAddAttrToMap(instanceProfile.getPredefineFields()));
}