This is an automated email from the ASF dual-hosted git repository.

oleewere pushed a commit to branch trunk
in repository https://gitbox.apache.org/repos/asf/ambari.git


The following commit(s) were added to refs/heads/trunk by this push:
     new 85a0e32  AMBARI-24355. Logfeeder: create CLI for checkpoints and 
include log type in checkpoint file names. (#2245)
85a0e32 is described below

commit 85a0e3203034aeba9ad5923d068382e160701355
Author: Olivér Szabó <[email protected]>
AuthorDate: Wed Sep 5 13:03:06 2018 +0200

    AMBARI-24355. Logfeeder: create CLI for checkpoints and include log type in 
checkpoint file names. (#2245)
    
    * AMBARI-24355. Logfeeder: include log type in checkpoint file names.
    
    * AMBARI-24355. Add CLI for handling checkpoints.
---
 .../{InputManager.java => CheckpointManager.java}  |  31 ++-
 .../logfeeder/plugin/manager/InputManager.java     |   6 +-
 .../ambari/logfeeder/LogFeederCommandLine.java     |  82 +++++++-
 .../ambari/logfeeder/common/ConfigHandler.java     |   2 +-
 .../ambari/logfeeder/conf/ApplicationConfig.java   |   9 +-
 .../apache/ambari/logfeeder/input/InputFile.java   |   8 +-
 .../ambari/logfeeder/input/InputManagerImpl.java   | 182 ++----------------
 .../ambari/logfeeder/input/InputSimulate.java      |  10 +-
 .../file/checkpoint/FileCheckpointManager.java     | 208 +++++++++++++++++++++
 .../file/checkpoint/util/CheckpointFileReader.java |  64 +++++++
 .../{ => checkpoint/util}/FileCheckInHelper.java   |   2 +-
 .../util/FileCheckpointCleanupHelper.java          | 132 +++++++++++++
 .../util}/ResumeLineNumberHelper.java              |  12 +-
 .../input/monitor/CheckpointCleanupMonitor.java    |  10 +-
 .../src/main/scripts/logfeeder.sh                  |  17 ++
 .../ambari/logfeeder/input/InputFileTest.java      |  14 +-
 16 files changed, 575 insertions(+), 214 deletions(-)

diff --git 
a/ambari-logsearch/ambari-logsearch-logfeeder-plugin-api/src/main/java/org/apache/ambari/logfeeder/plugin/manager/InputManager.java
 
b/ambari-logsearch/ambari-logsearch-logfeeder-plugin-api/src/main/java/org/apache/ambari/logfeeder/plugin/manager/CheckpointManager.java
similarity index 56%
copy from 
ambari-logsearch/ambari-logsearch-logfeeder-plugin-api/src/main/java/org/apache/ambari/logfeeder/plugin/manager/InputManager.java
copy to 
ambari-logsearch/ambari-logsearch-logfeeder-plugin-api/src/main/java/org/apache/ambari/logfeeder/plugin/manager/CheckpointManager.java
index 0734158..abf1465 100644
--- 
a/ambari-logsearch/ambari-logsearch-logfeeder-plugin-api/src/main/java/org/apache/ambari/logfeeder/plugin/manager/InputManager.java
+++ 
b/ambari-logsearch/ambari-logsearch-logfeeder-plugin-api/src/main/java/org/apache/ambari/logfeeder/plugin/manager/CheckpointManager.java
@@ -6,9 +6,9 @@
  * to you under the Apache License, Version 2.0 (the
  * "License"); you may not use this file except in compliance
  * with the License.  You may obtain a copy of the License at
- * 
+ *
  * http://www.apache.org/licenses/LICENSE-2.0
- * 
+ *
  * Unless required by applicable law or agreed to in writing,
  * software distributed under the License is distributed on an
  * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
@@ -18,29 +18,26 @@
  */
 package org.apache.ambari.logfeeder.plugin.manager;
 
+import org.apache.ambari.logfeeder.plugin.common.LogFeederProperties;
 import org.apache.ambari.logfeeder.plugin.input.Input;
+import org.apache.ambari.logfeeder.plugin.input.InputMarker;
 
-import java.io.File;
-import java.util.List;
+import java.io.IOException;
 
+public interface CheckpointManager<I extends Input, IFM extends InputMarker, P 
extends LogFeederProperties> {
 
-public abstract class InputManager implements BlockManager {
+  void init(P properties);
 
-  public abstract void addToNotReady(Input input);
+  void checkIn(I inputFile, IFM inputMarker);
 
-  public abstract void checkInAll();
+  int resumeLineNumber(I input);
 
-  public abstract List<Input> getInputList(String serviceName);
+  void cleanupCheckpoints();
 
-  public abstract void add(String serviceName, Input input);
+  void printCheckpoints(String checkpointLocation, String logTypeFilter,
+                        String fileKeyFilter) throws IOException;
 
-  public abstract void removeInput(Input input);
+  void cleanCheckpoint(String checkpointLocation, String logTypeFilter,
+                       String fileKeyFilter, boolean all) throws IOException;
 
-  public abstract File getCheckPointFolderFile();
-
-  public abstract void cleanCheckPointFiles();
-
-  public abstract void removeInputsForService(String serviceName);
-
-  public abstract void startInputs(String serviceName);
 }
diff --git 
a/ambari-logsearch/ambari-logsearch-logfeeder-plugin-api/src/main/java/org/apache/ambari/logfeeder/plugin/manager/InputManager.java
 
b/ambari-logsearch/ambari-logsearch-logfeeder-plugin-api/src/main/java/org/apache/ambari/logfeeder/plugin/manager/InputManager.java
index 0734158..6dc1423 100644
--- 
a/ambari-logsearch/ambari-logsearch-logfeeder-plugin-api/src/main/java/org/apache/ambari/logfeeder/plugin/manager/InputManager.java
+++ 
b/ambari-logsearch/ambari-logsearch-logfeeder-plugin-api/src/main/java/org/apache/ambari/logfeeder/plugin/manager/InputManager.java
@@ -36,11 +36,9 @@ public abstract class InputManager implements BlockManager {
 
   public abstract void removeInput(Input input);
 
-  public abstract File getCheckPointFolderFile();
-
-  public abstract void cleanCheckPointFiles();
-
   public abstract void removeInputsForService(String serviceName);
 
   public abstract void startInputs(String serviceName);
+
+  public abstract CheckpointManager getCheckpointHandler();
 }
diff --git 
a/ambari-logsearch/ambari-logsearch-logfeeder/src/main/java/org/apache/ambari/logfeeder/LogFeederCommandLine.java
 
b/ambari-logsearch/ambari-logsearch-logfeeder/src/main/java/org/apache/ambari/logfeeder/LogFeederCommandLine.java
index a6fcb2a..3812ed1 100644
--- 
a/ambari-logsearch/ambari-logsearch-logfeeder/src/main/java/org/apache/ambari/logfeeder/LogFeederCommandLine.java
+++ 
b/ambari-logsearch/ambari-logsearch-logfeeder/src/main/java/org/apache/ambari/logfeeder/LogFeederCommandLine.java
@@ -21,6 +21,8 @@ package org.apache.ambari.logfeeder;
 
 import com.google.gson.GsonBuilder;
 import org.apache.ambari.logfeeder.common.LogEntryParseTester;
+import org.apache.ambari.logfeeder.input.file.checkpoint.FileCheckpointManager;
+import org.apache.ambari.logfeeder.plugin.manager.CheckpointManager;
 import org.apache.commons.cli.CommandLine;
 import org.apache.commons.cli.CommandLineParser;
 import org.apache.commons.cli.DefaultParser;
@@ -36,16 +38,24 @@ import java.nio.charset.Charset;
 import java.util.ArrayList;
 import java.util.List;
 import java.util.Map;
+import java.util.Properties;
 
 public class LogFeederCommandLine {
   
   private static final String TEST_COMMAND = "test";
+  private static final String CHECKPOINTS_COMMAND = "checkpoints";
+  private static final String CHECKPOINTS_FOLDER_OPTION = "checkpoints-folder";
+  private static final String CHECKPOINTS_LIST_OPTION = "list";
+  private static final String CHECKPOINTS_CLEAN_OPTION = "clean";
+  private static final String CHECKPOINTS_ALL_OPTION = "all";
+  private static final String CHECKPOINTS_FILE_KEY = "file-key";
+  private static final String CHECKPOINTS_LOG_TYPE = "log-type";
   private static final String TEST_LOG_ENTRY_OPTION = "test-log-entry";
   private static final String TEST_SHIPPER_CONFIG_OPTION = 
"test-shipper-config";
   private static final String TEST_GLOBAL_CONFIG_OPTION = "test-global-config";
   private static final String TEST_LOG_ID_OPTION = "test-log-id";
   
-  private static final String COMMAND_LINE_SYNTAX = "java 
org.apache.ambari.logfeeder.LogFeederCommandLine --test [args]";
+  private static final String COMMAND_LINE_SYNTAX = "java 
org.apache.ambari.logfeeder.LogFeederCommandLine [args]";
 
   public static void main(String[] args) {
     Options options = new Options();
@@ -63,6 +73,44 @@ public class LogFeederCommandLine {
       .desc("Test if log entry is parseable")
       .build();
 
+    Option checkpointsOption = Option.builder("cp")
+      .longOpt(CHECKPOINTS_COMMAND)
+      .desc("Use checkpoint operations")
+      .build();
+
+    Option checkpointsListOption = Option.builder("l")
+      .longOpt(CHECKPOINTS_LIST_OPTION)
+      .desc("Print checkpoints")
+      .build();
+
+    Option checkpointsCleanOption = Option.builder("c")
+      .longOpt(CHECKPOINTS_CLEAN_OPTION)
+      .desc("Remove a checkpoint file (by key/log type or use on all)")
+      .build();
+
+    Option checkpointsFolderOption = Option.builder("cf")
+      .longOpt(CHECKPOINTS_FOLDER_OPTION)
+      .hasArg()
+      .desc("Checkpoints folder location")
+      .build();
+
+    Option checkpointsFileKeyOption = Option.builder("k")
+      .longOpt(CHECKPOINTS_FILE_KEY)
+      .hasArg()
+      .desc("Filter on file key (for list and clean)")
+      .build();
+
+    Option checkpointsLogTypeOption = Option.builder("lt")
+      .longOpt(CHECKPOINTS_LOG_TYPE)
+      .hasArg()
+      .desc("Filter on log type (for list and clean)")
+      .build();
+
+    Option checkpointAllOption = Option.builder("a")
+      .longOpt(CHECKPOINTS_ALL_OPTION)
+      .desc("")
+      .build();
+
     Option testLogEntryOption = Option.builder("tle")
       .longOpt(TEST_LOG_ENTRY_OPTION)
       .hasArg()
@@ -93,6 +141,13 @@ public class LogFeederCommandLine {
     options.addOption(testShipperConfOption);
     options.addOption(testGlobalConfOption);
     options.addOption(testLogIdOption);
+    options.addOption(checkpointsOption);
+    options.addOption(checkpointsListOption);
+    options.addOption(checkpointsCleanOption);
+    options.addOption(checkpointsFolderOption);
+    options.addOption(checkpointAllOption);
+    options.addOption(checkpointsFileKeyOption);
+    options.addOption(checkpointsLogTypeOption);
 
     try {
       CommandLineParser cmdLineParser = new DefaultParser();
@@ -103,6 +158,31 @@ public class LogFeederCommandLine {
         System.exit(0);
       }
       String command = "";
+      if (cli.hasOption("cp")) {
+        String checkpointLocation = "";
+        if (cli.hasOption("cf")) {
+          checkpointLocation = cli.getOptionValue("cf");
+        } else {
+          Properties prop = new Properties();
+          
prop.load(LogFeederCommandLine.class.getClassLoader().getResourceAsStream("logfeeder.properties"));
+          checkpointLocation = prop.getProperty("logfeeder.checkpoint.folder");
+        }
+        boolean cleanCommand = cli.hasOption("c");
+        boolean listCommand = cli.hasOption("l") || !cleanCommand; // Use list 
if clean is not used
+        boolean allOption = cli.hasOption("a");
+        String logTypeFilter = cli.hasOption("lt") ? cli.getOptionValue("lt") 
: null;
+        String fileKeyFilter = cli.hasOption("k") ? cli.getOptionValue("k") : 
null;
+
+        final CheckpointManager checkpointManager = new 
FileCheckpointManager();
+        if (listCommand) {
+          checkpointManager.printCheckpoints(checkpointLocation, 
logTypeFilter, fileKeyFilter);
+        } else {
+          checkpointManager.cleanCheckpoint(checkpointLocation, logTypeFilter, 
fileKeyFilter, allOption);
+        }
+
+        System.out.println("Checkpoint operation has finished successfully.");
+        return;
+      }
       if (cli.hasOption("t")) {
         command = TEST_COMMAND;
         validateRequiredOptions(cli, command, testLogEntryOption, 
testShipperConfOption);
diff --git 
a/ambari-logsearch/ambari-logsearch-logfeeder/src/main/java/org/apache/ambari/logfeeder/common/ConfigHandler.java
 
b/ambari-logsearch/ambari-logsearch-logfeeder/src/main/java/org/apache/ambari/logfeeder/common/ConfigHandler.java
index 8dbb39a..1ceef3b 100644
--- 
a/ambari-logsearch/ambari-logsearch-logfeeder/src/main/java/org/apache/ambari/logfeeder/common/ConfigHandler.java
+++ 
b/ambari-logsearch/ambari-logsearch-logfeeder/src/main/java/org/apache/ambari/logfeeder/common/ConfigHandler.java
@@ -421,7 +421,7 @@ public class ConfigHandler implements InputConfigMonitor {
   }
 
   public void cleanCheckPointFiles() {
-    inputManager.cleanCheckPointFiles();
+    inputManager.getCheckpointHandler().cleanupCheckpoints();
   }
 
   public void logStats() {
diff --git 
a/ambari-logsearch/ambari-logsearch-logfeeder/src/main/java/org/apache/ambari/logfeeder/conf/ApplicationConfig.java
 
b/ambari-logsearch/ambari-logsearch-logfeeder/src/main/java/org/apache/ambari/logfeeder/conf/ApplicationConfig.java
index f54fc0c..8c7e7d9 100644
--- 
a/ambari-logsearch/ambari-logsearch-logfeeder/src/main/java/org/apache/ambari/logfeeder/conf/ApplicationConfig.java
+++ 
b/ambari-logsearch/ambari-logsearch-logfeeder/src/main/java/org/apache/ambari/logfeeder/conf/ApplicationConfig.java
@@ -24,6 +24,8 @@ import 
org.apache.ambari.logfeeder.docker.DockerContainerRegistry;
 import org.apache.ambari.logfeeder.common.LogFeederConstants;
 import org.apache.ambari.logfeeder.input.InputConfigUploader;
 import org.apache.ambari.logfeeder.input.InputManagerImpl;
+import org.apache.ambari.logfeeder.plugin.manager.CheckpointManager;
+import org.apache.ambari.logfeeder.input.file.checkpoint.FileCheckpointManager;
 import org.apache.ambari.logfeeder.loglevelfilter.LogLevelFilterHandler;
 import org.apache.ambari.logfeeder.common.ConfigHandler;
 import org.apache.ambari.logfeeder.metrics.MetricsManager;
@@ -148,7 +150,7 @@ public class ApplicationConfig {
 
 
   @Bean
-  @DependsOn("containerRegistry")
+  @DependsOn({"containerRegistry", "checkpointHandler"})
   public InputManager inputManager() {
     return new InputManagerImpl();
   }
@@ -159,6 +161,11 @@ public class ApplicationConfig {
   }
 
   @Bean
+  public CheckpointManager checkpointHandler() {
+    return new FileCheckpointManager();
+  }
+
+  @Bean
   public DockerContainerRegistry containerRegistry() {
     if (logFeederProps.isDockerContainerRegistryEnabled()) {
       return 
DockerContainerRegistry.getInstance(logFeederProps.getProperties());
diff --git 
a/ambari-logsearch/ambari-logsearch-logfeeder/src/main/java/org/apache/ambari/logfeeder/input/InputFile.java
 
b/ambari-logsearch/ambari-logsearch-logfeeder/src/main/java/org/apache/ambari/logfeeder/input/InputFile.java
index 441ce3e..85df2ee 100644
--- 
a/ambari-logsearch/ambari-logsearch-logfeeder/src/main/java/org/apache/ambari/logfeeder/input/InputFile.java
+++ 
b/ambari-logsearch/ambari-logsearch-logfeeder/src/main/java/org/apache/ambari/logfeeder/input/InputFile.java
@@ -26,9 +26,7 @@ import 
org.apache.ambari.logfeeder.input.monitor.DockerLogFileUpdateMonitor;
 import org.apache.ambari.logfeeder.input.monitor.LogFileDetachMonitor;
 import org.apache.ambari.logfeeder.input.monitor.LogFilePathUpdateMonitor;
 import org.apache.ambari.logfeeder.input.reader.LogsearchReaderFactory;
-import org.apache.ambari.logfeeder.input.file.FileCheckInHelper;
 import org.apache.ambari.logfeeder.input.file.ProcessFileHelper;
-import org.apache.ambari.logfeeder.input.file.ResumeLineNumberHelper;
 import org.apache.ambari.logfeeder.plugin.filter.Filter;
 import org.apache.ambari.logfeeder.plugin.input.Input;
 import org.apache.ambari.logfeeder.util.FileUtil;
@@ -144,7 +142,7 @@ public class InputFile extends Input<LogFeederProps, 
InputFileMarker> {
 
   @Override
   public synchronized void checkIn(InputFileMarker inputMarker) {
-    FileCheckInHelper.checkIn(this, inputMarker);
+    getInputManager().getCheckpointHandler().checkIn(this, inputMarker);
   }
 
   @Override
@@ -304,7 +302,7 @@ public class InputFile extends Input<LogFeederProps, 
InputFileMarker> {
   }
 
   public int getResumeFromLineNumber() {
-    return ResumeLineNumberHelper.getResumeFromLineNumber(this);
+    return 
this.getInputManager().getCheckpointHandler().resumeLineNumber(this);
   }
 
   public void processFile(File logPathFile, boolean follow) throws Exception {
@@ -485,7 +483,7 @@ public class InputFile extends Input<LogFeederProps, 
InputFileMarker> {
     return fileKey;
   }
 
-  public String getBase64FileKey() throws Exception {
+  public String getBase64FileKey() {
     return base64FileKey;
   }
 
diff --git 
a/ambari-logsearch/ambari-logsearch-logfeeder/src/main/java/org/apache/ambari/logfeeder/input/InputManagerImpl.java
 
b/ambari-logsearch/ambari-logsearch-logfeeder/src/main/java/org/apache/ambari/logfeeder/input/InputManagerImpl.java
index ea97968..a256fd7 100644
--- 
a/ambari-logsearch/ambari-logsearch-logfeeder/src/main/java/org/apache/ambari/logfeeder/input/InputManagerImpl.java
+++ 
b/ambari-logsearch/ambari-logsearch-logfeeder/src/main/java/org/apache/ambari/logfeeder/input/InputManagerImpl.java
@@ -22,23 +22,14 @@ import com.google.common.annotations.VisibleForTesting;
 import org.apache.ambari.logfeeder.conf.LogFeederProps;
 import org.apache.ambari.logfeeder.docker.DockerContainerRegistry;
 import org.apache.ambari.logfeeder.docker.DockerContainerRegistryMonitor;
-import org.apache.ambari.logfeeder.input.monitor.CheckpointCleanupMonitor;
+import org.apache.ambari.logfeeder.plugin.manager.CheckpointManager;
 import org.apache.ambari.logfeeder.plugin.common.MetricData;
 import org.apache.ambari.logfeeder.plugin.input.Input;
 import org.apache.ambari.logfeeder.plugin.manager.InputManager;
-import org.apache.ambari.logfeeder.util.FileUtil;
-import org.apache.ambari.logfeeder.util.LogFeederUtil;
-import org.apache.commons.io.filefilter.WildcardFileFilter;
-import org.apache.commons.lang3.StringUtils;
 import org.apache.log4j.Logger;
-import org.apache.solr.common.util.Base64;
 
 import javax.inject.Inject;
-import java.io.EOFException;
 import java.io.File;
-import java.io.FileFilter;
-import java.io.IOException;
-import java.io.RandomAccessFile;
 import java.util.ArrayList;
 import java.util.HashMap;
 import java.util.HashSet;
@@ -46,22 +37,16 @@ import java.util.Iterator;
 import java.util.List;
 import java.util.Map;
 import java.util.Set;
-import java.util.UUID;
 
 public class InputManagerImpl extends InputManager {
 
   private static final Logger LOG = Logger.getLogger(InputManagerImpl.class);
 
-  private static final String CHECKPOINT_SUBFOLDER_NAME = 
"logfeeder_checkpoints";
-
   private Map<String, List<Input>> inputs = new HashMap<>();
   private Set<Input> notReadyList = new HashSet<>();
 
   private boolean isDrain = false;
 
-  private String checkPointExtension;
-  private File checkPointFolderFile;
-
   private MetricData filesCountMetric = new MetricData("input.files.count", 
true);
 
   private Thread inputIsReadyMonitor;
@@ -72,6 +57,9 @@ public class InputManagerImpl extends InputManager {
   @Inject
   private LogFeederProps logFeederProps;
 
+  @Inject
+  private CheckpointManager checkpointHandler;
+
   public List<Input> getInputList(String serviceName) {
     return inputs.get(serviceName);
   }
@@ -130,43 +118,11 @@ public class InputManagerImpl extends InputManager {
 
   @Override
   public void init() throws Exception {
-    initCheckPointSettings();
+    checkpointHandler.init(logFeederProps);
     startMonitorThread();
     startDockerMetadataThread();
   }
 
-  private void initCheckPointSettings() {
-    checkPointExtension = logFeederProps.getCheckPointExtension();
-    LOG.info("Determining valid checkpoint folder");
-    boolean isCheckPointFolderValid = false;
-    // We need to keep track of the files we are reading.
-    String checkPointFolder = logFeederProps.getCheckpointFolder();
-    if (!StringUtils.isEmpty(checkPointFolder)) {
-      checkPointFolderFile = new File(checkPointFolder);
-      isCheckPointFolderValid = verifyCheckPointFolder(checkPointFolderFile);
-    }
-
-    if (!isCheckPointFolderValid) {
-      // Let's use tmp folder
-      checkPointFolderFile = new File(logFeederProps.getTmpDir(), 
CHECKPOINT_SUBFOLDER_NAME);
-      LOG.info("Checking if tmp folder can be used for checkpoints. Folder=" + 
checkPointFolderFile);
-      isCheckPointFolderValid = verifyCheckPointFolder(checkPointFolderFile);
-      if (isCheckPointFolderValid) {
-        LOG.warn("Using tmp folder " + checkPointFolderFile + " to store check 
points. This is not recommended." +
-          "Please set logfeeder.checkpoint.folder property");
-      }
-    }
-
-    if (isCheckPointFolderValid) {
-      LOG.info("Using folder " + checkPointFolderFile + " for storing 
checkpoints");
-      // check checkpoint cleanup every 2000 min
-      Thread checkpointCleanupThread = new Thread(new 
CheckpointCleanupMonitor(this, 2000),"checkpoint_cleanup");
-      checkpointCleanupThread.setDaemon(true);
-      checkpointCleanupThread.start();
-    } else {
-      throw new IllegalStateException("Could not determine the checkpoint 
folder.");
-    }
-  }
 
   private void startDockerMetadataThread() {
     if (logFeederProps.isDockerContainerRegistryEnabled()) {
@@ -230,34 +186,6 @@ public class InputManagerImpl extends InputManager {
     }
   }
 
-  private boolean verifyCheckPointFolder(File folderPathFile) {
-    if (!folderPathFile.exists()) {
-      try {
-        if (!folderPathFile.mkdir()) {
-          LOG.warn("Error creating folder for check point. folder=" + 
folderPathFile);
-        }
-      } catch (Throwable t) {
-        LOG.warn("Error creating folder for check point. folder=" + 
folderPathFile, t);
-      }
-    }
-
-    if (folderPathFile.exists() && folderPathFile.isDirectory()) {
-      // Let's check whether we can create a file
-      File testFile = new File(folderPathFile, UUID.randomUUID().toString());
-      try {
-        testFile.createNewFile();
-        return testFile.delete();
-      } catch (IOException e) {
-        LOG.warn("Couldn't create test file in " + 
folderPathFile.getAbsolutePath() + " for checkPoint", e);
-      }
-    }
-    return false;
-  }
-
-  public File getCheckPointFolderFile() {
-    return checkPointFolderFile;
-  }
-
   @Override
   public void addToNotReady(Input notReadyInput) {
     notReadyList.add(notReadyInput);
@@ -285,100 +213,6 @@ public class InputManagerImpl extends InputManager {
     // TODO: logStatForMetric(filesCountMetric, "Stat: Files Monitored Count", 
"");
   }
 
-
-  public void cleanCheckPointFiles() {
-    if (checkPointFolderFile == null) {
-      LOG.info("Will not clean checkPoint files. checkPointFolderFile=" + 
checkPointFolderFile);
-      return;
-    }
-    LOG.info("Cleaning checkPoint files. checkPointFolderFile=" + 
checkPointFolderFile.getAbsolutePath());
-    try {
-      // Loop over the check point files and if filePath is not present, then 
move to closed
-      String searchPath = "*" + checkPointExtension;
-      FileFilter fileFilter = new WildcardFileFilter(searchPath);
-      File[] checkPointFiles = checkPointFolderFile.listFiles(fileFilter);
-      int totalCheckFilesDeleted = 0;
-      for (File checkPointFile : checkPointFiles) {
-        if (checkCheckPointFile(checkPointFile)) {
-          totalCheckFilesDeleted++;
-        }
-      }
-      LOG.info("Deleted " + totalCheckFilesDeleted + " checkPoint file(s). 
checkPointFolderFile=" +
-        checkPointFolderFile.getAbsolutePath());
-
-    } catch (Throwable t) {
-      LOG.error("Error while cleaning checkPointFiles", t);
-    }
-  }
-
-  private boolean checkCheckPointFile(File checkPointFile) {
-    boolean deleted = false;
-    try (RandomAccessFile checkPointReader = new 
RandomAccessFile(checkPointFile, "r")) {
-      int contentSize = checkPointReader.readInt();
-      byte b[] = new byte[contentSize];
-      int readSize = checkPointReader.read(b, 0, contentSize);
-      if (readSize != contentSize) {
-        LOG.error("Couldn't read expected number of bytes from checkpoint 
file. expected=" + contentSize + ", read="
-          + readSize + ", checkPointFile=" + checkPointFile);
-      } else {
-        String jsonCheckPointStr = new String(b, 0, readSize);
-        Map<String, Object> jsonCheckPoint = 
LogFeederUtil.toJSONObject(jsonCheckPointStr);
-
-        String logFilePath = (String) jsonCheckPoint.get("file_path");
-        String logFileKey = (String) jsonCheckPoint.get("file_key");
-        Integer maxAgeMin = null;
-        if (jsonCheckPoint.containsKey("max_age_min")) {
-          maxAgeMin = 
Integer.parseInt(jsonCheckPoint.get("max_age_min").toString());
-        }
-        if (logFilePath != null && logFileKey != null) {
-          boolean deleteCheckPointFile = false;
-          File logFile = new File(logFilePath);
-          if (logFile.exists()) {
-            Object fileKeyObj = FileUtil.getFileKey(logFile);
-            String fileBase64 = 
Base64.byteArrayToBase64(fileKeyObj.toString().getBytes());
-            if (!logFileKey.equals(fileBase64)) {
-              LOG.info("CheckPoint clean: File key has changed. old=" + 
logFileKey + ", new=" + fileBase64 + ", filePath=" +
-                logFilePath + ", checkPointFile=" + 
checkPointFile.getAbsolutePath());
-              deleteCheckPointFile = !wasFileRenamed(logFile.getParentFile(), 
logFileKey);
-            } else if (maxAgeMin != null && maxAgeMin != 0 && 
FileUtil.isFileTooOld(logFile, maxAgeMin)) {
-              deleteCheckPointFile = true;
-              LOG.info("Checkpoint clean: File reached max age minutes (" + 
maxAgeMin + "):" + logFilePath);
-            }
-          } else {
-            LOG.info("CheckPoint clean: Log file doesn't exist. filePath=" + 
logFilePath + ", checkPointFile=" +
-              checkPointFile.getAbsolutePath());
-            deleteCheckPointFile = !wasFileRenamed(logFile.getParentFile(), 
logFileKey);
-          }
-          if (deleteCheckPointFile) {
-            LOG.info("Deleting CheckPoint file=" + 
checkPointFile.getAbsolutePath() + ", logFile=" + logFilePath);
-            checkPointFile.delete();
-            deleted = true;
-          }
-        }
-      }
-    } catch (EOFException eof) {
-      LOG.warn("Caught EOFException. Ignoring reading existing checkPoint 
file. " + checkPointFile);
-    } catch (Throwable t) {
-      LOG.error("Error while checking checkPoint file. " + checkPointFile, t);
-    }
-
-    return deleted;
-  }
-
-  private boolean wasFileRenamed(File folder, String searchFileBase64) {
-    for (File file : folder.listFiles()) {
-      Object fileKeyObj = FileUtil.getFileKey(file);
-      String fileBase64 = 
Base64.byteArrayToBase64(fileKeyObj.toString().getBytes());
-      if (searchFileBase64.equals(fileBase64)) {
-        // even though the file name in the checkpoint file is different from 
the one it was renamed to, checkpoint files are
-        // identified by their name, which is generated from the file key, 
which would be the same for the renamed file
-        LOG.info("CheckPoint clean: File key matches file " + 
file.getAbsolutePath() + ", it must have been renamed");
-        return true;
-      }
-    }
-    return false;
-  }
-
   public void waitOnAllInputs() {
     //wait on inputs
     for (List<Input> inputList : inputs.values()) {
@@ -469,5 +303,11 @@ public class InputManagerImpl extends InputManager {
     return logFeederProps;
   }
 
+  public CheckpointManager getCheckpointHandler() {
+    return checkpointHandler;
+  }
 
+  public void setCheckpointHandler(CheckpointManager checkpointHandler) {
+    this.checkpointHandler = checkpointHandler;
+  }
 }
diff --git 
a/ambari-logsearch/ambari-logsearch-logfeeder/src/main/java/org/apache/ambari/logfeeder/input/InputSimulate.java
 
b/ambari-logsearch/ambari-logsearch-logfeeder/src/main/java/org/apache/ambari/logfeeder/input/InputSimulate.java
index 7c633f0..13b00e3 100644
--- 
a/ambari-logsearch/ambari-logsearch-logfeeder/src/main/java/org/apache/ambari/logfeeder/input/InputSimulate.java
+++ 
b/ambari-logsearch/ambari-logsearch-logfeeder/src/main/java/org/apache/ambari/logfeeder/input/InputSimulate.java
@@ -168,8 +168,14 @@ public class InputSimulate extends InputFile {
     return lineNumber;
   }
 
-  public String getBase64FileKey() throws Exception {
-    String fileKey = InetAddress.getLocalHost().getHostAddress() + "|" + 
getFilePath();
+  public String getBase64FileKey() {
+    String fileKey;
+    try {
+      fileKey = InetAddress.getLocalHost().getHostAddress() + "|" + 
getFilePath();
+    } catch (Exception e) {
+      // skip
+      fileKey = "localhost|" + getFilePath();
+    }
     return Base64.byteArrayToBase64(fileKey.getBytes());
   }
 
diff --git 
a/ambari-logsearch/ambari-logsearch-logfeeder/src/main/java/org/apache/ambari/logfeeder/input/file/checkpoint/FileCheckpointManager.java
 
b/ambari-logsearch/ambari-logsearch-logfeeder/src/main/java/org/apache/ambari/logfeeder/input/file/checkpoint/FileCheckpointManager.java
new file mode 100644
index 0000000..69c21fb
--- /dev/null
+++ 
b/ambari-logsearch/ambari-logsearch-logfeeder/src/main/java/org/apache/ambari/logfeeder/input/file/checkpoint/FileCheckpointManager.java
@@ -0,0 +1,208 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.ambari.logfeeder.input.file.checkpoint;
+
+import org.apache.ambari.logfeeder.conf.LogFeederProps;
+import org.apache.ambari.logfeeder.input.InputFile;
+import org.apache.ambari.logfeeder.input.InputFileMarker;
+import 
org.apache.ambari.logfeeder.input.file.checkpoint.util.CheckpointFileReader;
+import 
org.apache.ambari.logfeeder.input.file.checkpoint.util.FileCheckInHelper;
+import 
org.apache.ambari.logfeeder.input.file.checkpoint.util.FileCheckpointCleanupHelper;
+import 
org.apache.ambari.logfeeder.input.file.checkpoint.util.ResumeLineNumberHelper;
+import org.apache.ambari.logfeeder.input.monitor.CheckpointCleanupMonitor;
+import org.apache.ambari.logfeeder.plugin.manager.CheckpointManager;
+import org.apache.commons.lang3.StringUtils;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.io.EOFException;
+import java.io.File;
+import java.io.IOException;
+import java.util.Map;
+import java.util.UUID;
+import java.util.stream.Stream;
+
+public class FileCheckpointManager implements CheckpointManager<InputFile, 
InputFileMarker, LogFeederProps> {
+
+  private static final Logger LOG = 
LoggerFactory.getLogger(FileCheckpointManager.class);
+
+  private static final String CHECKPOINT_SUBFOLDER_NAME = 
"logfeeder_checkpoints";
+
+  private String checkPointExtension;
+  private File checkPointFolderFile;
+
+  @Override
+  public void init(LogFeederProps logFeederProps) {
+    checkPointExtension = logFeederProps.getCheckPointExtension();
+    LOG.info("Determining valid checkpoint folder");
+    boolean isCheckPointFolderValid = false;
+    // We need to keep track of the files we are reading.
+    String checkPointFolder = logFeederProps.getCheckpointFolder();
+    if (!StringUtils.isEmpty(checkPointFolder)) {
+      checkPointFolderFile = new File(checkPointFolder);
+      isCheckPointFolderValid = verifyCheckPointFolder(checkPointFolderFile);
+    }
+
+    if (!isCheckPointFolderValid) {
+      // Let's use tmp folder
+      checkPointFolderFile = new File(logFeederProps.getTmpDir(), 
CHECKPOINT_SUBFOLDER_NAME);
+      LOG.info("Checking if tmp folder can be used for checkpoints. Folder=" + 
checkPointFolderFile);
+      isCheckPointFolderValid = verifyCheckPointFolder(checkPointFolderFile);
+      if (isCheckPointFolderValid) {
+        LOG.warn("Using tmp folder " + checkPointFolderFile + " to store check 
points. This is not recommended." +
+          "Please set logfeeder.checkpoint.folder property");
+      }
+    }
+
+    if (isCheckPointFolderValid) {
+      LOG.info("Using folder " + checkPointFolderFile + " for storing 
checkpoints");
+      // check checkpoint cleanup every 2000 min
+      Thread checkpointCleanupThread = new Thread(new 
CheckpointCleanupMonitor(this, 2000),"checkpoint_cleanup");
+      checkpointCleanupThread.setDaemon(true);
+      checkpointCleanupThread.start();
+    } else {
+      throw new IllegalStateException("Could not determine the checkpoint 
folder.");
+    }
+  }
+
+  @Override
+  public void checkIn(InputFile inputFile, InputFileMarker inputMarker) {
+    FileCheckInHelper.checkIn(inputFile, inputMarker);
+  }
+
+  @Override
+  public int resumeLineNumber(InputFile inputFile) {
+    return ResumeLineNumberHelper.getResumeFromLineNumber(inputFile, 
checkPointFolderFile);
+  }
+
+  @Override
+  public void cleanupCheckpoints() {
+    FileCheckpointCleanupHelper.cleanCheckPointFiles(checkPointFolderFile, 
checkPointExtension);
+  }
+
+  @Override
+  public void printCheckpoints(String checkpointLocation, String 
logTypeFilter, String fileKeyFilter) throws IOException {
+    System.out.println(String.format("Searching checkpoint files in '%s' 
folder ... (list)", checkpointLocation));
+    File[] files = CheckpointFileReader.getFiles(new File(checkpointLocation), 
".cp");
+    if (files != null) {
+      for (File file : files) {
+        String fileNameTitle = String.format("file name: %s", file.getName());
+        StringBuilder strBuilder = new StringBuilder(fileNameTitle.length());
+        String[] splitted = file.getName().split("-");
+        String logtType = "";
+        String fileKey = "";
+        if (splitted.length > 1) {
+          logtType = splitted[0];
+          fileKey = splitted[1].replace(getFileExtension(), "");
+        } else {
+          fileKey = file.getName().replace(getFileExtension(), "");
+        }
+        if (checkFilter(logtType, logTypeFilter) || checkFilter(fileKey, 
fileKeyFilter)) {
+          continue;
+        }
+        Stream.generate(() -> 
'-').limit(fileNameTitle.length()).forEach(strBuilder::append);
+        String border = strBuilder.toString();
+        System.out.println(border);
+        System.out.println(String.format("file name: %s", file.getName()));
+        System.out.println(border);
+        if (org.apache.commons.lang.StringUtils.isNotBlank(logtType)) {
+          System.out.println(String.format("log_type: %s", logtType));
+        }
+        Map<String, String> checkpointJson = 
CheckpointFileReader.getCheckpointObject(file);
+        for (Map.Entry<String, String> entry : checkpointJson.entrySet()) {
+          System.out.println(String.format("%s: %s", entry.getKey(), 
entry.getValue()));
+        }
+        System.out.print("\n");
+      }
+    }
+  }
+
+  @Override
+  public void cleanCheckpoint(String checkpointLocation, String logTypeFilter, 
String fileKeyFilter, boolean all) {
+    System.out.println(String.format("Searching checkpoint files in '%s' 
folder ... (clean)", checkpointLocation));
+    File[] files = CheckpointFileReader.getFiles(new File(checkpointLocation), 
".cp");
+    if (files != null) {
+      for (File file : files) {
+        String logtType = getLogTypeFromFileName(file.getName());
+        String fileKey = getFileKeyFromFileName(file.getName());
+        if (checkFilter(logtType, logTypeFilter) || checkFilter(fileKey, 
fileKeyFilter)) {
+          continue;
+        }
+        if (!all && logTypeFilter == null && fileKeyFilter == null) {
+          throw new IllegalArgumentException("It is required to use a filter 
for clean: --all, --log-type <log_type> or --file-key <file_key>");
+        }
+
+        if (all || logTypeFilter != null && logTypeFilter.equals(logtType) ||
+          fileKeyFilter != null && fileKeyFilter.equals(fileKey)) {
+          System.out.println(String.format("Deleting checkpoint file - 
filename: %s, key: %s, log_type: %s", file.getAbsolutePath(), fileKey, 
logtType));
+          file.delete();
+        }
+      }
+    }
+  }
+
+  private boolean verifyCheckPointFolder(File folderPathFile) {
+    if (!folderPathFile.exists()) {
+      try {
+        if (!folderPathFile.mkdir()) {
+          LOG.warn("Error creating folder for check point. folder=" + 
folderPathFile);
+        }
+      } catch (Throwable t) {
+        LOG.warn("Error creating folder for check point. folder=" + 
folderPathFile, t);
+      }
+    }
+
+    if (folderPathFile.exists() && folderPathFile.isDirectory()) {
+      // Let's check whether we can create a file
+      File testFile = new File(folderPathFile, UUID.randomUUID().toString());
+      try {
+        testFile.createNewFile();
+        return testFile.delete();
+      } catch (IOException e) {
+        LOG.warn("Couldn't create test file in " + 
folderPathFile.getAbsolutePath() + " for checkPoint", e);
+      }
+    }
+    return false;
+  }
+
+  private boolean checkFilter(String actualValue, String filterValue) {
+    return (org.apache.commons.lang.StringUtils.isNotBlank(actualValue) && 
org.apache.commons.lang.StringUtils.isNotBlank(filterValue) &&
+      !actualValue.equals(filterValue));
+  }
+
+  private String getFileExtension() {
+    return this.checkPointExtension == null ? ".cp" : this.checkPointExtension;
+  }
+
+  private String getFileKeyFromFileName(String fileName) {
+    String[] splitted = fileName.split("-");
+    String fileKeyResult = splitted.length > 1 ? splitted[1] : splitted[0];
+    return fileKeyResult.replace(getFileExtension(), "");
+  }
+
+  private String getLogTypeFromFileName(String fileName) {
+    String[] splitted = fileName.split("-");
+    String logTypeResult = "";
+    if (splitted.length > 1) {
+      logTypeResult = splitted[0];
+    }
+    return logTypeResult;
+  }
+
+}
diff --git 
a/ambari-logsearch/ambari-logsearch-logfeeder/src/main/java/org/apache/ambari/logfeeder/input/file/checkpoint/util/CheckpointFileReader.java
 
b/ambari-logsearch/ambari-logsearch-logfeeder/src/main/java/org/apache/ambari/logfeeder/input/file/checkpoint/util/CheckpointFileReader.java
new file mode 100644
index 0000000..dd35d07
--- /dev/null
+++ 
b/ambari-logsearch/ambari-logsearch-logfeeder/src/main/java/org/apache/ambari/logfeeder/input/file/checkpoint/util/CheckpointFileReader.java
@@ -0,0 +1,64 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.ambari.logfeeder.input.file.checkpoint.util;
+
+import com.google.gson.Gson;
+import com.google.gson.GsonBuilder;
+import com.google.gson.reflect.TypeToken;
+import org.apache.commons.io.filefilter.WildcardFileFilter;
+
+import java.io.File;
+import java.io.FileFilter;
+import java.io.IOException;
+import java.io.RandomAccessFile;
+import java.lang.reflect.Type;
+import java.util.Map;
+
+public class CheckpointFileReader {
+
+  private CheckpointFileReader() {
+  }
+
+  public static File[] getFiles(File checkPointFolderFile, String 
checkPointExtension) {
+    String searchPath = "*" + checkPointExtension;
+    FileFilter fileFilter = new WildcardFileFilter(searchPath);
+    return checkPointFolderFile.listFiles(fileFilter);
+  }
+
+  public static Map<String, String> getCheckpointObject(File checkPointFile) 
throws IOException {
+    final Map<String, String> jsonCheckPoint;
+    try (RandomAccessFile checkPointReader = new 
RandomAccessFile(checkPointFile, "r")) {
+      int contentSize = checkPointReader.readInt();
+      byte b[] = new byte[contentSize];
+      int readSize = checkPointReader.read(b, 0, contentSize);
+      if (readSize != contentSize) {
+        throw new IllegalArgumentException("Couldn't read expected number of 
bytes from checkpoint file. expected=" + contentSize + ", read="
+          + readSize + ", checkPointFile=" + checkPointFile);
+      } else {
+        String jsonCheckPointStr = new String(b, 0, readSize);
+        Gson gson = new GsonBuilder().create();
+        Type type = new TypeToken<Map<String, String>>() {}.getType();
+        jsonCheckPoint = gson.fromJson(jsonCheckPointStr, type);
+      }
+    }
+    return jsonCheckPoint;
+  }
+
+
+}
diff --git 
a/ambari-logsearch/ambari-logsearch-logfeeder/src/main/java/org/apache/ambari/logfeeder/input/file/FileCheckInHelper.java
 
b/ambari-logsearch/ambari-logsearch-logfeeder/src/main/java/org/apache/ambari/logfeeder/input/file/checkpoint/util/FileCheckInHelper.java
similarity index 98%
rename from 
ambari-logsearch/ambari-logsearch-logfeeder/src/main/java/org/apache/ambari/logfeeder/input/file/FileCheckInHelper.java
rename to 
ambari-logsearch/ambari-logsearch-logfeeder/src/main/java/org/apache/ambari/logfeeder/input/file/checkpoint/util/FileCheckInHelper.java
index 7b8f0cd..b217e34 100644
--- 
a/ambari-logsearch/ambari-logsearch-logfeeder/src/main/java/org/apache/ambari/logfeeder/input/file/FileCheckInHelper.java
+++ 
b/ambari-logsearch/ambari-logsearch-logfeeder/src/main/java/org/apache/ambari/logfeeder/input/file/checkpoint/util/FileCheckInHelper.java
@@ -16,7 +16,7 @@
  * specific language governing permissions and limitations
  * under the License.
  */
-package org.apache.ambari.logfeeder.input.file;
+package org.apache.ambari.logfeeder.input.file.checkpoint.util;
 
 import org.apache.ambari.logfeeder.input.InputFile;
 import org.apache.ambari.logfeeder.input.InputFileMarker;
diff --git 
a/ambari-logsearch/ambari-logsearch-logfeeder/src/main/java/org/apache/ambari/logfeeder/input/file/checkpoint/util/FileCheckpointCleanupHelper.java
 
b/ambari-logsearch/ambari-logsearch-logfeeder/src/main/java/org/apache/ambari/logfeeder/input/file/checkpoint/util/FileCheckpointCleanupHelper.java
new file mode 100644
index 0000000..91b5383
--- /dev/null
+++ 
b/ambari-logsearch/ambari-logsearch-logfeeder/src/main/java/org/apache/ambari/logfeeder/input/file/checkpoint/util/FileCheckpointCleanupHelper.java
@@ -0,0 +1,132 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.ambari.logfeeder.input.file.checkpoint.util;
+
+import org.apache.ambari.logfeeder.util.FileUtil;
+import org.apache.ambari.logfeeder.util.LogFeederUtil;
+import org.apache.solr.common.util.Base64;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.io.EOFException;
+import java.io.File;
+import java.io.RandomAccessFile;
+import java.util.Map;
+
+public class FileCheckpointCleanupHelper {
+
+  private static final Logger LOG = 
LoggerFactory.getLogger(FileCheckpointCleanupHelper.class);
+
+  private FileCheckpointCleanupHelper() {
+  }
+
+  public static void cleanCheckPointFiles(File checkPointFolderFile, String 
checkPointExtension) {
+    if (checkPointFolderFile == null) {
+      LOG.info("Will not clean checkPoint files. checkPointFolderFile=null");
+      return;
+    }
+    LOG.info("Cleaning checkPoint files. checkPointFolderFile=" + 
checkPointFolderFile.getAbsolutePath());
+    try {
+      // Loop over the check point files and if filePath is not present, then 
move to closed
+      File[] checkPointFiles = 
CheckpointFileReader.getFiles(checkPointFolderFile, checkPointExtension);
+      int totalCheckFilesDeleted = 0;
+      if (checkPointFiles != null) {
+        for (File checkPointFile : checkPointFiles) {
+          if (checkCheckPointFile(checkPointFile)) {
+            totalCheckFilesDeleted++;
+          }
+        }
+        LOG.info("Deleted " + totalCheckFilesDeleted + " checkPoint file(s). 
checkPointFolderFile=" +
+          checkPointFolderFile.getAbsolutePath());
+      }
+    } catch (Throwable t) {
+      LOG.error("Error while cleaning checkPointFiles", t);
+    }
+  }
+
+  private static boolean checkCheckPointFile(File checkPointFile) {
+    boolean deleted = false;
+    try (RandomAccessFile checkPointReader = new 
RandomAccessFile(checkPointFile, "r")) {
+      int contentSize = checkPointReader.readInt();
+      byte b[] = new byte[contentSize];
+      int readSize = checkPointReader.read(b, 0, contentSize);
+      if (readSize != contentSize) {
+        LOG.error("Couldn't read expected number of bytes from checkpoint 
file. expected=" + contentSize + ", read="
+          + readSize + ", checkPointFile=" + checkPointFile);
+      } else {
+        String jsonCheckPointStr = new String(b, 0, readSize);
+        Map<String, Object> jsonCheckPoint = 
LogFeederUtil.toJSONObject(jsonCheckPointStr);
+
+        String logFilePath = (String) jsonCheckPoint.get("file_path");
+        String logFileKey = (String) jsonCheckPoint.get("file_key");
+        Integer maxAgeMin = null;
+        if (jsonCheckPoint.containsKey("max_age_min")) {
+          maxAgeMin = 
Integer.parseInt(jsonCheckPoint.get("max_age_min").toString());
+        }
+        if (logFilePath != null && logFileKey != null) {
+          boolean deleteCheckPointFile = false;
+          File logFile = new File(logFilePath);
+          if (logFile.exists()) {
+            Object fileKeyObj = FileUtil.getFileKey(logFile);
+            String fileBase64 = 
Base64.byteArrayToBase64(fileKeyObj.toString().getBytes());
+            if (!logFileKey.equals(fileBase64)) {
+              LOG.info("CheckPoint clean: File key has changed. old=" + 
logFileKey + ", new=" + fileBase64 + ", filePath=" +
+                logFilePath + ", checkPointFile=" + 
checkPointFile.getAbsolutePath());
+              deleteCheckPointFile = !wasFileRenamed(logFile.getParentFile(), 
logFileKey);
+            } else if (maxAgeMin != null && maxAgeMin != 0 && 
FileUtil.isFileTooOld(logFile, maxAgeMin)) {
+              deleteCheckPointFile = true;
+              LOG.info("Checkpoint clean: File reached max age minutes (" + 
maxAgeMin + "):" + logFilePath);
+            }
+          } else {
+            LOG.info("CheckPoint clean: Log file doesn't exist. filePath=" + 
logFilePath + ", checkPointFile=" +
+              checkPointFile.getAbsolutePath());
+            deleteCheckPointFile = !wasFileRenamed(logFile.getParentFile(), 
logFileKey);
+          }
+          if (deleteCheckPointFile) {
+            LOG.info("Deleting CheckPoint file=" + 
checkPointFile.getAbsolutePath() + ", logFile=" + logFilePath);
+            checkPointFile.delete();
+            deleted = true;
+          }
+        }
+      }
+    } catch (EOFException eof) {
+      LOG.warn("Caught EOFException. Ignoring reading existing checkPoint 
file. " + checkPointFile);
+    } catch (Throwable t) {
+      LOG.error("Error while checking checkPoint file. " + checkPointFile, t);
+    }
+
+    return deleted;
+  }
+
+  private static boolean wasFileRenamed(File folder, String searchFileBase64) {
+    for (File file : folder.listFiles()) {
+      Object fileKeyObj = FileUtil.getFileKey(file);
+      String fileBase64 = 
Base64.byteArrayToBase64(fileKeyObj.toString().getBytes());
+      if (searchFileBase64.equals(fileBase64)) {
+        // even though the file name in the checkpoint file is different from 
the one it was renamed to, checkpoint files are
+        // identified by their name, which is generated from the file key, 
which would be the same for the renamed file
+        LOG.info("CheckPoint clean: File key matches file " + 
file.getAbsolutePath() + ", it must have been renamed");
+        return true;
+      }
+    }
+    return false;
+  }
+
+
+}
diff --git 
a/ambari-logsearch/ambari-logsearch-logfeeder/src/main/java/org/apache/ambari/logfeeder/input/file/ResumeLineNumberHelper.java
 
b/ambari-logsearch/ambari-logsearch-logfeeder/src/main/java/org/apache/ambari/logfeeder/input/file/checkpoint/util/ResumeLineNumberHelper.java
similarity index 89%
rename from 
ambari-logsearch/ambari-logsearch-logfeeder/src/main/java/org/apache/ambari/logfeeder/input/file/ResumeLineNumberHelper.java
rename to 
ambari-logsearch/ambari-logsearch-logfeeder/src/main/java/org/apache/ambari/logfeeder/input/file/checkpoint/util/ResumeLineNumberHelper.java
index 9350200..664fa4f 100644
--- 
a/ambari-logsearch/ambari-logsearch-logfeeder/src/main/java/org/apache/ambari/logfeeder/input/file/ResumeLineNumberHelper.java
+++ 
b/ambari-logsearch/ambari-logsearch-logfeeder/src/main/java/org/apache/ambari/logfeeder/input/file/checkpoint/util/ResumeLineNumberHelper.java
@@ -16,7 +16,7 @@
  * specific language governing permissions and limitations
  * under the License.
  */
-package org.apache.ambari.logfeeder.input.file;
+package org.apache.ambari.logfeeder.input.file.checkpoint.util;
 
 import org.apache.ambari.logfeeder.input.InputFile;
 import org.apache.ambari.logfeeder.util.LogFeederUtil;
@@ -36,15 +36,14 @@ public class ResumeLineNumberHelper {
   private ResumeLineNumberHelper() {
   }
 
-  public static int getResumeFromLineNumber(InputFile inputFile) {
+  public static int getResumeFromLineNumber(InputFile inputFile, File 
checkPointFolder) {
     int resumeFromLineNumber = 0;
 
     File checkPointFile = null;
     try {
       LOG.info("Checking existing checkpoint file. " + 
inputFile.getShortDescription());
 
-      String checkPointFileName = inputFile.getBase64FileKey() + 
inputFile.getCheckPointExtension();
-      File checkPointFolder = 
inputFile.getInputManager().getCheckPointFolderFile();
+      String checkPointFileName = getCheckpointFileName(inputFile);
       checkPointFile = new File(checkPointFolder, checkPointFileName);
       inputFile.getCheckPointFiles().put(inputFile.getBase64FileKey(), 
checkPointFile);
       Map<String, Object> jsonCheckPoint = null;
@@ -88,4 +87,9 @@ public class ResumeLineNumberHelper {
     return resumeFromLineNumber;
   }
 
+  private static String getCheckpointFileName(InputFile inputFile) {
+    return String.format("%s-%s%s", inputFile.getLogType(),
+      inputFile.getBase64FileKey(), inputFile.getCheckPointExtension());
+  }
+
 }
diff --git 
a/ambari-logsearch/ambari-logsearch-logfeeder/src/main/java/org/apache/ambari/logfeeder/input/monitor/CheckpointCleanupMonitor.java
 
b/ambari-logsearch/ambari-logsearch-logfeeder/src/main/java/org/apache/ambari/logfeeder/input/monitor/CheckpointCleanupMonitor.java
index 65a314e..45404c4 100644
--- 
a/ambari-logsearch/ambari-logsearch-logfeeder/src/main/java/org/apache/ambari/logfeeder/input/monitor/CheckpointCleanupMonitor.java
+++ 
b/ambari-logsearch/ambari-logsearch-logfeeder/src/main/java/org/apache/ambari/logfeeder/input/monitor/CheckpointCleanupMonitor.java
@@ -18,7 +18,7 @@
  */
 package org.apache.ambari.logfeeder.input.monitor;
 
-import org.apache.ambari.logfeeder.plugin.manager.InputManager;
+import org.apache.ambari.logfeeder.plugin.manager.CheckpointManager;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
@@ -27,11 +27,11 @@ public class CheckpointCleanupMonitor implements Runnable {
   private static final Logger LOG = 
LoggerFactory.getLogger(CheckpointCleanupMonitor.class);
 
   private long waitIntervalMin;
-  private InputManager inputManager;
+  private CheckpointManager checkpointHandler;
 
-  public CheckpointCleanupMonitor(InputManager inputManager, long 
waitIntervalMin) {
+  public CheckpointCleanupMonitor(CheckpointManager checkpointHandler, long 
waitIntervalMin) {
     this.waitIntervalMin = waitIntervalMin;
-    this.inputManager = inputManager;
+    this.checkpointHandler = checkpointHandler;
   }
 
   @Override
@@ -39,7 +39,7 @@ public class CheckpointCleanupMonitor implements Runnable {
     while (!Thread.currentThread().isInterrupted()) {
       try {
         Thread.sleep(1000 * 60 * waitIntervalMin);
-        inputManager.cleanCheckPointFiles();
+        checkpointHandler.cleanupCheckpoints();
       } catch (Exception e) {
         LOG.error("Cleanup checkpoint files thread interrupted.", e);
       }
diff --git 
a/ambari-logsearch/ambari-logsearch-logfeeder/src/main/scripts/logfeeder.sh 
b/ambari-logsearch/ambari-logsearch-logfeeder/src/main/scripts/logfeeder.sh
index abe062f..7f1d8ec 100755
--- a/ambari-logsearch/ambari-logsearch-logfeeder/src/main/scripts/logfeeder.sh
+++ b/ambari-logsearch/ambari-logsearch-logfeeder/src/main/scripts/logfeeder.sh
@@ -91,6 +91,7 @@ function print_usage() {
      start                         Start Log Feeder
      stop                          Stop Log Feeder
      status                        Check Log Feeder status (pid file)
+     checkpoints                   Checkpoint operations
      test                          Test Log Feeder shipper configs
      help                          Print usage
 
@@ -106,6 +107,14 @@ function print_usage() {
      -tgc, --test-global-config    Global configuration files (comma separated 
list) for testing if log entry is parseable
      -tli, --test-log-id           The id of the log to test
 
+   checkpoints command arguments:
+     -l, --list                    Print checkpoints
+     -cf, --checkpoints-folder     Checkpoints folder location
+     -c, --clean                   Remove a checkpoint file (by key/log type 
or use on all)
+     -k, --file-key                Filter on file key (for list and clean)
+     -lt, --log-type               Filter on log type (for list and clean)
+     -a, --all                     Flag all checkpoints to be deleted by clean 
command
+
 EOF
 }
 
@@ -263,6 +272,11 @@ function test() {
   $JVM -cp "$LOGFEEDER_CONF_DIR:$LOGFEEDER_LIBS_DIR/*" $LOGFEEDER_JAVA_OPTS 
org.apache.ambari.logfeeder.LogFeederCommandLine --test ${@}
 }
 
+function checkpoints() {
+  echo "Running command: $JVM -cp "$LOGFEEDER_CONF_DIR:$LOGFEEDER_LIBS_DIR/*" 
org.apache.ambari.logfeeder.LogFeederCommandLine --checkpoints ${@}"
+  $JVM -cp "$LOGFEEDER_CONF_DIR:$LOGFEEDER_LIBS_DIR/*" $LOGFEEDER_JAVA_OPTS 
org.apache.ambari.logfeeder.LogFeederCommandLine --checkpoints ${@}
+}
+
 if [ $# -gt 0 ]; then
     SCRIPT_CMD="$1"
     shift
@@ -284,6 +298,9 @@ case $SCRIPT_CMD in
   test)
     test ${1+"$@"}
   ;;
+  checkpoints)
+    checkpoints ${1+"$@"}
+  ;;
   help)
     print_usage
     exit 0
diff --git 
a/ambari-logsearch/ambari-logsearch-logfeeder/src/test/java/org/apache/ambari/logfeeder/input/InputFileTest.java
 
b/ambari-logsearch/ambari-logsearch-logfeeder/src/test/java/org/apache/ambari/logfeeder/input/InputFileTest.java
index 2219be1..0a95342 100644
--- 
a/ambari-logsearch/ambari-logsearch-logfeeder/src/test/java/org/apache/ambari/logfeeder/input/InputFileTest.java
+++ 
b/ambari-logsearch/ambari-logsearch-logfeeder/src/test/java/org/apache/ambari/logfeeder/input/InputFileTest.java
@@ -26,8 +26,10 @@ import java.util.List;
 
 import org.apache.ambari.logfeeder.conf.LogEntryCacheConfig;
 import org.apache.ambari.logfeeder.conf.LogFeederProps;
+import org.apache.ambari.logfeeder.input.file.checkpoint.FileCheckpointManager;
 import org.apache.ambari.logfeeder.plugin.filter.Filter;
 import org.apache.ambari.logfeeder.plugin.input.InputMarker;
+import org.apache.ambari.logfeeder.plugin.manager.CheckpointManager;
 import org.apache.ambari.logfeeder.plugin.manager.InputManager;
 import 
org.apache.ambari.logsearch.config.json.model.inputconfig.impl.InputFileDescriptorImpl;
 import org.apache.commons.io.FileUtils;
@@ -77,6 +79,13 @@ public class InputFileTest {
     FileUtils.cleanDirectory(TEST_DIR);
   }
 
+  @AfterClass
+  public static void deleteDir() throws IOException {
+    if (TEST_DIR.exists()) {
+      FileUtils.deleteDirectory(TEST_DIR);
+    }
+  }
+
   @Before
   public void setUp() throws Exception {
     logFeederProps = new LogFeederProps();
@@ -85,6 +94,7 @@ public class InputFileTest {
     logEntryCacheConfig.setCacheLastDedupEnabled(false);
     logEntryCacheConfig.setCacheSize(10);
     logFeederProps.setLogEntryCacheConfig(logEntryCacheConfig);
+    logFeederProps.setCheckpointFolder("process3_checkpoint");
     testInputMarker = new InputFileMarker(inputFile, "", 0);
   }
 
@@ -125,13 +135,13 @@ public class InputFileTest {
   public void testInputFile_process3Rows() throws Exception {
     LOG.info("testInputFile_process3Rows()");
 
-    File checkPointDir = createCheckpointDir("process3_checkpoint");
     File testFile = createFile("process3.log");
 
     init(testFile.getAbsolutePath());
 
     InputManager inputManager = EasyMock.createStrictMock(InputManager.class);
-    
EasyMock.expect(inputManager.getCheckPointFolderFile()).andReturn(checkPointDir);
+    CheckpointManager checkpointManager = new FileCheckpointManager();
+    
EasyMock.expect(inputManager.getCheckpointHandler()).andReturn(checkpointManager);
     EasyMock.replay(inputManager);
     inputFile.setInputManager(inputManager);
 

Reply via email to