This is an automated email from the ASF dual-hosted git repository.
dockerzhang pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/inlong.git
The following commit(s) were added to refs/heads/master by this push:
new cf663fcfec [INLONG-9338][Agent] Real time file collection uses the
current time as the data time (#9339)
cf663fcfec is described below
commit cf663fcfece271616e9660f3fe2a215c78c6be0b
Author: justinwwhuang <[email protected]>
AuthorDate: Mon Nov 27 17:15:33 2023 +0800
[INLONG-9338][Agent] Real time file collection uses the current time as the
data time (#9339)
---
.../inlong/agent/conf/AbstractConfiguration.java | 2 ++
.../message/filecollect/ProxyMessageCache.java | 18 +++++++++++++++---
.../inlong/agent/core/task/file/TaskManager.java | 2 +-
.../plugin/sinks/filecollect/SenderManager.java | 2 +-
.../inlong/agent/plugin/sources/LogFileSource.java | 21 ++++++++++++++++++---
5 files changed, 37 insertions(+), 8 deletions(-)
diff --git a/inlong-agent/agent-common/src/main/java/org/apache/inlong/agent/conf/AbstractConfiguration.java b/inlong-agent/agent-common/src/main/java/org/apache/inlong/agent/conf/AbstractConfiguration.java
index b6a8b19d73..ec7fcf1151 100644
--- a/inlong-agent/agent-common/src/main/java/org/apache/inlong/agent/conf/AbstractConfiguration.java
+++ b/inlong-agent/agent-common/src/main/java/org/apache/inlong/agent/conf/AbstractConfiguration.java
@@ -175,6 +175,7 @@ public abstract class AbstractConfiguration {
public int getInt(String key) {
JsonElement value = configStorage.get(key);
if (value == null) {
+ LOGGER.error("null value for key " + key);
throw new NullPointerException("null value for key " + key);
}
return value.getAsInt();
@@ -231,6 +232,7 @@ public abstract class AbstractConfiguration {
public String get(String key) {
JsonElement value = configStorage.get(key);
if (value == null) {
+ LOGGER.error("null value for key " + key);
throw new NullPointerException("null value for key " + key);
}
return value.getAsString();
diff --git a/inlong-agent/agent-common/src/main/java/org/apache/inlong/agent/message/filecollect/ProxyMessageCache.java b/inlong-agent/agent-common/src/main/java/org/apache/inlong/agent/message/filecollect/ProxyMessageCache.java
index 66b3fb3b43..3c7742365e 100644
--- a/inlong-agent/agent-common/src/main/java/org/apache/inlong/agent/message/filecollect/ProxyMessageCache.java
+++ b/inlong-agent/agent-common/src/main/java/org/apache/inlong/agent/message/filecollect/ProxyMessageCache.java
@@ -18,6 +18,7 @@
package org.apache.inlong.agent.message.filecollect;
import org.apache.inlong.agent.conf.InstanceProfile;
+import org.apache.inlong.agent.constant.CycleUnitType;
import org.apache.inlong.agent.utils.AgentUtils;
import org.apache.inlong.agent.utils.DateTransUtils;
import org.apache.inlong.common.msg.AttributeConstants;
@@ -62,6 +63,7 @@ public class ProxyMessageCache {
private final AtomicLong cacheSize = new AtomicLong(0);
private long lastPrintTime = 0;
private long dataTime;
+ private boolean isRealTime = false;
/**
* extra map used when sending to dataproxy
*/
@@ -77,8 +79,12 @@ public class ProxyMessageCache {
this.cacheTimeout =
instanceProfile.getInt(PROXY_PACKAGE_MAX_TIMEOUT_MS,
DEFAULT_PROXY_PACKAGE_MAX_TIMEOUT_MS);
messageQueueMap = new ConcurrentHashMap<>();
try {
- dataTime =
DateTransUtils.timeStrConvertToMillSec(instanceProfile.getSourceDataTime(),
- instanceProfile.get(TASK_CYCLE_UNIT));
+ String cycleUnit = instanceProfile.get(TASK_CYCLE_UNIT);
+ if (cycleUnit.compareToIgnoreCase(CycleUnitType.REAL_TIME) == 0) {
+ isRealTime = true;
+ cycleUnit = CycleUnitType.HOUR;
+ }
+ dataTime =
DateTransUtils.timeStrConvertToMillSec(instanceProfile.getSourceDataTime(),
cycleUnit);
} catch (ParseException e) {
LOGGER.info("trans dataTime error", e);
}
@@ -172,9 +178,15 @@ public class ProxyMessageCache {
offsetList.add(message.getAckInfo());
}
// make sure result is not empty.
+ long auditTime = 0;
+ if (isRealTime) {
+ auditTime = AgentUtils.getCurrentTime();
+ } else {
+ auditTime = dataTime;
+ }
if (!bodyList.isEmpty()) {
SenderMessage senderMessage = new SenderMessage(taskId,
instanceId, groupId, streamId, bodyList,
- dataTime, extraMap, offsetList);
+ auditTime, extraMap, offsetList);
return senderMessage;
}
return null;
diff --git a/inlong-agent/agent-core/src/main/java/org/apache/inlong/agent/core/task/file/TaskManager.java b/inlong-agent/agent-core/src/main/java/org/apache/inlong/agent/core/task/file/TaskManager.java
index 29fb5633f6..f97a89f26c 100644
--- a/inlong-agent/agent-core/src/main/java/org/apache/inlong/agent/core/task/file/TaskManager.java
+++ b/inlong-agent/agent-core/src/main/java/org/apache/inlong/agent/core/task/file/TaskManager.java
@@ -466,7 +466,7 @@ public class TaskManager extends AbstractDaemon {
task.getTaskId(), taskMap.size(),
runningPool.getTaskCount(),
runningPool.getActiveCount());
} catch (Throwable t) {
- LOGGER.error("add task error {}", t.getMessage());
+ LOGGER.error("add task error: ", t);
}
}
diff --git a/inlong-agent/agent-plugins/src/main/java/org/apache/inlong/agent/plugin/sinks/filecollect/SenderManager.java b/inlong-agent/agent-plugins/src/main/java/org/apache/inlong/agent/plugin/sinks/filecollect/SenderManager.java
index 807c6fbcde..45fe9bc63e 100755
--- a/inlong-agent/agent-plugins/src/main/java/org/apache/inlong/agent/plugin/sinks/filecollect/SenderManager.java
+++ b/inlong-agent/agent-plugins/src/main/java/org/apache/inlong/agent/plugin/sinks/filecollect/SenderManager.java
@@ -345,7 +345,7 @@ public class SenderManager {
message.getOffsetAckList().forEach(ack -> ack.setHasAck(true));
getMetricItem(groupId,
streamId).pluginSendSuccessCount.addAndGet(msgCnt);
AuditUtils.add(AuditUtils.AUDIT_ID_AGENT_SEND_SUCCESS,
groupId, streamId,
- profile.getSinkDataTime(), message.getMsgCnt(),
message.getTotalSize());
+ dataTime, message.getMsgCnt(), message.getTotalSize());
} else {
LOGGER.warn("send groupId {}, streamId {}, taskId {},
instanceId {}, dataTime {} fail with times {}, "
+ "error {}", groupId, streamId, taskId, instanceId,
dataTime, retry, result);
diff --git a/inlong-agent/agent-plugins/src/main/java/org/apache/inlong/agent/plugin/sources/LogFileSource.java b/inlong-agent/agent-plugins/src/main/java/org/apache/inlong/agent/plugin/sources/LogFileSource.java
index 481b7efcf9..4b61d32fc4 100755
--- a/inlong-agent/agent-plugins/src/main/java/org/apache/inlong/agent/plugin/sources/LogFileSource.java
+++ b/inlong-agent/agent-plugins/src/main/java/org/apache/inlong/agent/plugin/sources/LogFileSource.java
@@ -21,6 +21,7 @@ import org.apache.inlong.agent.common.AgentThreadFactory;
import org.apache.inlong.agent.conf.InstanceProfile;
import org.apache.inlong.agent.conf.OffsetProfile;
import org.apache.inlong.agent.conf.TaskProfile;
+import org.apache.inlong.agent.constant.CycleUnitType;
import org.apache.inlong.agent.constant.DataCollectType;
import org.apache.inlong.agent.constant.TaskConstants;
import org.apache.inlong.agent.core.task.OffsetManager;
@@ -138,6 +139,7 @@ public class LogFileSource extends AbstractSource {
private long dataTime = 0;
private volatile long emptyCount = 0;
private ExtendedHandler extendedHandler;
+ private boolean isRealTime = false;
public LogFileSource() {
OffsetManager.init();
@@ -149,6 +151,11 @@ public class LogFileSource extends AbstractSource {
LOGGER.info("LogFileSource init: {}", profile.toJsonStr());
this.profile = profile;
super.init(profile);
+ String cycleUnit = profile.get(TASK_CYCLE_UNIT);
+ if (cycleUnit.compareToIgnoreCase(CycleUnitType.REAL_TIME) == 0) {
+ isRealTime = true;
+ cycleUnit = CycleUnitType.HOUR;
+ }
taskId = profile.getTaskId();
instanceId = profile.getInstanceId();
fileName = profile.getInstanceId();
@@ -161,8 +168,7 @@ public class LogFileSource extends AbstractSource {
linePosition = getInitLineOffset(isIncrement, taskId, instanceId,
inodeInfo);
bytePosition = getBytePositionByLine(linePosition);
queue = new LinkedBlockingQueue<>(CACHE_QUEUE_SIZE);
- dataTime =
DateTransUtils.timeStrConvertToMillSec(profile.getSourceDataTime(),
- profile.get(TASK_CYCLE_UNIT));
+ dataTime =
DateTransUtils.timeStrConvertToMillSec(profile.getSourceDataTime(), cycleUnit);
if
(DEFAULT_FILE_SOURCE_EXTEND_CLASS.compareTo(ExtendedHandler.class.getCanonicalName())
!= 0) {
Constructor<?> constructor =
Class.forName(
@@ -369,8 +375,14 @@ public class LogFileSource extends AbstractSource {
if (extendedHandler != null) {
extendedHandler.dealWithHeader(header,
sourceData.getData().getBytes(StandardCharsets.UTF_8));
}
+ long auditTime = 0;
+ if (isRealTime) {
+ auditTime = AgentUtils.getCurrentTime();
+ } else {
+ auditTime = profile.getSinkDataTime();
+ }
AuditUtils.add(AuditUtils.AUDIT_ID_AGENT_READ_SUCCESS, inlongGroupId,
header.get(PROXY_KEY_STREAM_ID),
- profile.getSinkDataTime(), 1, msgWithMetaData.length());
+ auditTime, 1, msgWithMetaData.length());
Message finalMsg = new
DefaultMessage(msgWithMetaData.getBytes(StandardCharsets.UTF_8), header);
// if the message size is greater than max pack size,should drop it.
if (finalMsg.getBody().length > maxPackSize) {
@@ -556,6 +568,9 @@ public class LogFileSource extends AbstractSource {
@Override
public boolean sourceFinish() {
+ if (isRealTime) {
+ return false;
+ }
return emptyCount > EMPTY_CHECK_COUNT_AT_LEAST;
}