This is an automated email from the ASF dual-hosted git repository.
dockerzhang pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/inlong.git
The following commit(s) were added to refs/heads/master by this push:
new 1d7cfb74ce [INLONG-8180][Agent] Improve the efficiency and safety of log file reading (#8182)
1d7cfb74ce is described below
commit 1d7cfb74ceb44219882b889ae736067bc4f33231
Author: justinwwhuang <[email protected]>
AuthorDate: Wed Jun 14 09:45:19 2023 +0800
[INLONG-8180][Agent] Improve the efficiency and safety of log file reading (#8182)
Co-authored-by: wenweihuang <[email protected]>
---
.../inlong/agent/constant/CommonConstants.java | 3 -
.../inlong/agent/message/PackProxyMessage.java | 19 +-
.../apache/inlong/agent/core/task/TaskManager.java | 1 +
.../agent/core/task/TaskPositionManager.java | 38 ++-
.../inlong/agent/plugin/sinks/KafkaSink.java | 3 -
.../inlong/agent/plugin/sinks/ProxySink.java | 13 +
.../inlong/agent/plugin/sinks/PulsarSink.java | 5 +-
.../inlong/agent/plugin/sinks/SenderManager.java | 3 -
.../sources/reader/file/FileReaderOperator.java | 278 ++++++++++-----------
.../sources/reader/file/MonitorTextFile.java | 23 +-
.../agent/plugin/sources/TestTextFileReader.java | 15 +-
.../inlong/agent/plugin/task/TestTextFileTask.java | 5 +-
12 files changed, 194 insertions(+), 212 deletions(-)
diff --git a/inlong-agent/agent-common/src/main/java/org/apache/inlong/agent/constant/CommonConstants.java b/inlong-agent/agent-common/src/main/java/org/apache/inlong/agent/constant/CommonConstants.java
index 84972767f9..81fbc7b202 100644
--- a/inlong-agent/agent-common/src/main/java/org/apache/inlong/agent/constant/CommonConstants.java
+++ b/inlong-agent/agent-common/src/main/java/org/apache/inlong/agent/constant/CommonConstants.java
@@ -58,9 +58,6 @@ public class CommonConstants {
// max size of message list
public static final String PROXY_PACKAGE_MAX_SIZE =
"proxy.package.maxSize";
- // determine if the send method is sync or async
- public static final String PROXY_SEND_SYNC = "proxy.sync";
-
// the same task must have the same Partition Key if choose sync
public static final String PROXY_SEND_PARTITION_KEY = "proxy.partitionKey";
diff --git a/inlong-agent/agent-common/src/main/java/org/apache/inlong/agent/message/PackProxyMessage.java b/inlong-agent/agent-common/src/main/java/org/apache/inlong/agent/message/PackProxyMessage.java
index 4f37de51a7..15d558748e 100644
--- a/inlong-agent/agent-common/src/main/java/org/apache/inlong/agent/message/PackProxyMessage.java
+++ b/inlong-agent/agent-common/src/main/java/org/apache/inlong/agent/message/PackProxyMessage.java
@@ -144,15 +144,12 @@ public class PackProxyMessage {
while (!messageQueue.isEmpty()) {
// pre check message size
ProxyMessage peekMessage = messageQueue.peek();
- if (peekMessage == null) {
- break;
- }
-
- // if the message size is greater than max pack size,should
drop it.
int peekMessageLength = peekMessage.getBody().length;
if (peekMessageLength > maxPackSize) {
LOGGER.warn("message size is {}, greater than max pack
size {}, drop it!",
peekMessage.getBody().length, maxPackSize);
+ int bodySize = peekMessage.getBody().length;
+ queueSize.addAndGet(-bodySize);
messageQueue.remove();
break;
}
@@ -160,13 +157,11 @@ public class PackProxyMessage {
break;
}
ProxyMessage message = messageQueue.remove();
- if (message != null) {
- int bodySize = message.getBody().length;
- resultBatchSize += bodySize;
- // decrease queue size.
- queueSize.addAndGet(-bodySize);
- result.add(message.getBody());
- }
+ int bodySize = message.getBody().length;
+ resultBatchSize += bodySize;
+ // decrease queue size.
+ queueSize.addAndGet(-bodySize);
+ result.add(message.getBody());
}
// make sure result is not empty.
if (!result.isEmpty()) {
diff --git a/inlong-agent/agent-core/src/main/java/org/apache/inlong/agent/core/task/TaskManager.java b/inlong-agent/agent-core/src/main/java/org/apache/inlong/agent/core/task/TaskManager.java
index 344cafba2e..1aa03e7d94 100755
--- a/inlong-agent/agent-core/src/main/java/org/apache/inlong/agent/core/task/TaskManager.java
+++ b/inlong-agent/agent-core/src/main/java/org/apache/inlong/agent/core/task/TaskManager.java
@@ -81,6 +81,7 @@ public class TaskManager extends AbstractDaemon {
this.taskMetrics = new
AgentMetricItemSet(this.getClass().getSimpleName());
this.dimensions = new HashMap<>();
this.dimensions.put(KEY_COMPONENT_NAME,
this.getClass().getSimpleName());
+ MetricRegister.unregister(taskMetrics);
MetricRegister.register(taskMetrics);
tasks = new ConcurrentHashMap<>();
diff --git a/inlong-agent/agent-core/src/main/java/org/apache/inlong/agent/core/task/TaskPositionManager.java b/inlong-agent/agent-core/src/main/java/org/apache/inlong/agent/core/task/TaskPositionManager.java
index 882578a880..85f94710d3 100644
--- a/inlong-agent/agent-core/src/main/java/org/apache/inlong/agent/core/task/TaskPositionManager.java
+++ b/inlong-agent/agent-core/src/main/java/org/apache/inlong/agent/core/task/TaskPositionManager.java
@@ -134,33 +134,29 @@ public class TaskPositionManager extends AbstractDaemon {
/**
* update job sink position
*
- * @param size add this size to beforePosition
+ * @param newPosition
*/
- public void updateSinkPosition(String jobInstanceId, String sourcePath,
long size, boolean reset) {
+ public void updateSinkPosition(String jobInstanceId, String sourcePath,
long newPosition) {
+ LOGGER.info("updateSinkPosition jobInstanceId {} sourcePath {}
newPosition {}", jobInstanceId, sourcePath,
+ newPosition);
ConcurrentHashMap<String, Long> positionTemp = new
ConcurrentHashMap<>();
- ConcurrentHashMap<String, Long> position =
jobTaskPositionMap.putIfAbsent(jobInstanceId, positionTemp);
- if (position == null) {
- JobProfile jobProfile = jobConfDb.getJobById(jobInstanceId);
- if (jobProfile == null) {
- return;
- }
- positionTemp.put(sourcePath, jobProfile.getLong(sourcePath +
POSITION_SUFFIX, 0));
- position = positionTemp;
- }
-
- if (!reset) {
- Long beforePosition = position.getOrDefault(sourcePath, 0L);
- position.put(sourcePath, beforePosition + size);
+ ConcurrentHashMap<String, Long> lastPosition =
jobTaskPositionMap.putIfAbsent(jobInstanceId, positionTemp);
+ if (lastPosition == null) {
+ positionTemp.put(sourcePath, newPosition);
} else {
- position.put(sourcePath, size);
+ lastPosition.put(sourcePath, newPosition);
}
}
- public ConcurrentHashMap<String, Long> getTaskPositionMap(String jobId) {
- return jobTaskPositionMap.get(jobId);
- }
+ public long getPosition(String sourcePath, String jobInstanceId) {
+ JobProfile jobProfile = jobConfDb.getJobById(jobInstanceId);
+ if (jobProfile == null) {
+ LOGGER.error("getPosition but jobProfile not exist! sourcePath {}
jobInstanceId {} return position 0",
+ sourcePath,
+ jobInstanceId);
+ return 0;
+ }
- public ConcurrentHashMap<String, ConcurrentHashMap<String, Long>>
getJobTaskPosition() {
- return jobTaskPositionMap;
+ return jobProfile.getLong(sourcePath + POSITION_SUFFIX, 0);
}
}
diff --git a/inlong-agent/agent-plugins/src/main/java/org/apache/inlong/agent/plugin/sinks/KafkaSink.java b/inlong-agent/agent-plugins/src/main/java/org/apache/inlong/agent/plugin/sinks/KafkaSink.java
index 0c1b3342fc..22f96d8eaf 100644
--- a/inlong-agent/agent-plugins/src/main/java/org/apache/inlong/agent/plugin/sinks/KafkaSink.java
+++ b/inlong-agent/agent-plugins/src/main/java/org/apache/inlong/agent/plugin/sinks/KafkaSink.java
@@ -255,9 +255,6 @@ public class KafkaSink extends AbstractSink {
batchMsg.getStreamId(), batchMsg.getDataTime(),
batchMsg.getMsgCnt(),
batchMsg.getTotalSize());
sinkMetric.pluginSendSuccessCount.addAndGet(batchMsg.getMsgCnt());
- if (sourceName != null) {
- taskPositionManager.updateSinkPosition(batchMsg.getJobId(),
sourceName, batchMsg.getMsgCnt(), false);
- }
}
private KafkaProducer<String, byte[]> selectProducer() {
diff --git a/inlong-agent/agent-plugins/src/main/java/org/apache/inlong/agent/plugin/sinks/ProxySink.java b/inlong-agent/agent-plugins/src/main/java/org/apache/inlong/agent/plugin/sinks/ProxySink.java
index 3f75fb748f..248796a9ec 100755
--- a/inlong-agent/agent-plugins/src/main/java/org/apache/inlong/agent/plugin/sinks/ProxySink.java
+++ b/inlong-agent/agent-plugins/src/main/java/org/apache/inlong/agent/plugin/sinks/ProxySink.java
@@ -41,6 +41,8 @@ import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicLong;
import static
org.apache.inlong.agent.constant.CommonConstants.DEFAULT_FIELD_SPLITTER;
+import static
org.apache.inlong.agent.constant.CommonConstants.DEFAULT_PROXY_PACKAGE_MAX_SIZE;
+import static
org.apache.inlong.agent.constant.CommonConstants.PROXY_PACKAGE_MAX_SIZE;
/**
* sink message data to inlong-dataproxy
@@ -56,12 +58,22 @@ public class ProxySink extends AbstractSink {
private SenderManager senderManager;
private byte[] fieldSplitter;
private volatile boolean shutdown = false;
+ private int maxPackSize;
public ProxySink() {
}
@Override
public void write(Message message) {
+ if (message == null) {
+ return;
+ }
+ // if the message size is greater than max pack size,should drop it.
+ if (message.getBody().length > maxPackSize) {
+ LOGGER.warn("message size is {}, greater than max pack size {},
drop it!",
+ message.getBody().length, maxPackSize);
+ return;
+ }
boolean suc = false;
while (!suc) {
suc = putInCache(message);
@@ -163,6 +175,7 @@ public class ProxySink extends AbstractSink {
@Override
public void init(JobProfile jobConf) {
super.init(jobConf);
+ this.maxPackSize = jobConf.getInt(PROXY_PACKAGE_MAX_SIZE,
DEFAULT_PROXY_PACKAGE_MAX_SIZE);
messageFilter = initMessageFilter(jobConf);
fieldSplitter = jobConf.get(CommonConstants.FIELD_SPLITTER,
DEFAULT_FIELD_SPLITTER).getBytes(
StandardCharsets.UTF_8);
diff --git a/inlong-agent/agent-plugins/src/main/java/org/apache/inlong/agent/plugin/sinks/PulsarSink.java b/inlong-agent/agent-plugins/src/main/java/org/apache/inlong/agent/plugin/sinks/PulsarSink.java
index c3284c88d3..d2801e4b6d 100644
--- a/inlong-agent/agent-plugins/src/main/java/org/apache/inlong/agent/plugin/sinks/PulsarSink.java
+++ b/inlong-agent/agent-plugins/src/main/java/org/apache/inlong/agent/plugin/sinks/PulsarSink.java
@@ -197,6 +197,7 @@ public class PulsarSink extends AbstractSink {
AgentUtils.silenceSleepInMs(batchFlushInterval);
}
shutdown = true;
+ EXECUTOR_SERVICE.shutdown();
if (CollectionUtils.isNotEmpty(pulsarSenders)) {
for (PulsarTopicSender sender : pulsarSenders) {
sender.close();
@@ -299,7 +300,6 @@ public class PulsarSink extends AbstractSink {
updateSuccessSendMetrics(batchMsg);
}
});
-
} else {
try {
producer.newMessage().eventTime(batchMsg.getDataTime()).value(message.buildArray()).send();
@@ -319,9 +319,6 @@ public class PulsarSink extends AbstractSink {
batchMsg.getStreamId(), batchMsg.getDataTime(),
batchMsg.getMsgCnt(),
batchMsg.getTotalSize());
sinkMetric.pluginSendSuccessCount.addAndGet(batchMsg.getMsgCnt());
- if (sourceName != null) {
- taskPositionManager.updateSinkPosition(batchMsg.getJobId(),
sourceName, batchMsg.getMsgCnt(), false);
- }
}
private Producer selectProducer() {
diff --git a/inlong-agent/agent-plugins/src/main/java/org/apache/inlong/agent/plugin/sinks/SenderManager.java b/inlong-agent/agent-plugins/src/main/java/org/apache/inlong/agent/plugin/sinks/SenderManager.java
index 84612d7460..615f4700bd 100755
--- a/inlong-agent/agent-plugins/src/main/java/org/apache/inlong/agent/plugin/sinks/SenderManager.java
+++ b/inlong-agent/agent-plugins/src/main/java/org/apache/inlong/agent/plugin/sinks/SenderManager.java
@@ -346,9 +346,6 @@ public class SenderManager {
AuditUtils.add(AuditUtils.AUDIT_ID_AGENT_SEND_SUCCESS,
groupId, streamId, dataTime, msgCnt,
batchMessage.getTotalSize());
getMetricItem(groupId,
streamId).pluginSendSuccessCount.addAndGet(msgCnt);
- if (sourcePath != null) {
-
taskPositionManager.updateSinkPosition(batchMessage.getJobId(), sourcePath,
msgCnt, false);
- }
} else {
LOGGER.warn("send groupId {}, streamId {}, jobId {}, dataTime
{} fail with times {}, "
+ "error {}", groupId, streamId, jobId, dataTime,
retry, result);
diff --git a/inlong-agent/agent-plugins/src/main/java/org/apache/inlong/agent/plugin/sources/reader/file/FileReaderOperator.java b/inlong-agent/agent-plugins/src/main/java/org/apache/inlong/agent/plugin/sources/reader/file/FileReaderOperator.java
index b84898a650..82c3a40592 100644
--- a/inlong-agent/agent-plugins/src/main/java/org/apache/inlong/agent/plugin/sources/reader/file/FileReaderOperator.java
+++ b/inlong-agent/agent-plugins/src/main/java/org/apache/inlong/agent/plugin/sources/reader/file/FileReaderOperator.java
@@ -39,12 +39,10 @@ import org.apache.commons.lang3.StringUtils;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
-import java.io.BufferedReader;
+import java.io.ByteArrayOutputStream;
import java.io.File;
-import java.io.FileInputStream;
import java.io.FileReader;
import java.io.IOException;
-import java.io.InputStreamReader;
import java.io.LineNumberReader;
import java.io.RandomAccessFile;
import java.nio.charset.StandardCharsets;
@@ -60,15 +58,13 @@ import java.util.Optional;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.TimeUnit;
-import java.util.regex.Matcher;
-import java.util.regex.Pattern;
-import java.util.stream.Collectors;
import static org.apache.inlong.agent.constant.CommonConstants.COMMA;
+import static
org.apache.inlong.agent.constant.CommonConstants.DEFAULT_PROXY_PACKAGE_MAX_SIZE;
import static org.apache.inlong.agent.constant.CommonConstants.PROXY_KEY_DATA;
+import static
org.apache.inlong.agent.constant.CommonConstants.PROXY_PACKAGE_MAX_SIZE;
import static
org.apache.inlong.agent.constant.CommonConstants.PROXY_SEND_PARTITION_KEY;
import static
org.apache.inlong.agent.constant.JobConstants.DEFAULT_JOB_READ_WAIT_TIMEOUT;
-import static
org.apache.inlong.agent.constant.JobConstants.JOB_FILE_LINE_END_PATTERN;
import static org.apache.inlong.agent.constant.JobConstants.JOB_FILE_MAX_WAIT;
import static
org.apache.inlong.agent.constant.JobConstants.JOB_FILE_META_ENV_LIST;
import static
org.apache.inlong.agent.constant.JobConstants.JOB_FILE_MONITOR_DEFAULT_STATUS;
@@ -91,13 +87,13 @@ public class FileReaderOperator extends AbstractReader {
public static final int NEVER_STOP_SIGN = -1;
public static final int BATCH_READ_SIZE = 10000;
public static final int CACHE_QUEUE_SIZE = 10 * BATCH_READ_SIZE;
- private static final int LINE_SEPARATOR_SIZE =
System.lineSeparator().getBytes(StandardCharsets.UTF_8).length;
+ public static final int DEFAULT_BUFFER_SIZE = 64 * 1024;
private static final SimpleDateFormat RECORD_TIME_FORMAT = new
SimpleDateFormat("yyyy-MM-dd HH:mm:ss.SSS");
private static final Gson GSON = new Gson();
public File file;
- public int position = 0;
- public int bytePosition = 0;
+ public long position = 0;
+ public long bytePosition = 0;
private long readEndpoint = Long.MAX_VALUE;
public String md5;
public Map<String, String> metadata;
@@ -110,7 +106,8 @@ public class FileReaderOperator extends AbstractReader {
private long waitTimeout;
private long lastTime = 0;
private List<Validator> validators = new ArrayList<>();
- public boolean firstStored = false;
+ private static final byte[] inBuf = new byte[DEFAULT_BUFFER_SIZE];
+ private static int maxPackSize;
private final BlockingQueue<String> queue = new
LinkedBlockingQueue<>(CACHE_QUEUE_SIZE);
private final StringBuffer sb = new StringBuffer();
@@ -139,7 +136,6 @@ public class FileReaderOperator extends AbstractReader {
} catch (InterruptedException e) {
LOGGER.warn("poll {} data get interruptted.", file.getPath(), e);
}
-
return Optional.ofNullable(data)
.map(this::metadataMessage)
.filter(this::filterMessage)
@@ -225,6 +221,7 @@ public class FileReaderOperator extends AbstractReader {
this.jobConf = jobConf;
super.init(jobConf);
this.instanceId = jobConf.getInstanceId();
+ this.maxPackSize = jobConf.getInt(PROXY_PACKAGE_MAX_SIZE,
DEFAULT_PROXY_PACKAGE_MAX_SIZE);
initReadTimeout(jobConf);
String md5 = AgentUtils.getFileMd5(file);
if (StringUtils.isNotBlank(this.md5) && !this.md5.equals(md5)) {
@@ -237,21 +234,31 @@ public class FileReaderOperator extends AbstractReader {
.equals(JOB_FILE_MONITOR_DEFAULT_STATUS)) {
readEndpoint = Files.lines(file.toPath()).count();
}
-
- this.bytePosition = getStartBytePosition(this.position);
-
- isFirstStore();
-
- if (jobConf.hasKey(JobConstants.JOB_FILE_CONTENT_COLLECT_TYPE) &&
DataCollectType.INCREMENT
-
.equalsIgnoreCase(jobConf.get(JobConstants.JOB_FILE_CONTENT_COLLECT_TYPE))
- && this.firstStored) {
+ try {
+ position =
TaskPositionManager.getInstance().getPosition(getReadSource(), instanceId);
+ } catch (Exception ex) {
+ position = 0;
+ LOGGER.error("get position from position manager error, only
occur in ut: {}", ex.getMessage());
+ }
+ this.bytePosition = getStartBytePosition(position);
+ LOGGER.info("FileReaderOperator init file {} instanceId {} history
position {}", getReadSource(),
+ instanceId,
+ position);
+ if (isIncrement(jobConf)) {
LOGGER.info("FileReaderOperator DataCollectType INCREMENT:
start bytePosition {},{}",
file.length(), file.getAbsolutePath());
- this.bytePosition = (int) file.length();
-
- storeRocksDB();
+ this.bytePosition = file.length();
+ try (LineNumberReader lineNumberReader = new
LineNumberReader(new FileReader(file.getPath()))) {
+ lineNumberReader.skip(Long.MAX_VALUE);
+ position = lineNumberReader.getLineNumber();
+ TaskPositionManager.getInstance().updateSinkPosition(
+ getJobInstanceId(), getReadSource(), position);
+ LOGGER.info("for increment update {}, position to {}",
file.getAbsolutePath(), position);
+
+ } catch (IOException ex) {
+ LOGGER.error("get position error, file absolute path: {}",
file.getAbsolutePath());
+ }
}
-
try {
resiterMeta(jobConf);
} catch (Exception ex) {
@@ -263,6 +270,42 @@ public class FileReaderOperator extends AbstractReader {
}
}
+ private long getStartBytePosition(long lineNum) throws IOException {
+ long pos = 0;
+ long readCount = 0;
+ RandomAccessFile input = null;
+ try {
+ input = new RandomAccessFile(file, "r");
+ while (readCount < lineNum) {
+ List<String> lines = new ArrayList<>();
+ pos = readLines(input, pos, lines,
+ Math.min((int) (lineNum - readCount),
FileReaderOperator.BATCH_READ_SIZE));
+ readCount += lines.size();
+ if (lines.size() == 0) {
+ LOGGER.error("getStartBytePosition LineNum {} larger than
the real file");
+ break;
+ }
+ }
+ } catch (Exception e) {
+ LOGGER.error("getStartBytePosition error {}", e.getMessage());
+ } finally {
+ if (input != null) {
+ input.close();
+ }
+ }
+ LOGGER.info("getStartBytePosition LineNum {} position {}", lineNum,
pos);
+ return pos;
+ }
+
+ private boolean isIncrement(JobProfile jobConf) {
+ if (jobConf.hasKey(JobConstants.JOB_FILE_CONTENT_COLLECT_TYPE) &&
DataCollectType.INCREMENT
+
.equalsIgnoreCase(jobConf.get(JobConstants.JOB_FILE_CONTENT_COLLECT_TYPE))
+ && isFirstStore(jobConf)) {
+ return true;
+ }
+ return false;
+ }
+
// default value is -1 and never stop task
private void initReadTimeout(JobProfile jobConf) {
int waitTime = jobConf.getInt(JOB_FILE_MAX_WAIT,
@@ -326,97 +369,38 @@ public class FileReaderOperator extends AbstractReader {
// every line (include empty line) should be sent, otherwise the read
position will be offset when
// restarting and recovering. In the same time, Regex end line
spiltted line also has this problem, because
// recovering is based on line position.
- List<String> lines = bytePosition == 0 ? readFromLine(position) :
readFromPos(bytePosition);
+ List<String> lines = readFromPos(bytePosition);
if (!lines.isEmpty()) {
LOGGER.info("path is {}, line is {}, byte position is {}, reads
data lines {}",
file.getName(), position, bytePosition, lines.size());
}
List<String> resultLines = lines;
- // TODO line regular expression matching
- if (jobConf.hasKey(JOB_FILE_LINE_END_PATTERN)) {
- Pattern pattern =
Pattern.compile(jobConf.get(JOB_FILE_LINE_END_PATTERN));
- resultLines = lines.stream().flatMap(line -> {
- sb.append(line + System.lineSeparator());
- String data = sb.toString();
- Matcher matcher = pattern.matcher(data);
- List<String> tmpResultLines = new ArrayList<>();
- int beginPos = 0;
- while (matcher.find()) {
- String endLineStr = matcher.group();
- int endPos = data.indexOf(endLineStr, beginPos);
- tmpResultLines.add(data.substring(beginPos, endPos));
- beginPos = endPos + endLineStr.length();
- }
- String lastWord = data.substring(beginPos);
- sb.setLength(0);
- sb.append(lastWord);
- return tmpResultLines.stream();
- }).collect(Collectors.toList());
- }
-
resultLines.forEach(line -> {
try {
boolean offerSuc = queue.offer(line, 1, TimeUnit.SECONDS);
while (offerSuc != true) {
offerSuc = queue.offer(line, 1, TimeUnit.SECONDS);
}
+ LOGGER.debug("Read from file {} for {}", getReadSource(),
line);
} catch (InterruptedException e) {
LOGGER.error("fetchData offer failed {}", e.getMessage());
}
});
- bytePosition += lines.stream()
- .mapToInt(line -> line.getBytes(StandardCharsets.UTF_8).length
+ LINE_SEPARATOR_SIZE)
- .sum();
- position += lines.size();
if (position >= readEndpoint) {
finished = true;
}
}
- private List<String> readFromLine(int lineNum) throws IOException {
- String line = null;
- List<String> lines = new ArrayList<>();
- BufferedReader reader = null;
- try {
- reader = new BufferedReader(new InputStreamReader(new
FileInputStream(file),
- StandardCharsets.UTF_8));
- int count = 0;
- while ((line = reader.readLine()) != null) {
- if (++count > lineNum) {
- LOGGER.debug("read from line line-num {}, data {}",
lineNum, line);
- lines.add(line);
- }
- if (lines.size() >= FileReaderOperator.BATCH_READ_SIZE) {
- break;
- }
- }
- } catch (Exception e) {
- LOGGER.error("readFromLine error {}", e.getMessage());
- } finally {
- if (reader != null) {
- reader.close();
- }
- }
- return lines;
- }
-
- private List<String> readFromPos(int pos) throws IOException {
- String line = null;
+ private List<String> readFromPos(long pos) throws IOException {
List<String> lines = new ArrayList<>();
RandomAccessFile input = null;
try {
input = new RandomAccessFile(file, "r");
- input.skipBytes(pos);
- while ((line = input.readLine()) != null) {
- String lineChart = new
String(line.getBytes(StandardCharsets.ISO_8859_1), StandardCharsets.UTF_8);
- LOGGER.debug("read from pos pos-num {}, data {}", pos,
lineChart);
- lines.add(lineChart);
- if (lines.size() >= FileReaderOperator.BATCH_READ_SIZE) {
- break;
- }
- }
+ bytePosition = readLines(input, pos, lines,
FileReaderOperator.BATCH_READ_SIZE);
+ position += lines.size();
+
TaskPositionManager.getInstance().updateSinkPosition(getJobInstanceId(),
getReadSource(), position);
} catch (Exception e) {
- LOGGER.error("readFromPos error {}", e.getMessage());
+ LOGGER.error("readFromPos error {}", e.getMessage());
} finally {
if (input != null) {
input.close();
@@ -425,67 +409,75 @@ public class FileReaderOperator extends AbstractReader {
return lines;
}
- private int getStartBytePosition(int lineNum) throws IOException {
- int startBytePosition = 0;
- BufferedReader reader = null;
- try {
- LOGGER.info("get start line {}", lineNum);
- String line = null;
- List<String> lines = new ArrayList<>();
- reader = new BufferedReader(new InputStreamReader(new
FileInputStream(file),
- StandardCharsets.UTF_8));
- int count = 0;
- while ((line = reader.readLine()) != null) {
- if (++count > lineNum) {
- LOGGER.info("get startBytePosition end at line {}", count);
+ /**
+ * Read new lines.
+ *
+ * @param reader The file to read
+ * @return The new position after the lines have been read
+ * @throws java.io.IOException if an I/O error occurs.
+ */
+ private static long readLines(RandomAccessFile reader, long pos,
List<String> lines, int maxLineCount)
+ throws IOException {
+ if (maxLineCount == 0) {
+ return pos;
+ }
+ ByteArrayOutputStream baos = new ByteArrayOutputStream();
+ reader.seek(pos);
+ long rePos = pos; // position to re-read
+ int num;
+ LOGGER.debug("readLines from {}", pos);
+ boolean overLen = false;
+ while ((num = reader.read(inBuf)) != -1) {
+ int i = 0;
+ for (; i < num; i++) {
+ byte ch = inBuf[i];
+ switch (ch) {
+ case '\n':
+ lines.add(new String(baos.toByteArray()));
+ rePos = pos + i + 1;
+ if (overLen) {
+ LOGGER.warn("readLines over len finally string len
{}",
+ new String(baos.toByteArray()).length());
+ }
+ baos.reset();
+ overLen = false;
+ break;
+ case '\r':
+ break;
+ default:
+ if (baos.size() < maxPackSize) {
+ baos.write(ch);
+ } else {
+ overLen = true;
+ }
+ }
+ if (lines.size() >= maxLineCount) {
break;
}
- startBytePosition = startBytePosition
- + line.getBytes(StandardCharsets.UTF_8).length +
LINE_SEPARATOR_SIZE;
}
- } catch (Exception e) {
- LOGGER.error("getStartPositon err {}", e);
- } finally {
- if (reader != null) {
- reader.close();
+ if (lines.size() >= maxLineCount) {
+ break;
+ }
+ if (i == num) {
+ pos = reader.getFilePointer();
}
}
- LOGGER.info("getStartPositon bytePosition {}", startBytePosition);
- return startBytePosition;
- }
-
- private void isFirstStore() {
- if (!jobConf.hasKey(JobConstants.JOB_STORE_TIME)) {
- LOGGER.info("isFirstStore {},{}", file.getAbsolutePath(),
this.firstStored);
- this.firstStored = true;
- return;
- }
- long jobStoreTime =
Long.parseLong(jobConf.get(JobConstants.JOB_STORE_TIME));
- LOGGER.info("jobStoreTime {},{}", file.getAbsolutePath(),
jobStoreTime);
- long storeTime = AgentConfiguration.getAgentConf().getLong(
- AgentConstants.AGENT_JOB_STORE_TIME,
AgentConstants.DEFAULT_JOB_STORE_TIME);
-
- if (System.currentTimeMillis() - jobStoreTime > storeTime) {
- this.firstStored = false;
- } else {
- this.firstStored = true;
- }
- LOGGER.info("isFirstStore {},{}", file.getAbsolutePath(),
this.firstStored);
+ baos.close();
+ reader.seek(rePos); // Ensure we can re-read if necessary
+ return rePos;
}
- private void storeRocksDB() {
- try (LineNumberReader lineNumberReader = new LineNumberReader(new
FileReader(file.getPath()))) {
- lineNumberReader.skip(Long.MAX_VALUE);
- int seekPosition = lineNumberReader.getLineNumber();
-
- String jobInstanceId = getJobInstanceId();
- if (jobInstanceId != null) {
- TaskPositionManager.getInstance().updateSinkPosition(
- jobInstanceId, getReadSource(), seekPosition, true);
- LOGGER.info("storeRocksDB {},{}", file.getAbsolutePath(),
seekPosition);
+ private boolean isFirstStore(JobProfile jobConf) {
+ boolean isFirst = true;
+ if (jobConf.hasKey(JobConstants.JOB_STORE_TIME)) {
+ long jobStoreTime =
Long.parseLong(jobConf.get(JobConstants.JOB_STORE_TIME));
+ long storeTime = AgentConfiguration.getAgentConf().getLong(
+ AgentConstants.AGENT_JOB_STORE_TIME,
AgentConstants.DEFAULT_JOB_STORE_TIME);
+ if (System.currentTimeMillis() - jobStoreTime > storeTime) {
+ isFirst = false;
}
- } catch (IOException ex) {
- LOGGER.error("get position error, file absolute path: {}",
file.getAbsolutePath());
}
+ LOGGER.info("is first store job {}, {}", file.getAbsolutePath(),
isFirst);
+ return isFirst;
}
}
diff --git a/inlong-agent/agent-plugins/src/main/java/org/apache/inlong/agent/plugin/sources/reader/file/MonitorTextFile.java b/inlong-agent/agent-plugins/src/main/java/org/apache/inlong/agent/plugin/sources/reader/file/MonitorTextFile.java
index 48c2215342..531b5d0821 100644
--- a/inlong-agent/agent-plugins/src/main/java/org/apache/inlong/agent/plugin/sources/reader/file/MonitorTextFile.java
+++ b/inlong-agent/agent-plugins/src/main/java/org/apache/inlong/agent/plugin/sources/reader/file/MonitorTextFile.java
@@ -43,9 +43,7 @@ import static org.apache.inlong.agent.constant.JobConstants.JOB_FILE_MONITOR_INT
public final class MonitorTextFile {
private static final Logger LOGGER =
LoggerFactory.getLogger(MonitorTextFile.class);
- /**
- * monitor thread pool
- */
+ // monitor thread pool
private static final ThreadPoolExecutor EXECUTOR_SERVICE = new
ThreadPoolExecutor(
0, Integer.MAX_VALUE,
60L, TimeUnit.SECONDS,
@@ -136,12 +134,9 @@ public final class MonitorTextFile {
attributesAfter = Files.readAttributes(file.toPath(),
BasicFileAttributes.class);
currentPath = file.getCanonicalPath();
- if (attributesAfter.fileKey() == null) {
- return;
- }
-
// Determine whether the inode has changed
if (isInodeChanged(attributesAfter.fileKey().toString())) {
+ LOGGER.info("{} inode changed resetPosition",
fileReaderOperator.file.toPath());
resetPosition();
}
fileReaderOperator.fileKey =
attributesAfter.fileKey().toString();
@@ -154,6 +149,7 @@ public final class MonitorTextFile {
// if change symbolic links
if (attributesAfter.isSymbolicLink() && !path.equals(currentPath))
{
+ LOGGER.info("{} symbolicLink changed resetPosition",
fileReaderOperator.file.toPath());
resetPosition();
path = currentPath;
}
@@ -168,7 +164,7 @@ public final class MonitorTextFile {
}
/**
- * Reset the position and bytePosition
+ * reset the position and bytePosition
*/
private void resetPosition() {
LOGGER.info("reset position {}", fileReaderOperator.file.toPath());
@@ -178,22 +174,25 @@ public final class MonitorTextFile {
String jobInstanceId = fileReaderOperator.getJobInstanceId();
if (jobInstanceId != null) {
TaskPositionManager.getInstance().updateSinkPosition(
- jobInstanceId, fileReaderOperator.getReadSource(), 0,
true);
+ jobInstanceId, fileReaderOperator.getReadSource(), 0);
}
}
/**
* Determine whether the inode has changed
*
- * @param currentFileKey current file key
- * @return true if the inode changed, otherwise false
+ * @param currentFileKey
+ * @return
*/
private boolean isInodeChanged(String currentFileKey) {
if (fileReaderOperator.fileKey == null || currentFileKey == null) {
return false;
}
- return !fileReaderOperator.fileKey.equals(currentFileKey);
+ if (fileReaderOperator.fileKey.equals(currentFileKey)) {
+ return false;
+ }
+ return true;
}
}
}
diff --git a/inlong-agent/agent-plugins/src/test/java/org/apache/inlong/agent/plugin/sources/TestTextFileReader.java b/inlong-agent/agent-plugins/src/test/java/org/apache/inlong/agent/plugin/sources/TestTextFileReader.java
index 3979388358..76f5338beb 100755
--- a/inlong-agent/agent-plugins/src/test/java/org/apache/inlong/agent/plugin/sources/TestTextFileReader.java
+++ b/inlong-agent/agent-plugins/src/test/java/org/apache/inlong/agent/plugin/sources/TestTextFileReader.java
@@ -21,7 +21,6 @@ import org.apache.inlong.agent.conf.JobProfile;
import org.apache.inlong.agent.constant.DataCollectType;
import org.apache.inlong.agent.constant.FileTriggerType;
import org.apache.inlong.agent.constant.MetadataConstants;
-import org.apache.inlong.agent.core.AgentManager;
import org.apache.inlong.agent.plugin.AgentBaseTestsHelper;
import org.apache.inlong.agent.plugin.Message;
import org.apache.inlong.agent.plugin.Reader;
@@ -70,7 +69,8 @@ import static org.apache.inlong.agent.constant.JobConstants.JOB_STREAM_ID;
import static org.apache.inlong.agent.constant.KubernetesConstants.KUBERNETES;
import static org.apache.inlong.agent.constant.MetadataConstants.ENV_CVM;
-@PowerMockIgnore({"javax.management.*", "javax.script.*",
"com.sun.org.apache.xerces.*", "javax.xml.*", "org.xml.*",
+@PowerMockIgnore({"javax.net.ssl.*", "javax.management.*", "javax.script.*",
"com.sun.org.apache.xerces.*",
+ "javax.xml.*", "org.xml.*",
"org.w3c.*"})
@PrepareForTest({MetricRegister.class})
public class TestTextFileReader {
@@ -87,7 +87,7 @@ public class TestTextFileReader {
}
@AfterClass
- public static void teardown() {
+ public static void teardown() throws Exception {
helper.teardownAgentHome();
}
@@ -196,11 +196,11 @@ public class TestTextFileReader {
break;
}
String content = getContent(message.toString());
+ LOGGER.info("content is {}", content);
Assert.assertTrue(
- content.equalsIgnoreCase("hello ")
- || content.equalsIgnoreCase(" aa" +
System.lineSeparator() + "world ")
- || content.equalsIgnoreCase(System.lineSeparator()
+ "agent "));
- LOGGER.info("message is {}", message.toString());
+ content.equalsIgnoreCase("hello line-end-symbol aa")
+ || content.equalsIgnoreCase("world
line-end-symbol")
+ || content.equalsIgnoreCase("agent
line-end-symbol"));
}
}
@@ -240,7 +240,6 @@ public class TestTextFileReader {
@Test
public void testTextSeekReader() throws Exception {
- final AgentManager agentManager = new AgentManager();
Path localPath = Paths.get(testDir.toString(), "test.txt");
LOGGER.info("start to create {}", localPath);
List<String> beforeList = new ArrayList<>();
diff --git a/inlong-agent/agent-plugins/src/test/java/org/apache/inlong/agent/plugin/task/TestTextFileTask.java b/inlong-agent/agent-plugins/src/test/java/org/apache/inlong/agent/plugin/task/TestTextFileTask.java
index c0fec85586..4c6ff071c9 100644
--- a/inlong-agent/agent-plugins/src/test/java/org/apache/inlong/agent/plugin/task/TestTextFileTask.java
+++ b/inlong-agent/agent-plugins/src/test/java/org/apache/inlong/agent/plugin/task/TestTextFileTask.java
@@ -73,7 +73,7 @@ import static org.powermock.api.support.membermodification.MemberMatcher.field;
@RunWith(PowerMockRunner.class)
@PrepareForTest({TaskManager.class, MetricRegister.class})
-@PowerMockIgnore("javax.management.*")
+@PowerMockIgnore({"javax.management.*"})
public class TestTextFileTask {
public static final TemporaryFolder TMP_FOLDER = new TemporaryFolder();
@@ -92,7 +92,6 @@ public class TestTextFileTask {
taskCache = new ArrayList<>();
TMP_FOLDER.create();
- // mock metrics
taskManager = new TaskManager(null);
agentMetricItemSet = mock(AgentMetricItemSet.class);
agentMetricItem = mock(AgentMetricItem.class);
@@ -106,7 +105,7 @@ public class TestTextFileTask {
}
@AfterClass
- public static void teardown() {
+ public static void teardown() throws Exception {
TMP_FOLDER.delete();
}