EMsnap commented on code in PR #5261:
URL: https://github.com/apache/inlong/pull/5261#discussion_r936207044


##########
inlong-agent/agent-plugins/src/main/java/org/apache/inlong/agent/plugin/sources/reader/TextFileReader.java:
##########
@@ -170,14 +210,92 @@ public void init(JobProfile jobConf) {
             if (StringUtils.isNotBlank(this.md5) && !this.md5.equals(md5)) {
                 LOGGER.warn("md5 is differ from origin, origin: {}, new {}", 
this.md5, md5);
             }
+            localSnapshot = new 
LocalSnapshot(file.getPath().concat(POSITION_SUFFIX));
+            setSnapshot(jobConf);
             LOGGER.info("file name for task is {}, md5 is {}", file, md5);
-            stream = 
Files.newBufferedReader(file.toPath()).lines().skip(position);
-            iterator = stream.iterator();
+            //split line and column
+            getFileStream(jobConf);
+            if (Objects.nonNull(stream)) {
+                iterator = stream.iterator();
+            }
         } catch (Exception ex) {
             throw new FileException("error init stream for " + file.getPath(), 
ex);
         }
     }
 
+    private void getFileStream(JobProfile jobConf) throws IOException {
+        List<String> lines = 
Files.newBufferedReader(file.toPath()).lines().skip(position).collect(Collectors.toList());
+        List<String> resultLines = new ArrayList<>();
+        //TODO line regular expression matching
+        if (jobConf.hasKey(JOB_FILE_LINE_END_PATTERN)) {
+            Pattern pattern = 
Pattern.compile(jobConf.get(JOB_FILE_LINE_END_PATTERN));
+            lines.forEach(line -> {
+                lineStringBuffer.put(file,
+                        lineStringBuffer.isEmpty() ? line : 
lineStringBuffer.get(file).concat(" ").concat(line));
+                String data = lineStringBuffer.get(file);
+                Matcher matcher = pattern.matcher(data);
+                if (matcher.find() && 
StringUtils.isNoneBlank(matcher.group())) {
+                    String[] splitLines = data.split(matcher.group());
+                    int length = splitLines.length;
+                    for (int i = 0; i < length; i++) {
+                        if (i > 0 && i == length - 1 && null != splitLines[i]) 
{
+                            lineStringBuffer.put(file, splitLines[i]);
+                            break;
+                        }
+                        resultLines.add(splitLines[i].trim());
+                    }
+                    if (0 == length - 1) {
+                        lineStringBuffer.remove(file);
+                    }
+                }
+            });
+            if (resultLines.isEmpty()) {
+                return;
+            }
+        }
+        lines = resultLines.isEmpty() ? lines : resultLines;
+        stream = addAttributeData(jobConf, lines);
+    }
+
+    private Stream<String> addAttributeData(JobProfile jobConf, List<String> 
lines) {
+        if (!jobConf.hasKey(JOB_META_INFO_LIST)) {
+            return lines.stream();
+        }
+        lines = joinMetaAndLine(sourceMetas, lines);
+        return lines.stream();
+    }
+
+    private List<String> joinMetaAndLine(List<SourceMeta> sourceMetas, 
List<String> lines) {
+        if (Objects.isNull(sourceMetas)) {
+            return lines;
+        }
+        for (SourceMeta sm : sourceMetas) {
+            if (sm instanceof KubernetesMetadata) {
+                return joinMetaK8s((KubernetesMetadata) sm, lines);
+            }
+            if (sm instanceof AgentMetadata) {

Review Comment:
   A k8s agent is one kind of agent, so the two cannot be separated here.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]

Reply via email to