aloyszhang commented on code in PR #9984:
URL: https://github.com/apache/inlong/pull/9984#discussion_r1565118361
##########
inlong-agent/agent-plugins/src/main/java/org/apache/inlong/agent/plugin/sources/LogFileSource.java:
##########
@@ -148,46 +67,77 @@ public LogFileSource() {
public void init(InstanceProfile profile) {
try {
LOGGER.info("LogFileSource init: {}", profile.toJsonStr());
- this.profile = profile;
super.init(profile);
- String cycleUnit = profile.get(TASK_CYCLE_UNIT);
- if (cycleUnit.compareToIgnoreCase(CycleUnitType.REAL_TIME) == 0) {
- isRealTime = true;
- cycleUnit = CycleUnitType.HOUR;
- }
- taskId = profile.getTaskId();
- instanceId = profile.getInstanceId();
fileName = profile.getInstanceId();
- maxPackSize = profile.getInt(PROXY_PACKAGE_MAX_SIZE,
DEFAULT_PROXY_PACKAGE_MAX_SIZE);
bufferToReadFile = new byte[SIZE_OF_BUFFER_TO_READ_FILE];
isIncrement = isIncrement(profile);
file = new File(fileName);
inodeInfo = profile.get(TaskConstants.INODE_INFO);
lastInodeUpdateTime = AgentUtils.getCurrentTime();
linePosition = getInitLineOffset(isIncrement, taskId, instanceId,
inodeInfo);
bytePosition = getBytePositionByLine(linePosition);
- queue = new LinkedBlockingQueue<>(CACHE_QUEUE_SIZE);
- dataTime =
DateTransUtils.timeStrConvertToMillSec(profile.getSourceDataTime(), cycleUnit);
- if
(DEFAULT_FILE_SOURCE_EXTEND_CLASS.compareTo(ExtendedHandler.class.getCanonicalName())
!= 0) {
- Constructor<?> constructor =
- Class.forName(
-
profile.get(TaskConstants.FILE_SOURCE_EXTEND_CLASS,
DEFAULT_FILE_SOURCE_EXTEND_CLASS))
- .getDeclaredConstructor(InstanceProfile.class);
- constructor.setAccessible(true);
- extendedHandler = (ExtendedHandler)
constructor.newInstance(profile);
- }
- try {
- registerMeta(profile);
- } catch (Exception ex) {
- LOGGER.error("init metadata error", ex);
- }
- EXECUTOR_SERVICE.execute(run());
} catch (Exception ex) {
stopRunning();
throw new FileException("error init stream for " + file.getPath(),
ex);
}
}
+ @Override
+ protected boolean doPrepareToRead() {
+ if (isInodeChanged()) {
+ fileExist = false;
+ LOGGER.info("inode changed, instance will restart and offset will
be clean, file {}",
+ fileName);
+ return false;
+ }
+ if (file.length() < bytePosition) {
+ fileExist = false;
+ LOGGER.info("file rotate, instance will restart and offset will be
clean, file {}",
+ fileName);
+ return false;
+ }
+ return true;
+ }
+
+ @Override
+ protected List<SourceData> readFromSource() {
+ try {
+ return readFromPos(bytePosition);
+ } catch (FileNotFoundException e) {
+ fileExist = false;
+ LOGGER.error("readFromPos file deleted error: ", e);
+ } catch (IOException e) {
+ LOGGER.error("readFromPos error: ", e);
+ }
+ return null;
+ }
+
+ @Override
+ protected void printCurrentState() {
+ LOGGER.info("path is {}, linePosition {}, bytePosition is {} file len
{}", file.getName(), linePosition,
+ bytePosition, file.length());
+ }
+
+ @Override
+ protected String getThreadName() {
+ return "log-file-source-" + taskId + "-" + fileName;
+ }
+
+ private List<SourceData> readFromPos(long pos) throws IOException {
+ List<byte[]> lines = new ArrayList<>();
+ List<SourceData> dataList = new ArrayList<>();
+ RandomAccessFile input = new RandomAccessFile(file, "r");
+ bytePosition = readLines(input, pos, lines, BATCH_READ_LINE_COUNT,
BATCH_READ_LINE_TOTAL_LEN, false);
+ for (int i = 0; i < lines.size(); i++) {
+ linePosition++;
+ dataList.add(new SourceData(lines.get(i), linePosition));
+ }
+ if (input != null) {
Review Comment:
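`input` is never null at this point: `new RandomAccessFile(file, "r")` either returns an instance or throws, so this null check is redundant.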
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]