This is an automated email from the ASF dual-hosted git repository.
krisztiankasa pushed a commit to branch branch-2.7
in repository https://gitbox.apache.org/repos/asf/ambari.git
The following commit(s) were added to refs/heads/branch-2.7 by this push:
new e7602a0 AMBARI-25293 - Logsearch: logfeeder throws NPE when updating
checkpoint (#2998)
e7602a0 is described below
commit e7602a087ef4ca759f6aed7b8f37e5ef3a20dcd5
Author: kasakrisz <[email protected]>
AuthorDate: Tue Jun 4 19:43:58 2019 +0200
AMBARI-25293 - Logsearch: logfeeder throws NPE when updating checkpoint
(#2998)
---
.../logfeeder/input/file/FileCheckInHelper.java | 65 ++++++++++++++++++----
.../input/file/ResumeLineNumberHelper.java | 22 +++-----
2 files changed, 61 insertions(+), 26 deletions(-)
diff --git
a/ambari-logsearch/ambari-logsearch-logfeeder/src/main/java/org/apache/ambari/logfeeder/input/file/FileCheckInHelper.java
b/ambari-logsearch/ambari-logsearch-logfeeder/src/main/java/org/apache/ambari/logfeeder/input/file/FileCheckInHelper.java
index 7b8f0cd..f96c0d2 100644
---
a/ambari-logsearch/ambari-logsearch-logfeeder/src/main/java/org/apache/ambari/logfeeder/input/file/FileCheckInHelper.java
+++
b/ambari-logsearch/ambari-logsearch-logfeeder/src/main/java/org/apache/ambari/logfeeder/input/file/FileCheckInHelper.java
@@ -18,18 +18,18 @@
*/
package org.apache.ambari.logfeeder.input.file;
+import java.io.File;
+import java.io.RandomAccessFile;
+import java.util.Date;
+import java.util.HashMap;
+import java.util.Map;
+
import org.apache.ambari.logfeeder.input.InputFile;
import org.apache.ambari.logfeeder.input.InputFileMarker;
import org.apache.ambari.logfeeder.util.FileUtil;
import org.apache.ambari.logfeeder.util.LogFeederUtil;
-import org.apache.log4j.Level;
import org.apache.log4j.Logger;
-import java.io.File;
-import java.io.RandomAccessFile;
-import java.util.Date;
-import java.util.Map;
-
public class FileCheckInHelper {
private static final Logger LOG = Logger.getLogger(FileCheckInHelper.class);
@@ -40,7 +40,14 @@ public class FileCheckInHelper {
public static void checkIn(InputFile inputFile, InputFileMarker inputMarker)
{
try {
Map<String, Object> jsonCheckPoint =
inputFile.getJsonCheckPoints().get(inputMarker.getBase64FileKey());
+ if (jsonCheckPoint == null) {
+ jsonCheckPoint = createNewCheckpointObject(inputFile);
+ attachCheckpointToInput(inputFile, jsonCheckPoint);
+ }
File checkPointFile =
inputFile.getCheckPointFiles().get(inputMarker.getBase64FileKey());
+ if (checkPointFile == null || !checkPointFile.exists()) {
+ checkPointFile =
FileCheckInHelper.attachCheckpointFileToInput(inputFile);
+ }
int lineNumber =
LogFeederUtil.objectToInt(jsonCheckPoint.get("line_number"), 0, "line_number");
if (lineNumber > inputMarker.getLineNumber()) {
@@ -80,15 +87,49 @@ public class FileCheckInHelper {
FileUtil.move(tmpCheckPointFile, checkPointFile);
if (inputFile.isClosed()) {
- String logMessageKey = inputFile.getClass().getSimpleName() +
"_FINAL_CHECKIN";
- LogFeederUtil.logErrorMessageByInterval(logMessageKey, "Wrote final
checkPoint, input=" + inputFile.getShortDescription() +
- ", checkPointFile=" + checkPointFile.getAbsolutePath() + ",
checkPoint=" + jsonStr, null, LOG, Level.INFO);
+ LOG.info(String.format("Wrote final checkPoint, input=%s,
checkPointFile=%s, checkPoint=%s", inputFile.getShortDescription(),
checkPointFile.getAbsolutePath(), jsonStr));
}
} catch (Throwable t) {
- String logMessageKey = inputFile.getClass().getSimpleName() +
"_CHECKIN_EXCEPTION";
- LogFeederUtil.logErrorMessageByInterval(logMessageKey, "Caught exception
checkIn. , input=" + inputFile.getShortDescription(), t,
- LOG, Level.ERROR);
+ LOG.error("Caught exception checkIn. , input=" +
inputFile.getShortDescription(), t);
+ }
+ }
+
+ /**
+ * Create a new checkpoint object pre-filled with defaults taken from the input file.
+ * The "file_path" entry is always set. The "file_key" entry is set only when the base64
+ * file key can be obtained; if that lookup throws, the error is logged and the partially
+ * filled checkpoint is still returned.
+ * @param inputFile file object which is used to fill the checkpoint defaults
+ * @return Created checkpoint object (never null; may lack "file_key" if the key lookup failed)
+ */
+ static Map<String, Object> createNewCheckpointObject(final InputFile inputFile) {
+ Map<String, Object> jsonCheckPoint = new HashMap<>();
+ jsonCheckPoint.put("file_path", inputFile.getFilePath());
+ try {
+ jsonCheckPoint.put("file_key", inputFile.getBase64FileKey());
+ } catch (Exception e) {
+ // Hand the exception to the logger as well so the stack trace is not lost.
+ LOG.error(String.format("Error during checkpoint object (path: %s) creation: %s", inputFile.getFilePath(), e.getMessage()), e);
}
+ return jsonCheckPoint;
+ }
+
+ /**
+ * Attach a json checkpoint object to an input file.
+ * The checkpoint is stored in the input file's json checkpoint map under the
+ * input file's own base64 file key.
+ * @param inputFile input file object that will have the new checkpoint
+ * @param jsonCheckPoint holds checkpoint related data
+ * @throws Exception if obtaining the base64 file key or storing the checkpoint fails
+ */
+ static void attachCheckpointToInput(final InputFile inputFile, final
Map<String, Object> jsonCheckPoint) throws Exception {
+ inputFile.getJsonCheckPoints().put(inputFile.getBase64FileKey(),
jsonCheckPoint);
+ }
+
+ /**
+ * Build the checkpoint file handle for an input and register it on that input.
+ * The file name is the input's base64 file key followed by its checkpoint extension,
+ * and the file is located in the checkpoint folder of the input's manager. The handle
+ * is stored in the input's checkpoint file map under the base64 file key.
+ * @param inputFile input file object that will have the new checkpoint file
+ * @return Newly created checkpoint file object
+ * @throws Exception if the base64 file key cannot be obtained
+ */
+ static File attachCheckpointFileToInput(final InputFile inputFile) throws Exception {
+ final String fileName = inputFile.getBase64FileKey() + inputFile.getCheckPointExtension();
+ final File checkPointFile = new File(inputFile.getInputManager().getCheckPointFolderFile(), fileName);
+ inputFile.getCheckPointFiles().put(inputFile.getBase64FileKey(), checkPointFile);
+ return checkPointFile;
}
diff --git
a/ambari-logsearch/ambari-logsearch-logfeeder/src/main/java/org/apache/ambari/logfeeder/input/file/ResumeLineNumberHelper.java
b/ambari-logsearch/ambari-logsearch-logfeeder/src/main/java/org/apache/ambari/logfeeder/input/file/ResumeLineNumberHelper.java
index 9350200..614c3bc 100644
---
a/ambari-logsearch/ambari-logsearch-logfeeder/src/main/java/org/apache/ambari/logfeeder/input/file/ResumeLineNumberHelper.java
+++
b/ambari-logsearch/ambari-logsearch-logfeeder/src/main/java/org/apache/ambari/logfeeder/input/file/ResumeLineNumberHelper.java
@@ -18,17 +18,16 @@
*/
package org.apache.ambari.logfeeder.input.file;
-import org.apache.ambari.logfeeder.input.InputFile;
-import org.apache.ambari.logfeeder.util.LogFeederUtil;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
import java.io.EOFException;
import java.io.File;
import java.io.RandomAccessFile;
-import java.util.HashMap;
import java.util.Map;
+import org.apache.ambari.logfeeder.input.InputFile;
+import org.apache.ambari.logfeeder.util.LogFeederUtil;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
public class ResumeLineNumberHelper {
private static final Logger LOG =
LoggerFactory.getLogger(ResumeLineNumberHelper.class);
@@ -43,10 +42,7 @@ public class ResumeLineNumberHelper {
try {
LOG.info("Checking existing checkpoint file. " +
inputFile.getShortDescription());
- String checkPointFileName = inputFile.getBase64FileKey() +
inputFile.getCheckPointExtension();
- File checkPointFolder =
inputFile.getInputManager().getCheckPointFolderFile();
- checkPointFile = new File(checkPointFolder, checkPointFileName);
- inputFile.getCheckPointFiles().put(inputFile.getBase64FileKey(),
checkPointFile);
+ checkPointFile =
FileCheckInHelper.attachCheckpointFileToInput(inputFile);
Map<String, Object> jsonCheckPoint = null;
if (!checkPointFile.exists()) {
LOG.info("Checkpoint file for log file " + inputFile.getFilePath() + "
doesn't exist, starting to read it from the beginning");
@@ -74,12 +70,10 @@ public class ResumeLineNumberHelper {
}
if (jsonCheckPoint == null) {
// This seems to be first time, so creating the initial checkPoint
object
- jsonCheckPoint = new HashMap<String, Object>();
- jsonCheckPoint.put("file_path", inputFile.getFilePath());
- jsonCheckPoint.put("file_key", inputFile.getBase64FileKey());
+ jsonCheckPoint = FileCheckInHelper.createNewCheckpointObject(inputFile);
}
- inputFile.getJsonCheckPoints().put(inputFile.getBase64FileKey(),
jsonCheckPoint);
+ FileCheckInHelper.attachCheckpointToInput(inputFile, jsonCheckPoint);
} catch (Throwable t) {
LOG.error("Error while configuring checkpoint file. Will reset file.
checkPointFile=" + checkPointFile, t);