This is an automated email from the ASF dual-hosted git repository.
wenweihuang pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/inlong.git
The following commit(s) were added to refs/heads/master by this push:
new 4f533e0b3d [INLONG-11527][Agent] Save both row and byte position
information when saving offset (#11528)
4f533e0b3d is described below
commit 4f533e0b3d819651bd1733ea7572afb793bf0f78
Author: justinwwhuang <[email protected]>
AuthorDate: Thu Nov 21 21:04:18 2024 +0800
[INLONG-11527][Agent] Save both row and byte position information when
saving offset (#11528)
---
.../inlong/agent/plugin/sources/LogFileSource.java | 101 +++++++++++++--------
.../agent/plugin/sources/TestLogFileSource.java | 36 +++++---
2 files changed, 88 insertions(+), 49 deletions(-)
diff --git
a/inlong-agent/agent-plugins/src/main/java/org/apache/inlong/agent/plugin/sources/LogFileSource.java
b/inlong-agent/agent-plugins/src/main/java/org/apache/inlong/agent/plugin/sources/LogFileSource.java
index 5aebbc7d86..72b0cd704a 100755
---
a/inlong-agent/agent-plugins/src/main/java/org/apache/inlong/agent/plugin/sources/LogFileSource.java
+++
b/inlong-agent/agent-plugins/src/main/java/org/apache/inlong/agent/plugin/sources/LogFileSource.java
@@ -31,6 +31,9 @@ import
org.apache.inlong.agent.plugin.sources.file.extend.DefaultExtendedHandler
import org.apache.inlong.agent.plugin.task.file.FileDataUtils;
import org.apache.inlong.agent.utils.AgentUtils;
+import lombok.AllArgsConstructor;
+import lombok.Data;
+import lombok.NoArgsConstructor;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@@ -52,7 +55,20 @@ import static
org.apache.inlong.agent.constant.TaskConstants.SOURCE_DATA_CONTENT
*/
public class LogFileSource extends AbstractSource {
+ // Number of OFFSET_SEP-separated parts in an offset that also carries a byte position.
+ public static final int LEN_OF_FILE_OFFSET_ARRAY = 2;
+
+ /**
+ * Offset of a file instance as kept in the offset store: either the legacy
+ * "lineOffset" form or "lineOffset" + OFFSET_SEP + "byteOffset".
+ * Declared static because it only holds data and needs no reference to the
+ * enclosing LogFileSource instance.
+ */
+ @Data
+ @AllArgsConstructor
+ @NoArgsConstructor
+ protected static class FileOffset {
+
+ private Long lineOffset;
+ // null when the stored offset is in the legacy line-only form
+ private Long byteOffset;
+ // true when the stored offset contained a byte position
+ private boolean hasByteOffset;
+ }
+
private static final Logger LOGGER =
LoggerFactory.getLogger(LogFileSource.class);
+ // Separator between the line and byte parts of a stored offset string.
+ public static final String OFFSET_SEP = ":";
private final Integer SIZE_OF_BUFFER_TO_READ_FILE = 64 * 1024;
private final Long INODE_UPDATE_INTERVAL_MS = 1000L;
private final SimpleDateFormat format = new SimpleDateFormat("yyyy-MM-dd
HH:mm:ss"); // 设置格式
@@ -86,8 +102,7 @@ public class LogFileSource extends AbstractSource {
file = new File(fileName);
inodeInfo = profile.get(TaskConstants.INODE_INFO);
lastInodeUpdateTime = AgentUtils.getCurrentTime();
- linePosition = getInitLineOffset(isIncrement, taskId, instanceId,
inodeInfo);
- bytePosition = getBytePositionByLine(linePosition);
+ initOffset(isIncrement, taskId, instanceId, inodeInfo);
randomAccessFile = new RandomAccessFile(file, "r");
} catch (Exception ex) {
stopRunning();
@@ -137,14 +152,9 @@ public class LogFileSource extends AbstractSource {
}
+ /**
+ * Reads the next batch of lines starting at byte position {@code pos}.
+ * <p>
+ * readLines is given BATCH_READ_LINE_COUNT and BATCH_READ_LINE_TOTAL_LEN as
+ * its line-count and total-length limits. For each line it reads, it appends a
+ * SourceData whose offset string holds the line number and the byte position
+ * after that line, and it increments linePosition. Its return value (the
+ * position after the lines read) becomes the new bytePosition.
+ *
+ * @param pos byte position to start reading from
+ * @return the lines read in this batch
+ */
private List<SourceData> readFromPos(long pos) throws IOException {
- List<byte[]> lines = new ArrayList<>();
- List<SourceData> dataList = new ArrayList<>();
- bytePosition = readLines(randomAccessFile, pos, lines,
BATCH_READ_LINE_COUNT, BATCH_READ_LINE_TOTAL_LEN, false);
- for (int i = 0; i < lines.size(); i++) {
- linePosition++;
- dataList.add(new SourceData(lines.get(i),
Long.toString(linePosition)));
- }
- return dataList;
+ List<SourceData> lines = new ArrayList<>();
+ bytePosition = readLines(randomAccessFile, pos, lines,
BATCH_READ_LINE_COUNT, BATCH_READ_LINE_TOTAL_LEN);
+ return lines;
}
private int getRealLineCount(String fileName) {
@@ -157,30 +167,39 @@ public class LogFileSource extends AbstractSource {
}
}
- private long getInitLineOffset(boolean isIncrement, String taskId, String
instanceId, String inodeInfo) {
- long offset = 0;
+ /**
+ * Initializes linePosition and bytePosition before reading starts.
+ * <ul>
+ * <li>Same inode as the stored offset: restore it. A "line:byte" offset is
+ * used as-is; a legacy line-only offset gets its byte position recomputed
+ * with getBytePositionByLine. If the stored offset lies beyond the current
+ * end of the file, both positions are reset to 0.</li>
+ * <li>Otherwise, increment mode starts at the file's current line count
+ * (getRealLineCount) and full mode starts at 0.</li>
+ * </ul>
+ *
+ * @throws IOException if reading the file fails
+ */
+ private void initOffset(boolean isIncrement, String taskId, String
instanceId, String inodeInfo)
+ throws IOException {
+ long lineOffset;
+ long byteOffset;
if (offsetProfile != null &&
offsetProfile.getInodeInfo().compareTo(inodeInfo) == 0) {
- offset = Long.parseLong(offsetProfile.getOffset());
- int fileLineCount = getRealLineCount(instanceId);
- if (fileLineCount < offset) {
- LOGGER.info("getInitLineOffset inode no change taskId {} file
rotate, offset set to 0, file {}", taskId,
- fileName);
- offset = 0;
+ FileOffset fileOffset = parseFIleOffset(offsetProfile.getOffset());
+ // The file may have been truncated in place while keeping its inode. Resuming
+ // from a stored offset past the current end would not match the file's content,
+ // so restart from 0. Use the cheap file length when a byte offset is stored and
+ // count lines only for legacy line-only offsets.
+ boolean offsetBeyondFile = fileOffset.hasByteOffset
+ ? fileOffset.byteOffset > file.length()
+ : fileOffset.lineOffset > getRealLineCount(instanceId);
+ if (offsetBeyondFile) {
+ lineOffset = 0;
+ byteOffset = 0;
+ LOGGER.info("initOffset inode no change taskId {} file rotate, offset {} reset to 0, file {}",
+ taskId, offsetProfile.getOffset(), fileName);
+ } else if (fileOffset.hasByteOffset) {
+ lineOffset = fileOffset.lineOffset;
+ byteOffset = fileOffset.byteOffset;
+ LOGGER.info("initOffset inode no change taskId {} restore
lineOffset {} byteOffset {}, file {}", taskId,
+ lineOffset, byteOffset, fileName);
} else {
- LOGGER.info("getInitLineOffset inode no change taskId {} from
offset store {}, file {}", taskId, offset,
- fileName);
+ lineOffset = fileOffset.lineOffset;
+ byteOffset = getBytePositionByLine(lineOffset);
+ LOGGER.info("initOffset inode no change taskId {} restore
lineOffset {} count byteOffset {}, file {}",
+ taskId,
+ lineOffset, byteOffset, fileName);
}
} else {
if (isIncrement) {
- offset = getRealLineCount(instanceId);
- LOGGER.info("getInitLineOffset taskId {} for new increment
read from {} file {}", taskId,
- offset, fileName);
+ lineOffset = getRealLineCount(instanceId);
+ byteOffset = getBytePositionByLine(lineOffset);
+ LOGGER.info("initOffset taskId {} for new increment lineOffset
{} byteOffset {}, file {}", taskId,
+ lineOffset, byteOffset, fileName);
} else {
- offset = 0;
- LOGGER.info("getInitLineOffset taskId {} for new all read from
0 file {}", taskId, fileName);
+ lineOffset = 0;
+ byteOffset = 0;
+ LOGGER.info("initOffset taskId {} for new all read lineOffset
{} byteOffset {} file {}", taskId,
+ lineOffset, byteOffset, fileName);
}
}
- return offset;
+ linePosition = lineOffset;
+ bytePosition = byteOffset;
}
public File getFile() {
@@ -202,9 +221,9 @@ public class LogFileSource extends AbstractSource {
try {
input = new RandomAccessFile(file, "r");
while (readCount < linePosition) {
- List<byte[]> lines = new ArrayList<>();
+ List<SourceData> lines = new ArrayList<>();
pos = readLines(input, pos, lines, Math.min((int)
(linePosition - readCount), BATCH_READ_LINE_COUNT),
- BATCH_READ_LINE_TOTAL_LEN, true);
+ BATCH_READ_LINE_TOTAL_LEN);
readCount += lines.size();
if (lines.size() == 0) {
LOGGER.error("getBytePositionByLine LineNum {} larger than
the real file");
@@ -229,8 +248,8 @@ public class LogFileSource extends AbstractSource {
* @return The new position after the lines have been read
* @throws IOException if an I/O error occurs.
*/
- private long readLines(RandomAccessFile reader, long pos, List<byte[]>
lines, int maxLineCount, int maxLineTotalLen,
- boolean isCounting)
+ private long readLines(RandomAccessFile reader, long pos, List<SourceData>
lines, int maxLineCount,
+ int maxLineTotalLen)
throws IOException {
if (maxLineCount == 0) {
return pos;
@@ -248,13 +267,10 @@ public class LogFileSource extends AbstractSource {
byte ch = bufferToReadFile[i];
switch (ch) {
case '\n':
- if (isCounting) {
- lines.add(null);
- } else {
- lines.add(baos.toByteArray());
- lineTotalLen += baos.size();
- }
+ linePosition++;
rePos = pos + i + 1;
+ lines.add(new SourceData(baos.toByteArray(),
getOffsetString(linePosition, rePos)));
+ lineTotalLen += baos.size();
if (overLen) {
LOGGER.warn("readLines over len finally string len
{}",
new String(baos.toByteArray()).length());
@@ -297,6 +313,19 @@ public class LogFileSource extends AbstractSource {
return rePos;
}
+ /**
+ * Builds the offset string saved with a line: the line number and the byte
+ * position after that line, joined by OFFSET_SEP.
+ * Takes primitives because it is called once for every line read.
+ */
+ private String getOffsetString(long lineOffset, long byteOffset) {
+ return lineOffset + OFFSET_SEP + byteOffset;
+ }
+
+ /**
+ * Parses an offset string from the offset store.
+ * A value with exactly LEN_OF_FILE_OFFSET_ARRAY parts ("line:byte") yields a
+ * FileOffset with hasByteOffset = true. Any other shape is treated as the
+ * legacy line-only form and only its first part is used.
+ *
+ * @throws NumberFormatException if a used part is not a valid long
+ */
+ private FileOffset parseFIleOffset(String offset) {
+ String[] offsetArray = offset.split(OFFSET_SEP);
+ long lineOffset = Long.parseLong(offsetArray[0]);
+ if (offsetArray.length == LEN_OF_FILE_OFFSET_ARRAY) {
+ return new FileOffset(lineOffset, Long.parseLong(offsetArray[1]), true);
+ }
+ return new FileOffset(lineOffset, null, false);
+ }
+
private boolean isInodeChanged() {
if (AgentUtils.getCurrentTime() - lastInodeUpdateTime >
INODE_UPDATE_INTERVAL_MS) {
try {
diff --git
a/inlong-agent/agent-plugins/src/test/java/org/apache/inlong/agent/plugin/sources/TestLogFileSource.java
b/inlong-agent/agent-plugins/src/test/java/org/apache/inlong/agent/plugin/sources/TestLogFileSource.java
index 5d6871fecb..408b9f1b70 100644
---
a/inlong-agent/agent-plugins/src/test/java/org/apache/inlong/agent/plugin/sources/TestLogFileSource.java
+++
b/inlong-agent/agent-plugins/src/test/java/org/apache/inlong/agent/plugin/sources/TestLogFileSource.java
@@ -42,6 +42,7 @@ import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import java.nio.charset.StandardCharsets;
+import java.util.Arrays;
import java.util.concurrent.TimeUnit;
import static
org.apache.inlong.agent.constant.FetcherConstants.AGENT_GLOBAL_READER_QUEUE_PERMIT;
@@ -74,13 +75,18 @@ public class TestLogFileSource {
OffsetManager.init(taskBasicStore, instanceBasicStore,
offsetBasicStore);
}
- private LogFileSource getSource(int taskId, long offset) {
+ private LogFileSource getSource(int taskId, long lineOffset, long
byteOffset, String dataContentStyle,
+ boolean isOffSetNew) {
try {
- String pattern = helper.getTestRootDir() + "/YYYYMMDD.log_[0-9]+";
- TaskProfile taskProfile = helper.getTaskProfile(taskId, pattern,
"csv", false, "", "",
+ String pattern;
+ String fileName;
+ boolean retry;
+ fileName = LOADER.getResource("test/20230928_1.txt").getPath();
+ pattern = helper.getTestRootDir() + "/YYYYMMDD.log_[0-9]+";
+ retry = false;
+ TaskProfile taskProfile = helper.getTaskProfile(taskId, pattern,
dataContentStyle, retry, "", "",
TaskStateEnum.RUNNING, "D",
- "GMT+8:00", null);
- String fileName =
LOADER.getResource("test/20230928_1.txt").getPath();
+ "GMT+8:00", Arrays.asList("ok"));
InstanceProfile instanceProfile =
taskProfile.createInstanceProfile("",
fileName, taskProfile.getCycleUnit(), "20230928",
AgentUtils.getCurrentTime());
instanceProfile.set(TaskConstants.INODE_INFO,
FileDataUtils.getInodeInfo(instanceProfile.getInstanceId()));
@@ -91,17 +97,21 @@ public class TestLogFileSource {
Whitebox.setInternalState(source, "SIZE_OF_BUFFER_TO_READ_FILE",
2);
Whitebox.setInternalState(source, "EMPTY_CHECK_COUNT_AT_LEAST", 3);
Whitebox.setInternalState(source, "WAIT_TIMEOUT_MS", 10);
- if (offset > 0) {
+ if (lineOffset > 0) {
+ String finalOffset = Long.toString(lineOffset);
+ if (isOffSetNew) {
+ finalOffset += LogFileSource.OFFSET_SEP + byteOffset;
+ }
OffsetProfile offsetProfile = new
OffsetProfile(instanceProfile.getTaskId(),
instanceProfile.getInstanceId(),
- Long.toString(offset),
instanceProfile.get(INODE_INFO));
+ finalOffset, instanceProfile.get(INODE_INFO));
OffsetManager.getInstance().setOffset(offsetProfile);
}
source.init(instanceProfile);
source.start();
return source;
} catch (Exception e) {
- LOGGER.error("source init error {}", e);
+ LOGGER.error("source init error", e);
Assert.assertTrue("source init error", false);
}
return null;
@@ -124,7 +134,7 @@ public class TestLogFileSource {
for (int i = 0; i < check.length; i++) {
srcLen += check[i].getBytes(StandardCharsets.UTF_8).length;
}
- LogFileSource source = getSource(1, 0);
+ LogFileSource source = getSource(1, 0, 0, "csv", false);
Message msg = source.read();
int readLen = 0;
int cnt = 0;
@@ -149,7 +159,7 @@ public class TestLogFileSource {
}
private void testCleanQueue() {
- LogFileSource source = getSource(2, 0);
+ LogFileSource source = getSource(2, 0, 0, "csv", false);
for (int i = 0; i < 2; i++) {
source.read();
}
@@ -160,16 +170,16 @@ public class TestLogFileSource {
}
private void testReadWithOffset() {
- LogFileSource source = getSource(3, 1);
+ LogFileSource source = getSource(3, 1, 25, "csv", false);
for (int i = 0; i < 2; i++) {
Message msg = source.read();
- Assert.assertTrue(msg != null);
+ Assert.assertEquals(new String(msg.getBody()), check[i + 1]);
}
Message msg = source.read();
Assert.assertTrue(msg == null);
source.destroy();
- source = getSource(4, 3);
+ source = getSource(4, 3, 69, "csv", false);
msg = source.read();
Assert.assertTrue(msg == null);
source.destroy();