This is an automated email from the ASF dual-hosted git repository.
dockerzhang pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/inlong.git
The following commit(s) were added to refs/heads/master by this push:
new 13a34c8a87 [INLONG-9253][Agent] Fix bug: get byte position of file by
line count offset failed (#9254)
13a34c8a87 is described below
commit 13a34c8a877da459431f65782c91133398459d37
Author: justinwwhuang <[email protected]>
AuthorDate: Fri Nov 10 15:28:52 2023 +0800
[INLONG-9253][Agent] Fix bug: get byte position of file by line count offset
failed (#9254)
---
.../inlong/agent/core/instance/MockInstance.java | 19 ++++++++++-----
.../agent/core/instance/TestInstanceManager.java | 4 ++--
.../inlong/agent/plugin/sources/LogFileSource.java | 27 +++++++++++-----------
.../agent/plugin/sources/TestLogFileSource.java | 2 +-
4 files changed, 30 insertions(+), 22 deletions(-)
diff --git
a/inlong-agent/agent-core/src/test/java/org/apache/inlong/agent/core/instance/MockInstance.java
b/inlong-agent/agent-core/src/test/java/org/apache/inlong/agent/core/instance/MockInstance.java
index ada8cefcdd..5e9bbbab03 100644
---
a/inlong-agent/agent-core/src/test/java/org/apache/inlong/agent/core/instance/MockInstance.java
+++
b/inlong-agent/agent-core/src/test/java/org/apache/inlong/agent/core/instance/MockInstance.java
@@ -20,13 +20,17 @@ package org.apache.inlong.agent.core.instance;
import org.apache.inlong.agent.conf.InstanceProfile;
import org.apache.inlong.agent.plugin.Instance;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.util.concurrent.atomic.AtomicLong;
+
public class MockInstance extends Instance {
+ private static final Logger LOGGER =
LoggerFactory.getLogger(MockInstance.class);
public static final int INIT_TIME = 100;
- public static final int RUN_TIME = 101;
- public static final int DESTROY_TIME = 102;
private InstanceProfile profile;
- private long index = INIT_TIME;
+ private AtomicLong index = new AtomicLong(INIT_TIME);
public long initTime = 0;
public long destroyTime = 0;
public long runtime = 0;
@@ -36,12 +40,14 @@ public class MockInstance extends Instance {
public void init(Object instanceManager, InstanceProfile profile) {
this.instanceManager = (InstanceManager) instanceManager;
this.profile = profile;
- initTime = index++;
+ LOGGER.info("init called " + index);
+ initTime = index.getAndAdd(1);
}
@Override
public void destroy() {
- destroyTime = index++;
+ LOGGER.info("destroy called " + index);
+ destroyTime = index.getAndAdd(1);
}
@Override
@@ -66,7 +72,8 @@ public class MockInstance extends Instance {
@Override
public void run() {
- runtime = index++;
+ LOGGER.info("run called " + index);
+ runtime = index.getAndAdd(1);
}
public void sendFinishAction() {
diff --git
a/inlong-agent/agent-core/src/test/java/org/apache/inlong/agent/core/instance/TestInstanceManager.java
b/inlong-agent/agent-core/src/test/java/org/apache/inlong/agent/core/instance/TestInstanceManager.java
index cff9a7c243..aae1c15da9 100755
---
a/inlong-agent/agent-core/src/test/java/org/apache/inlong/agent/core/instance/TestInstanceManager.java
+++
b/inlong-agent/agent-core/src/test/java/org/apache/inlong/agent/core/instance/TestInstanceManager.java
@@ -100,7 +100,7 @@ public class TestInstanceManager {
manager.submitAction(action);
await().atMost(1, TimeUnit.SECONDS).until(() ->
manager.getInstanceProfile(instanceId) == null);
Assert.assertTrue(String.valueOf(instance.initTime), instance.initTime
== MockInstance.INIT_TIME);
- Assert.assertTrue(String.valueOf(instance.runtime), instance.runtime
== MockInstance.RUN_TIME);
- Assert.assertTrue(String.valueOf(instance.destroyTime),
instance.destroyTime == MockInstance.DESTROY_TIME);
+ Assert.assertTrue(instance.runtime > instance.initTime);
+ Assert.assertTrue(instance.destroyTime > instance.runtime);
}
}
diff --git
a/inlong-agent/agent-plugins/src/main/java/org/apache/inlong/agent/plugin/sources/LogFileSource.java
b/inlong-agent/agent-plugins/src/main/java/org/apache/inlong/agent/plugin/sources/LogFileSource.java
index ab96b6a8e8..7b94e3f068 100755
---
a/inlong-agent/agent-plugins/src/main/java/org/apache/inlong/agent/plugin/sources/LogFileSource.java
+++
b/inlong-agent/agent-plugins/src/main/java/org/apache/inlong/agent/plugin/sources/LogFileSource.java
@@ -103,7 +103,7 @@ public class LogFileSource extends AbstractSource {
new AgentThreadFactory("log-file-source"));
private final Integer BATCH_READ_LINE_COUNT = 10000;
private final Integer BATCH_READ_LINE_TOTAL_LEN = 1024 * 1024;
- private final Integer PRINT_INTERVAL_MS = 1000;
+ private final Integer CORE_THREAD_PRINT_INTERVAL_MS = 1000;
private final Integer CACHE_QUEUE_SIZE = 10 * BATCH_READ_LINE_COUNT;
private final Integer SIZE_OF_BUFFER_TO_READ_FILE = 64 * 1024;
private final Integer FINISH_READ_MAX_COUNT = 30;
@@ -117,8 +117,8 @@ public class LogFileSource extends AbstractSource {
private String fileName;
private File file;
private byte[] bufferToReadFile;
- public long linePosition = 0;
- public long bytePosition = 0;
+ public volatile long linePosition = 0;
+ public volatile long bytePosition = 0;
private boolean needMetadata = false;
public Map<String, String> metadata;
private boolean isIncrement = false;
@@ -128,7 +128,7 @@ public class LogFileSource extends AbstractSource {
private volatile int readEndCount = 0;
private volatile boolean fileExist = true;
private String inodeInfo;
- private long lastInodeUpdateTime = 0;
+ private volatile long lastInodeUpdateTime = 0;
private volatile boolean running = false;
public LogFileSource() {
@@ -144,20 +144,20 @@ public class LogFileSource extends AbstractSource {
instanceId = profile.getInstanceId();
fileName = profile.getInstanceId();
maxPackSize = profile.getInt(PROXY_PACKAGE_MAX_SIZE,
DEFAULT_PROXY_PACKAGE_MAX_SIZE);
+ bufferToReadFile = new byte[SIZE_OF_BUFFER_TO_READ_FILE];
isIncrement = isIncrement(profile);
file = new File(fileName);
inodeInfo = profile.get(TaskConstants.INODE_INFO);
lastInodeUpdateTime = AgentUtils.getCurrentTime();
linePosition = getInitLineOffset(isIncrement, taskId, instanceId,
inodeInfo);
bytePosition = getBytePositionByLine(linePosition);
- bufferToReadFile = new byte[SIZE_OF_BUFFER_TO_READ_FILE];
queue = new LinkedBlockingQueue<>(CACHE_QUEUE_SIZE);
try {
registerMeta(profile);
} catch (Exception ex) {
LOGGER.error("init metadata error", ex);
}
- EXECUTOR_SERVICE.execute(run());
+ EXECUTOR_SERVICE.execute(coreThread());
} catch (Exception ex) {
stopRunning();
throw new FileException("error init stream for " + file.getPath(),
ex);
@@ -181,11 +181,12 @@ public class LogFileSource extends AbstractSource {
if (offsetProfile != null &&
offsetProfile.getInodeInfo().compareTo(inodeInfo) == 0) {
offset = offsetProfile.getOffset();
if (fileLineCount < offset) {
- LOGGER.info("getInitLineOffset taskId {} file rotate, offset
set to 0, file {}", taskId,
+ LOGGER.info("getInitLineOffset inode no change taskId {} file
rotate, offset set to 0, file {}", taskId,
fileName);
offset = 0;
} else {
- LOGGER.info("getInitLineOffset taskId {} from db {}, file {}",
taskId, offset, fileName);
+ LOGGER.info("getInitLineOffset inode no change taskId {} from
db {}, file {}", taskId, offset,
+ fileName);
}
} else {
if (isIncrement) {
@@ -247,7 +248,7 @@ public class LogFileSource extends AbstractSource {
}
}
} catch (Exception e) {
- LOGGER.error("getBytePositionByLine error {}", e.getMessage());
+ LOGGER.error("getBytePositionByLine error {}", e.getStackTrace());
} finally {
if (input != null) {
input.close();
@@ -397,7 +398,7 @@ public class LogFileSource extends AbstractSource {
return false;
}
- public Runnable run() {
+ public Runnable coreThread() {
return () -> {
AgentThreadFactory.nameThread("log-file-source-" + taskId + "-" +
file);
running = true;
@@ -441,10 +442,10 @@ public class LogFileSource extends AbstractSource {
AgentUtils.silenceSleepInSeconds(1);
} else {
readEndCount = 0;
- if (AgentUtils.getCurrentTime() - lastPrintTime >
PRINT_INTERVAL_MS) {
+ if (AgentUtils.getCurrentTime() - lastPrintTime >
CORE_THREAD_PRINT_INTERVAL_MS) {
lastPrintTime = AgentUtils.getCurrentTime();
- LOGGER.info("path is {}, linePosition {}, bytePosition
is {}, reads lines size {}",
- file.getName(), linePosition, bytePosition,
lines.size());
+ LOGGER.info("path is {}, linePosition {}, bytePosition
is {} file len {}, reads lines size {}",
+ file.getName(), linePosition, bytePosition,
file.length(), lines.size());
}
}
}
diff --git
a/inlong-agent/agent-plugins/src/test/java/org/apache/inlong/agent/plugin/sources/TestLogFileSource.java
b/inlong-agent/agent-plugins/src/test/java/org/apache/inlong/agent/plugin/sources/TestLogFileSource.java
index 5617976077..eb9c8ccf89 100644
---
a/inlong-agent/agent-plugins/src/test/java/org/apache/inlong/agent/plugin/sources/TestLogFileSource.java
+++
b/inlong-agent/agent-plugins/src/test/java/org/apache/inlong/agent/plugin/sources/TestLogFileSource.java
@@ -71,7 +71,7 @@ public class TestLogFileSource {
LogFileSource source = new LogFileSource();
Whitebox.setInternalState(source, "BATCH_READ_LINE_COUNT", 1);
Whitebox.setInternalState(source, "BATCH_READ_LINE_TOTAL_LEN", 10);
- Whitebox.setInternalState(source, "PRINT_INTERVAL_MS", 0);
+ Whitebox.setInternalState(source, "CORE_THREAD_PRINT_INTERVAL_MS",
0);
Whitebox.setInternalState(source, "SIZE_OF_BUFFER_TO_READ_FILE",
2);
Whitebox.setInternalState(source, "FINISH_READ_MAX_COUNT", 1);
Whitebox.setInternalState(source, "READ_WAIT_TIMEOUT_MS", 10);