This is an automated email from the ASF dual-hosted git repository.
oleewere pushed a commit to branch trunk
in repository https://gitbox.apache.org/repos/asf/ambari.git
The following commit(s) were added to refs/heads/trunk by this push:
new 85a0e32 AMBARI-24355. Logfeeder: create CLI for checkpoints and
include log type in checkpoint file names. (#2245)
85a0e32 is described below
commit 85a0e3203034aeba9ad5923d068382e160701355
Author: Olivér Szabó <[email protected]>
AuthorDate: Wed Sep 5 13:03:06 2018 +0200
AMBARI-24355. Logfeeder: create CLI for checkpoints and include log type in
checkpoint file names. (#2245)
* AMBARI-24355. Logfeeder: include log type in checkpoint file names.
* AMBARI-24355. Add CLI for handling checkpoints.
---
.../{InputManager.java => CheckpointManager.java} | 31 ++-
.../logfeeder/plugin/manager/InputManager.java | 6 +-
.../ambari/logfeeder/LogFeederCommandLine.java | 82 +++++++-
.../ambari/logfeeder/common/ConfigHandler.java | 2 +-
.../ambari/logfeeder/conf/ApplicationConfig.java | 9 +-
.../apache/ambari/logfeeder/input/InputFile.java | 8 +-
.../ambari/logfeeder/input/InputManagerImpl.java | 182 ++----------------
.../ambari/logfeeder/input/InputSimulate.java | 10 +-
.../file/checkpoint/FileCheckpointManager.java | 208 +++++++++++++++++++++
.../file/checkpoint/util/CheckpointFileReader.java | 64 +++++++
.../{ => checkpoint/util}/FileCheckInHelper.java | 2 +-
.../util/FileCheckpointCleanupHelper.java | 132 +++++++++++++
.../util}/ResumeLineNumberHelper.java | 12 +-
.../input/monitor/CheckpointCleanupMonitor.java | 10 +-
.../src/main/scripts/logfeeder.sh | 17 ++
.../ambari/logfeeder/input/InputFileTest.java | 14 +-
16 files changed, 575 insertions(+), 214 deletions(-)
diff --git
a/ambari-logsearch/ambari-logsearch-logfeeder-plugin-api/src/main/java/org/apache/ambari/logfeeder/plugin/manager/InputManager.java
b/ambari-logsearch/ambari-logsearch-logfeeder-plugin-api/src/main/java/org/apache/ambari/logfeeder/plugin/manager/CheckpointManager.java
similarity index 56%
copy from
ambari-logsearch/ambari-logsearch-logfeeder-plugin-api/src/main/java/org/apache/ambari/logfeeder/plugin/manager/InputManager.java
copy to
ambari-logsearch/ambari-logsearch-logfeeder-plugin-api/src/main/java/org/apache/ambari/logfeeder/plugin/manager/CheckpointManager.java
index 0734158..abf1465 100644
---
a/ambari-logsearch/ambari-logsearch-logfeeder-plugin-api/src/main/java/org/apache/ambari/logfeeder/plugin/manager/InputManager.java
+++
b/ambari-logsearch/ambari-logsearch-logfeeder-plugin-api/src/main/java/org/apache/ambari/logfeeder/plugin/manager/CheckpointManager.java
@@ -6,9 +6,9 @@
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
- *
+ *
* http://www.apache.org/licenses/LICENSE-2.0
- *
+ *
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
@@ -18,29 +18,26 @@
*/
package org.apache.ambari.logfeeder.plugin.manager;
+import org.apache.ambari.logfeeder.plugin.common.LogFeederProperties;
import org.apache.ambari.logfeeder.plugin.input.Input;
+import org.apache.ambari.logfeeder.plugin.input.InputMarker;
-import java.io.File;
-import java.util.List;
+import java.io.IOException;
+public interface CheckpointManager<I extends Input, IFM extends InputMarker, P extends LogFeederProperties> {
-public abstract class InputManager implements BlockManager {
+ void init(P properties);
- public abstract void addToNotReady(Input input);
+ void checkIn(I inputFile, IFM inputMarker);
- public abstract void checkInAll();
+ int resumeLineNumber(I input);
- public abstract List<Input> getInputList(String serviceName);
+ void cleanupCheckpoints();
- public abstract void add(String serviceName, Input input);
+ void printCheckpoints(String checkpointLocation, String logTypeFilter,
+ String fileKeyFilter) throws IOException;
- public abstract void removeInput(Input input);
+ void cleanCheckpoint(String checkpointLocation, String logTypeFilter,
+ String fileKeyFilter, boolean all) throws IOException;
- public abstract File getCheckPointFolderFile();
-
- public abstract void cleanCheckPointFiles();
-
- public abstract void removeInputsForService(String serviceName);
-
- public abstract void startInputs(String serviceName);
}
diff --git
a/ambari-logsearch/ambari-logsearch-logfeeder-plugin-api/src/main/java/org/apache/ambari/logfeeder/plugin/manager/InputManager.java
b/ambari-logsearch/ambari-logsearch-logfeeder-plugin-api/src/main/java/org/apache/ambari/logfeeder/plugin/manager/InputManager.java
index 0734158..6dc1423 100644
---
a/ambari-logsearch/ambari-logsearch-logfeeder-plugin-api/src/main/java/org/apache/ambari/logfeeder/plugin/manager/InputManager.java
+++
b/ambari-logsearch/ambari-logsearch-logfeeder-plugin-api/src/main/java/org/apache/ambari/logfeeder/plugin/manager/InputManager.java
@@ -36,11 +36,9 @@ public abstract class InputManager implements BlockManager {
public abstract void removeInput(Input input);
- public abstract File getCheckPointFolderFile();
-
- public abstract void cleanCheckPointFiles();
-
public abstract void removeInputsForService(String serviceName);
public abstract void startInputs(String serviceName);
+
+ public abstract CheckpointManager getCheckpointHandler();
}
diff --git
a/ambari-logsearch/ambari-logsearch-logfeeder/src/main/java/org/apache/ambari/logfeeder/LogFeederCommandLine.java
b/ambari-logsearch/ambari-logsearch-logfeeder/src/main/java/org/apache/ambari/logfeeder/LogFeederCommandLine.java
index a6fcb2a..3812ed1 100644
---
a/ambari-logsearch/ambari-logsearch-logfeeder/src/main/java/org/apache/ambari/logfeeder/LogFeederCommandLine.java
+++
b/ambari-logsearch/ambari-logsearch-logfeeder/src/main/java/org/apache/ambari/logfeeder/LogFeederCommandLine.java
@@ -21,6 +21,8 @@ package org.apache.ambari.logfeeder;
import com.google.gson.GsonBuilder;
import org.apache.ambari.logfeeder.common.LogEntryParseTester;
+import org.apache.ambari.logfeeder.input.file.checkpoint.FileCheckpointManager;
+import org.apache.ambari.logfeeder.plugin.manager.CheckpointManager;
import org.apache.commons.cli.CommandLine;
import org.apache.commons.cli.CommandLineParser;
import org.apache.commons.cli.DefaultParser;
@@ -36,16 +38,24 @@ import java.nio.charset.Charset;
import java.util.ArrayList;
import java.util.List;
import java.util.Map;
+import java.util.Properties;
public class LogFeederCommandLine {
private static final String TEST_COMMAND = "test";
+ private static final String CHECKPOINTS_COMMAND = "checkpoints";
+ private static final String CHECKPOINTS_FOLDER_OPTION = "checkpoints-folder";
+ private static final String CHECKPOINTS_LIST_OPTION = "list";
+ private static final String CHECKPOINTS_CLEAN_OPTION = "clean";
+ private static final String CHECKPOINTS_ALL_OPTION = "all";
+ private static final String CHECKPOINTS_FILE_KEY = "file-key";
+ private static final String CHECKPOINTS_LOG_TYPE = "log-type";
private static final String TEST_LOG_ENTRY_OPTION = "test-log-entry";
private static final String TEST_SHIPPER_CONFIG_OPTION =
"test-shipper-config";
private static final String TEST_GLOBAL_CONFIG_OPTION = "test-global-config";
private static final String TEST_LOG_ID_OPTION = "test-log-id";
- private static final String COMMAND_LINE_SYNTAX = "java org.apache.ambari.logfeeder.LogFeederCommandLine --test [args]";
+ private static final String COMMAND_LINE_SYNTAX = "java org.apache.ambari.logfeeder.LogFeederCommandLine [args]";
public static void main(String[] args) {
Options options = new Options();
@@ -63,6 +73,44 @@ public class LogFeederCommandLine {
.desc("Test if log entry is parseable")
.build();
+ Option checkpointsOption = Option.builder("cp")
+ .longOpt(CHECKPOINTS_COMMAND)
+ .desc("Use checkpoint operations")
+ .build();
+
+ Option checkpointsListOption = Option.builder("l")
+ .longOpt(CHECKPOINTS_LIST_OPTION)
+ .desc("Print checkpoints")
+ .build();
+
+ Option checkpointsCleanOption = Option.builder("c")
+ .longOpt(CHECKPOINTS_CLEAN_OPTION)
+ .desc("Remove a checkpoint file (by key/log type or use on all)")
+ .build();
+
+ Option checkpointsFolderOption = Option.builder("cf")
+ .longOpt(CHECKPOINTS_FOLDER_OPTION)
+ .hasArg()
+ .desc("Checkpoints folder location")
+ .build();
+
+ Option checkpointsFileKeyOption = Option.builder("k")
+ .longOpt(CHECKPOINTS_FILE_KEY)
+ .hasArg()
+ .desc("Filter on file key (for list and clean)")
+ .build();
+
+ Option checkpointsLogTypeOption = Option.builder("lt")
+ .longOpt(CHECKPOINTS_LOG_TYPE)
+ .hasArg()
+ .desc("Filter on log type (for list and clean)")
+ .build();
+
+ Option checkpointAllOption = Option.builder("a")
+ .longOpt(CHECKPOINTS_ALL_OPTION)
+ .desc("")
+ .build();
+
Option testLogEntryOption = Option.builder("tle")
.longOpt(TEST_LOG_ENTRY_OPTION)
.hasArg()
@@ -93,6 +141,13 @@ public class LogFeederCommandLine {
options.addOption(testShipperConfOption);
options.addOption(testGlobalConfOption);
options.addOption(testLogIdOption);
+ options.addOption(checkpointsOption);
+ options.addOption(checkpointsListOption);
+ options.addOption(checkpointsCleanOption);
+ options.addOption(checkpointsFolderOption);
+ options.addOption(checkpointAllOption);
+ options.addOption(checkpointsFileKeyOption);
+ options.addOption(checkpointsLogTypeOption);
try {
CommandLineParser cmdLineParser = new DefaultParser();
@@ -103,6 +158,31 @@ public class LogFeederCommandLine {
System.exit(0);
}
String command = "";
+ if (cli.hasOption("cp")) {
+ String checkpointLocation = "";
+ if (cli.hasOption("cf")) {
+ checkpointLocation = cli.getOptionValue("cf");
+ } else {
+ Properties prop = new Properties();
+ prop.load(LogFeederCommandLine.class.getClassLoader().getResourceAsStream("logfeeder.properties"));
+ checkpointLocation = prop.getProperty("logfeeder.checkpoint.folder");
+ }
+ boolean cleanCommand = cli.hasOption("c");
+ boolean listCommand = cli.hasOption("l") || !cleanCommand; // Use list if clean is not used
+ boolean allOption = cli.hasOption("a");
+ String logTypeFilter = cli.hasOption("lt") ? cli.getOptionValue("lt")
: null;
+ String fileKeyFilter = cli.hasOption("k") ? cli.getOptionValue("k") :
null;
+
+ final CheckpointManager checkpointManager = new
FileCheckpointManager();
+ if (listCommand) {
+ checkpointManager.printCheckpoints(checkpointLocation,
logTypeFilter, fileKeyFilter);
+ } else {
+ checkpointManager.cleanCheckpoint(checkpointLocation, logTypeFilter,
fileKeyFilter, allOption);
+ }
+
+ System.out.println("Checkpoint operation has finished successfully.");
+ return;
+ }
if (cli.hasOption("t")) {
command = TEST_COMMAND;
validateRequiredOptions(cli, command, testLogEntryOption,
testShipperConfOption);
diff --git
a/ambari-logsearch/ambari-logsearch-logfeeder/src/main/java/org/apache/ambari/logfeeder/common/ConfigHandler.java
b/ambari-logsearch/ambari-logsearch-logfeeder/src/main/java/org/apache/ambari/logfeeder/common/ConfigHandler.java
index 8dbb39a..1ceef3b 100644
---
a/ambari-logsearch/ambari-logsearch-logfeeder/src/main/java/org/apache/ambari/logfeeder/common/ConfigHandler.java
+++
b/ambari-logsearch/ambari-logsearch-logfeeder/src/main/java/org/apache/ambari/logfeeder/common/ConfigHandler.java
@@ -421,7 +421,7 @@ public class ConfigHandler implements InputConfigMonitor {
}
public void cleanCheckPointFiles() {
- inputManager.cleanCheckPointFiles();
+ inputManager.getCheckpointHandler().cleanupCheckpoints();
}
public void logStats() {
diff --git
a/ambari-logsearch/ambari-logsearch-logfeeder/src/main/java/org/apache/ambari/logfeeder/conf/ApplicationConfig.java
b/ambari-logsearch/ambari-logsearch-logfeeder/src/main/java/org/apache/ambari/logfeeder/conf/ApplicationConfig.java
index f54fc0c..8c7e7d9 100644
---
a/ambari-logsearch/ambari-logsearch-logfeeder/src/main/java/org/apache/ambari/logfeeder/conf/ApplicationConfig.java
+++
b/ambari-logsearch/ambari-logsearch-logfeeder/src/main/java/org/apache/ambari/logfeeder/conf/ApplicationConfig.java
@@ -24,6 +24,8 @@ import
org.apache.ambari.logfeeder.docker.DockerContainerRegistry;
import org.apache.ambari.logfeeder.common.LogFeederConstants;
import org.apache.ambari.logfeeder.input.InputConfigUploader;
import org.apache.ambari.logfeeder.input.InputManagerImpl;
+import org.apache.ambari.logfeeder.plugin.manager.CheckpointManager;
+import org.apache.ambari.logfeeder.input.file.checkpoint.FileCheckpointManager;
import org.apache.ambari.logfeeder.loglevelfilter.LogLevelFilterHandler;
import org.apache.ambari.logfeeder.common.ConfigHandler;
import org.apache.ambari.logfeeder.metrics.MetricsManager;
@@ -148,7 +150,7 @@ public class ApplicationConfig {
@Bean
- @DependsOn("containerRegistry")
+ @DependsOn({"containerRegistry", "checkpointHandler"})
public InputManager inputManager() {
return new InputManagerImpl();
}
@@ -159,6 +161,11 @@ public class ApplicationConfig {
}
@Bean
+ public CheckpointManager checkpointHandler() {
+ return new FileCheckpointManager();
+ }
+
+ @Bean
public DockerContainerRegistry containerRegistry() {
if (logFeederProps.isDockerContainerRegistryEnabled()) {
return
DockerContainerRegistry.getInstance(logFeederProps.getProperties());
diff --git
a/ambari-logsearch/ambari-logsearch-logfeeder/src/main/java/org/apache/ambari/logfeeder/input/InputFile.java
b/ambari-logsearch/ambari-logsearch-logfeeder/src/main/java/org/apache/ambari/logfeeder/input/InputFile.java
index 441ce3e..85df2ee 100644
---
a/ambari-logsearch/ambari-logsearch-logfeeder/src/main/java/org/apache/ambari/logfeeder/input/InputFile.java
+++
b/ambari-logsearch/ambari-logsearch-logfeeder/src/main/java/org/apache/ambari/logfeeder/input/InputFile.java
@@ -26,9 +26,7 @@ import
org.apache.ambari.logfeeder.input.monitor.DockerLogFileUpdateMonitor;
import org.apache.ambari.logfeeder.input.monitor.LogFileDetachMonitor;
import org.apache.ambari.logfeeder.input.monitor.LogFilePathUpdateMonitor;
import org.apache.ambari.logfeeder.input.reader.LogsearchReaderFactory;
-import org.apache.ambari.logfeeder.input.file.FileCheckInHelper;
import org.apache.ambari.logfeeder.input.file.ProcessFileHelper;
-import org.apache.ambari.logfeeder.input.file.ResumeLineNumberHelper;
import org.apache.ambari.logfeeder.plugin.filter.Filter;
import org.apache.ambari.logfeeder.plugin.input.Input;
import org.apache.ambari.logfeeder.util.FileUtil;
@@ -144,7 +142,7 @@ public class InputFile extends Input<LogFeederProps,
InputFileMarker> {
@Override
public synchronized void checkIn(InputFileMarker inputMarker) {
- FileCheckInHelper.checkIn(this, inputMarker);
+ getInputManager().getCheckpointHandler().checkIn(this, inputMarker);
}
@Override
@@ -304,7 +302,7 @@ public class InputFile extends Input<LogFeederProps,
InputFileMarker> {
}
public int getResumeFromLineNumber() {
- return ResumeLineNumberHelper.getResumeFromLineNumber(this);
+ return
this.getInputManager().getCheckpointHandler().resumeLineNumber(this);
}
public void processFile(File logPathFile, boolean follow) throws Exception {
@@ -485,7 +483,7 @@ public class InputFile extends Input<LogFeederProps,
InputFileMarker> {
return fileKey;
}
- public String getBase64FileKey() throws Exception {
+ public String getBase64FileKey() {
return base64FileKey;
}
diff --git
a/ambari-logsearch/ambari-logsearch-logfeeder/src/main/java/org/apache/ambari/logfeeder/input/InputManagerImpl.java
b/ambari-logsearch/ambari-logsearch-logfeeder/src/main/java/org/apache/ambari/logfeeder/input/InputManagerImpl.java
index ea97968..a256fd7 100644
---
a/ambari-logsearch/ambari-logsearch-logfeeder/src/main/java/org/apache/ambari/logfeeder/input/InputManagerImpl.java
+++
b/ambari-logsearch/ambari-logsearch-logfeeder/src/main/java/org/apache/ambari/logfeeder/input/InputManagerImpl.java
@@ -22,23 +22,14 @@ import com.google.common.annotations.VisibleForTesting;
import org.apache.ambari.logfeeder.conf.LogFeederProps;
import org.apache.ambari.logfeeder.docker.DockerContainerRegistry;
import org.apache.ambari.logfeeder.docker.DockerContainerRegistryMonitor;
-import org.apache.ambari.logfeeder.input.monitor.CheckpointCleanupMonitor;
+import org.apache.ambari.logfeeder.plugin.manager.CheckpointManager;
import org.apache.ambari.logfeeder.plugin.common.MetricData;
import org.apache.ambari.logfeeder.plugin.input.Input;
import org.apache.ambari.logfeeder.plugin.manager.InputManager;
-import org.apache.ambari.logfeeder.util.FileUtil;
-import org.apache.ambari.logfeeder.util.LogFeederUtil;
-import org.apache.commons.io.filefilter.WildcardFileFilter;
-import org.apache.commons.lang3.StringUtils;
import org.apache.log4j.Logger;
-import org.apache.solr.common.util.Base64;
import javax.inject.Inject;
-import java.io.EOFException;
import java.io.File;
-import java.io.FileFilter;
-import java.io.IOException;
-import java.io.RandomAccessFile;
import java.util.ArrayList;
import java.util.HashMap;
import java.util.HashSet;
@@ -46,22 +37,16 @@ import java.util.Iterator;
import java.util.List;
import java.util.Map;
import java.util.Set;
-import java.util.UUID;
public class InputManagerImpl extends InputManager {
private static final Logger LOG = Logger.getLogger(InputManagerImpl.class);
- private static final String CHECKPOINT_SUBFOLDER_NAME =
"logfeeder_checkpoints";
-
private Map<String, List<Input>> inputs = new HashMap<>();
private Set<Input> notReadyList = new HashSet<>();
private boolean isDrain = false;
- private String checkPointExtension;
- private File checkPointFolderFile;
-
private MetricData filesCountMetric = new MetricData("input.files.count",
true);
private Thread inputIsReadyMonitor;
@@ -72,6 +57,9 @@ public class InputManagerImpl extends InputManager {
@Inject
private LogFeederProps logFeederProps;
+ @Inject
+ private CheckpointManager checkpointHandler;
+
public List<Input> getInputList(String serviceName) {
return inputs.get(serviceName);
}
@@ -130,43 +118,11 @@ public class InputManagerImpl extends InputManager {
@Override
public void init() throws Exception {
- initCheckPointSettings();
+ checkpointHandler.init(logFeederProps);
startMonitorThread();
startDockerMetadataThread();
}
- private void initCheckPointSettings() {
- checkPointExtension = logFeederProps.getCheckPointExtension();
- LOG.info("Determining valid checkpoint folder");
- boolean isCheckPointFolderValid = false;
- // We need to keep track of the files we are reading.
- String checkPointFolder = logFeederProps.getCheckpointFolder();
- if (!StringUtils.isEmpty(checkPointFolder)) {
- checkPointFolderFile = new File(checkPointFolder);
- isCheckPointFolderValid = verifyCheckPointFolder(checkPointFolderFile);
- }
-
- if (!isCheckPointFolderValid) {
- // Let's use tmp folder
- checkPointFolderFile = new File(logFeederProps.getTmpDir(),
CHECKPOINT_SUBFOLDER_NAME);
- LOG.info("Checking if tmp folder can be used for checkpoints. Folder=" +
checkPointFolderFile);
- isCheckPointFolderValid = verifyCheckPointFolder(checkPointFolderFile);
- if (isCheckPointFolderValid) {
- LOG.warn("Using tmp folder " + checkPointFolderFile + " to store check points. This is not recommended." +
- "Please set logfeeder.checkpoint.folder property");
- }
- }
-
- if (isCheckPointFolderValid) {
- LOG.info("Using folder " + checkPointFolderFile + " for storing
checkpoints");
- // check checkpoint cleanup every 2000 min
- Thread checkpointCleanupThread = new Thread(new
CheckpointCleanupMonitor(this, 2000),"checkpoint_cleanup");
- checkpointCleanupThread.setDaemon(true);
- checkpointCleanupThread.start();
- } else {
- throw new IllegalStateException("Could not determine the checkpoint
folder.");
- }
- }
private void startDockerMetadataThread() {
if (logFeederProps.isDockerContainerRegistryEnabled()) {
@@ -230,34 +186,6 @@ public class InputManagerImpl extends InputManager {
}
}
- private boolean verifyCheckPointFolder(File folderPathFile) {
- if (!folderPathFile.exists()) {
- try {
- if (!folderPathFile.mkdir()) {
- LOG.warn("Error creating folder for check point. folder=" +
folderPathFile);
- }
- } catch (Throwable t) {
- LOG.warn("Error creating folder for check point. folder=" +
folderPathFile, t);
- }
- }
-
- if (folderPathFile.exists() && folderPathFile.isDirectory()) {
- // Let's check whether we can create a file
- File testFile = new File(folderPathFile, UUID.randomUUID().toString());
- try {
- testFile.createNewFile();
- return testFile.delete();
- } catch (IOException e) {
- LOG.warn("Couldn't create test file in " +
folderPathFile.getAbsolutePath() + " for checkPoint", e);
- }
- }
- return false;
- }
-
- public File getCheckPointFolderFile() {
- return checkPointFolderFile;
- }
-
@Override
public void addToNotReady(Input notReadyInput) {
notReadyList.add(notReadyInput);
@@ -285,100 +213,6 @@ public class InputManagerImpl extends InputManager {
// TODO: logStatForMetric(filesCountMetric, "Stat: Files Monitored Count",
"");
}
-
- public void cleanCheckPointFiles() {
- if (checkPointFolderFile == null) {
- LOG.info("Will not clean checkPoint files. checkPointFolderFile=" +
checkPointFolderFile);
- return;
- }
- LOG.info("Cleaning checkPoint files. checkPointFolderFile=" +
checkPointFolderFile.getAbsolutePath());
- try {
- // Loop over the check point files and if filePath is not present, then move to closed
- String searchPath = "*" + checkPointExtension;
- FileFilter fileFilter = new WildcardFileFilter(searchPath);
- File[] checkPointFiles = checkPointFolderFile.listFiles(fileFilter);
- int totalCheckFilesDeleted = 0;
- for (File checkPointFile : checkPointFiles) {
- if (checkCheckPointFile(checkPointFile)) {
- totalCheckFilesDeleted++;
- }
- }
- LOG.info("Deleted " + totalCheckFilesDeleted + " checkPoint file(s).
checkPointFolderFile=" +
- checkPointFolderFile.getAbsolutePath());
-
- } catch (Throwable t) {
- LOG.error("Error while cleaning checkPointFiles", t);
- }
- }
-
- private boolean checkCheckPointFile(File checkPointFile) {
- boolean deleted = false;
- try (RandomAccessFile checkPointReader = new
RandomAccessFile(checkPointFile, "r")) {
- int contentSize = checkPointReader.readInt();
- byte b[] = new byte[contentSize];
- int readSize = checkPointReader.read(b, 0, contentSize);
- if (readSize != contentSize) {
- LOG.error("Couldn't read expected number of bytes from checkpoint
file. expected=" + contentSize + ", read="
- + readSize + ", checkPointFile=" + checkPointFile);
- } else {
- String jsonCheckPointStr = new String(b, 0, readSize);
- Map<String, Object> jsonCheckPoint =
LogFeederUtil.toJSONObject(jsonCheckPointStr);
-
- String logFilePath = (String) jsonCheckPoint.get("file_path");
- String logFileKey = (String) jsonCheckPoint.get("file_key");
- Integer maxAgeMin = null;
- if (jsonCheckPoint.containsKey("max_age_min")) {
- maxAgeMin =
Integer.parseInt(jsonCheckPoint.get("max_age_min").toString());
- }
- if (logFilePath != null && logFileKey != null) {
- boolean deleteCheckPointFile = false;
- File logFile = new File(logFilePath);
- if (logFile.exists()) {
- Object fileKeyObj = FileUtil.getFileKey(logFile);
- String fileBase64 =
Base64.byteArrayToBase64(fileKeyObj.toString().getBytes());
- if (!logFileKey.equals(fileBase64)) {
- LOG.info("CheckPoint clean: File key has changed. old=" +
logFileKey + ", new=" + fileBase64 + ", filePath=" +
- logFilePath + ", checkPointFile=" +
checkPointFile.getAbsolutePath());
- deleteCheckPointFile = !wasFileRenamed(logFile.getParentFile(),
logFileKey);
- } else if (maxAgeMin != null && maxAgeMin != 0 &&
FileUtil.isFileTooOld(logFile, maxAgeMin)) {
- deleteCheckPointFile = true;
- LOG.info("Checkpoint clean: File reached max age minutes (" +
maxAgeMin + "):" + logFilePath);
- }
- } else {
- LOG.info("CheckPoint clean: Log file doesn't exist. filePath=" +
logFilePath + ", checkPointFile=" +
- checkPointFile.getAbsolutePath());
- deleteCheckPointFile = !wasFileRenamed(logFile.getParentFile(),
logFileKey);
- }
- if (deleteCheckPointFile) {
- LOG.info("Deleting CheckPoint file=" +
checkPointFile.getAbsolutePath() + ", logFile=" + logFilePath);
- checkPointFile.delete();
- deleted = true;
- }
- }
- }
- } catch (EOFException eof) {
- LOG.warn("Caught EOFException. Ignoring reading existing checkPoint
file. " + checkPointFile);
- } catch (Throwable t) {
- LOG.error("Error while checking checkPoint file. " + checkPointFile, t);
- }
-
- return deleted;
- }
-
- private boolean wasFileRenamed(File folder, String searchFileBase64) {
- for (File file : folder.listFiles()) {
- Object fileKeyObj = FileUtil.getFileKey(file);
- String fileBase64 =
Base64.byteArrayToBase64(fileKeyObj.toString().getBytes());
- if (searchFileBase64.equals(fileBase64)) {
- // even though the file name in the checkpoint file is different from the one it was renamed to, checkpoint files are
- // identified by their name, which is generated from the file key, which would be the same for the renamed file
- LOG.info("CheckPoint clean: File key matches file " +
file.getAbsolutePath() + ", it must have been renamed");
- return true;
- }
- }
- return false;
- }
-
public void waitOnAllInputs() {
//wait on inputs
for (List<Input> inputList : inputs.values()) {
@@ -469,5 +303,11 @@ public class InputManagerImpl extends InputManager {
return logFeederProps;
}
+ public CheckpointManager getCheckpointHandler() {
+ return checkpointHandler;
+ }
+ public void setCheckpointHandler(CheckpointManager checkpointHandler) {
+ this.checkpointHandler = checkpointHandler;
+ }
}
diff --git
a/ambari-logsearch/ambari-logsearch-logfeeder/src/main/java/org/apache/ambari/logfeeder/input/InputSimulate.java
b/ambari-logsearch/ambari-logsearch-logfeeder/src/main/java/org/apache/ambari/logfeeder/input/InputSimulate.java
index 7c633f0..13b00e3 100644
---
a/ambari-logsearch/ambari-logsearch-logfeeder/src/main/java/org/apache/ambari/logfeeder/input/InputSimulate.java
+++
b/ambari-logsearch/ambari-logsearch-logfeeder/src/main/java/org/apache/ambari/logfeeder/input/InputSimulate.java
@@ -168,8 +168,14 @@ public class InputSimulate extends InputFile {
return lineNumber;
}
- public String getBase64FileKey() throws Exception {
- String fileKey = InetAddress.getLocalHost().getHostAddress() + "|" +
getFilePath();
+ public String getBase64FileKey() {
+ String fileKey;
+ try {
+ fileKey = InetAddress.getLocalHost().getHostAddress() + "|" +
getFilePath();
+ } catch (Exception e) {
+ // skip
+ fileKey = "localhost|" + getFilePath();
+ }
return Base64.byteArrayToBase64(fileKey.getBytes());
}
diff --git
a/ambari-logsearch/ambari-logsearch-logfeeder/src/main/java/org/apache/ambari/logfeeder/input/file/checkpoint/FileCheckpointManager.java
b/ambari-logsearch/ambari-logsearch-logfeeder/src/main/java/org/apache/ambari/logfeeder/input/file/checkpoint/FileCheckpointManager.java
new file mode 100644
index 0000000..69c21fb
--- /dev/null
+++
b/ambari-logsearch/ambari-logsearch-logfeeder/src/main/java/org/apache/ambari/logfeeder/input/file/checkpoint/FileCheckpointManager.java
@@ -0,0 +1,208 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.ambari.logfeeder.input.file.checkpoint;
+
+import org.apache.ambari.logfeeder.conf.LogFeederProps;
+import org.apache.ambari.logfeeder.input.InputFile;
+import org.apache.ambari.logfeeder.input.InputFileMarker;
+import org.apache.ambari.logfeeder.input.file.checkpoint.util.CheckpointFileReader;
+import org.apache.ambari.logfeeder.input.file.checkpoint.util.FileCheckInHelper;
+import org.apache.ambari.logfeeder.input.file.checkpoint.util.FileCheckpointCleanupHelper;
+import org.apache.ambari.logfeeder.input.file.checkpoint.util.ResumeLineNumberHelper;
+import org.apache.ambari.logfeeder.input.monitor.CheckpointCleanupMonitor;
+import org.apache.ambari.logfeeder.plugin.manager.CheckpointManager;
+import org.apache.commons.lang3.StringUtils;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.io.EOFException;
+import java.io.File;
+import java.io.IOException;
+import java.util.Map;
+import java.util.UUID;
+import java.util.stream.Stream;
+
+public class FileCheckpointManager implements CheckpointManager<InputFile,
InputFileMarker, LogFeederProps> {
+
+ private static final Logger LOG =
LoggerFactory.getLogger(FileCheckpointManager.class);
+
+ private static final String CHECKPOINT_SUBFOLDER_NAME =
"logfeeder_checkpoints";
+
+ private String checkPointExtension;
+ private File checkPointFolderFile;
+
+ @Override
+ public void init(LogFeederProps logFeederProps) {
+ checkPointExtension = logFeederProps.getCheckPointExtension();
+ LOG.info("Determining valid checkpoint folder");
+ boolean isCheckPointFolderValid = false;
+ // We need to keep track of the files we are reading.
+ String checkPointFolder = logFeederProps.getCheckpointFolder();
+ if (!StringUtils.isEmpty(checkPointFolder)) {
+ checkPointFolderFile = new File(checkPointFolder);
+ isCheckPointFolderValid = verifyCheckPointFolder(checkPointFolderFile);
+ }
+
+ if (!isCheckPointFolderValid) {
+ // Let's use tmp folder
+ checkPointFolderFile = new File(logFeederProps.getTmpDir(),
CHECKPOINT_SUBFOLDER_NAME);
+ LOG.info("Checking if tmp folder can be used for checkpoints. Folder=" +
checkPointFolderFile);
+ isCheckPointFolderValid = verifyCheckPointFolder(checkPointFolderFile);
+ if (isCheckPointFolderValid) {
+ LOG.warn("Using tmp folder " + checkPointFolderFile + " to store check points. This is not recommended." +
+ "Please set logfeeder.checkpoint.folder property");
+ }
+ }
+
+ if (isCheckPointFolderValid) {
+ LOG.info("Using folder " + checkPointFolderFile + " for storing checkpoints");
+ // check checkpoint cleanup every 2000 min
+ Thread checkpointCleanupThread = new Thread(new
CheckpointCleanupMonitor(this, 2000),"checkpoint_cleanup");
+ checkpointCleanupThread.setDaemon(true);
+ checkpointCleanupThread.start();
+ } else {
+ throw new IllegalStateException("Could not determine the checkpoint folder.");
+ }
+ }
+
+ @Override
+ public void checkIn(InputFile inputFile, InputFileMarker inputMarker) {
+ FileCheckInHelper.checkIn(inputFile, inputMarker);
+ }
+
+ @Override
+ public int resumeLineNumber(InputFile inputFile) {
+ return ResumeLineNumberHelper.getResumeFromLineNumber(inputFile,
checkPointFolderFile);
+ }
+
+ @Override
+ public void cleanupCheckpoints() {
+ FileCheckpointCleanupHelper.cleanCheckPointFiles(checkPointFolderFile,
checkPointExtension);
+ }
+
+ @Override
+ public void printCheckpoints(String checkpointLocation, String
logTypeFilter, String fileKeyFilter) throws IOException {
+ System.out.println(String.format("Searching checkpoint files in '%s' folder ... (list)", checkpointLocation));
+ File[] files = CheckpointFileReader.getFiles(new File(checkpointLocation),
".cp");
+ if (files != null) {
+ for (File file : files) {
+ String fileNameTitle = String.format("file name: %s", file.getName());
+ StringBuilder strBuilder = new StringBuilder(fileNameTitle.length());
+ String[] splitted = file.getName().split("-");
+ String logtType = "";
+ String fileKey = "";
+ if (splitted.length > 1) {
+ logtType = splitted[0];
+ fileKey = splitted[1].replace(getFileExtension(), "");
+ } else {
+ fileKey = file.getName().replace(getFileExtension(), "");
+ }
+ if (checkFilter(logtType, logTypeFilter) || checkFilter(fileKey,
fileKeyFilter)) {
+ continue;
+ }
+ Stream.generate(() ->
'-').limit(fileNameTitle.length()).forEach(strBuilder::append);
+ String border = strBuilder.toString();
+ System.out.println(border);
+ System.out.println(String.format("file name: %s", file.getName()));
+ System.out.println(border);
+ if (org.apache.commons.lang.StringUtils.isNotBlank(logtType)) {
+ System.out.println(String.format("log_type: %s", logtType));
+ }
+ Map<String, String> checkpointJson =
CheckpointFileReader.getCheckpointObject(file);
+ for (Map.Entry<String, String> entry : checkpointJson.entrySet()) {
+ System.out.println(String.format("%s: %s", entry.getKey(),
entry.getValue()));
+ }
+ System.out.print("\n");
+ }
+ }
+ }
+
+ @Override
+ public void cleanCheckpoint(String checkpointLocation, String logTypeFilter,
String fileKeyFilter, boolean all) {
+ System.out.println(String.format("Searching checkpoint files in '%s' folder ... (clean)", checkpointLocation));
+ File[] files = CheckpointFileReader.getFiles(new File(checkpointLocation),
".cp");
+ if (files != null) {
+ for (File file : files) {
+ String logtType = getLogTypeFromFileName(file.getName());
+ String fileKey = getFileKeyFromFileName(file.getName());
+ if (checkFilter(logtType, logTypeFilter) || checkFilter(fileKey,
fileKeyFilter)) {
+ continue;
+ }
+ if (!all && logTypeFilter == null && fileKeyFilter == null) {
+ throw new IllegalArgumentException("It is required to use a filter for clean: --all, --log-type <log_type> or --file-key <file_key>");
+ }
+
+ if (all || logTypeFilter != null && logTypeFilter.equals(logtType) ||
+ fileKeyFilter != null && fileKeyFilter.equals(fileKey)) {
+ System.out.println(String.format("Deleting checkpoint file - filename: %s, key: %s, log_type: %s", file.getAbsolutePath(), fileKey, logtType));
+ file.delete();
+ }
+ }
+ }
+ }
+
+ private boolean verifyCheckPointFolder(File folderPathFile) {
+ if (!folderPathFile.exists()) {
+ try {
+ if (!folderPathFile.mkdir()) {
+ LOG.warn("Error creating folder for check point. folder=" +
folderPathFile);
+ }
+ } catch (Throwable t) {
+ LOG.warn("Error creating folder for check point. folder=" +
folderPathFile, t);
+ }
+ }
+
+ if (folderPathFile.exists() && folderPathFile.isDirectory()) {
+ // Let's check whether we can create a file
+ File testFile = new File(folderPathFile, UUID.randomUUID().toString());
+ try {
+ testFile.createNewFile();
+ return testFile.delete();
+ } catch (IOException e) {
+ LOG.warn("Couldn't create test file in " +
folderPathFile.getAbsolutePath() + " for checkPoint", e);
+ }
+ }
+ return false;
+ }
+
+ private boolean checkFilter(String actualValue, String filterValue) {
+ return (org.apache.commons.lang.StringUtils.isNotBlank(actualValue) &&
org.apache.commons.lang.StringUtils.isNotBlank(filterValue) &&
+ !actualValue.equals(filterValue));
+ }
+
+ private String getFileExtension() {
+ return this.checkPointExtension == null ? ".cp" : this.checkPointExtension;
+ }
+
+ private String getFileKeyFromFileName(String fileName) {
+ String[] splitted = fileName.split("-");
+ String fileKeyResult = splitted.length > 1 ? splitted[1] : splitted[0];
+ return fileKeyResult.replace(getFileExtension(), "");
+ }
+
+ private String getLogTypeFromFileName(String fileName) {
+ String[] splitted = fileName.split("-");
+ String logTypeResult = "";
+ if (splitted.length > 1) {
+ logTypeResult = splitted[0];
+ }
+ return logTypeResult;
+ }
+
+}
diff --git
a/ambari-logsearch/ambari-logsearch-logfeeder/src/main/java/org/apache/ambari/logfeeder/input/file/checkpoint/util/CheckpointFileReader.java
b/ambari-logsearch/ambari-logsearch-logfeeder/src/main/java/org/apache/ambari/logfeeder/input/file/checkpoint/util/CheckpointFileReader.java
new file mode 100644
index 0000000..dd35d07
--- /dev/null
+++
b/ambari-logsearch/ambari-logsearch-logfeeder/src/main/java/org/apache/ambari/logfeeder/input/file/checkpoint/util/CheckpointFileReader.java
@@ -0,0 +1,64 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.ambari.logfeeder.input.file.checkpoint.util;
+
+import com.google.gson.Gson;
+import com.google.gson.GsonBuilder;
+import com.google.gson.reflect.TypeToken;
+import org.apache.commons.io.filefilter.WildcardFileFilter;
+
+import java.io.File;
+import java.io.FileFilter;
+import java.io.IOException;
+import java.io.RandomAccessFile;
+import java.lang.reflect.Type;
+import java.util.Map;
+
+/**
+ * Static helpers for listing checkpoint files and decoding their content.
+ */
+public class CheckpointFileReader {
+
+  private CheckpointFileReader() {
+  }
+
+  /**
+   * Lists the files of a folder whose names match the wildcard "*" + extension.
+   *
+   * @param checkPointFolderFile folder to scan
+   * @param checkPointExtension file name suffix of checkpoint files
+   * @return the matching files, or null when {@link File#listFiles(FileFilter)} returns null
+   */
+  public static File[] getFiles(File checkPointFolderFile, String checkPointExtension) {
+    String searchPath = "*" + checkPointExtension;
+    FileFilter fileFilter = new WildcardFileFilter(searchPath);
+    return checkPointFolderFile.listFiles(fileFilter);
+  }
+
+  /**
+   * Reads a checkpoint file: an int length prefix followed by that many bytes of JSON,
+   * which is parsed into a string-to-string map.
+   *
+   * @param checkPointFile checkpoint file to read
+   * @return the parsed checkpoint content
+   * @throws IOException on I/O failure, including an EOFException when the length prefix is missing
+   * @throws IllegalArgumentException when the length prefix is negative or the file holds fewer bytes than it announces
+   */
+  public static Map<String, String> getCheckpointObject(File checkPointFile) throws IOException {
+    try (RandomAccessFile checkPointReader = new RandomAccessFile(checkPointFile, "r")) {
+      int contentSize = checkPointReader.readInt();
+      if (contentSize < 0) {
+        throw new IllegalArgumentException("Invalid content size in checkpoint file. contentSize=" + contentSize
+          + ", checkPointFile=" + checkPointFile);
+      }
+      // validate against the real file size before allocating, so a corrupt prefix cannot trigger a huge allocation
+      long remaining = checkPointReader.length() - checkPointReader.getFilePointer();
+      if (contentSize > remaining) {
+        throw new IllegalArgumentException("Couldn't read expected number of bytes from checkpoint file. expected=" + contentSize + ", read="
+          + remaining + ", checkPointFile=" + checkPointFile);
+      }
+      byte[] content = new byte[contentSize];
+      int readSize = 0;
+      // a single read() may deliver fewer bytes than requested, keep reading until done or EOF
+      while (readSize < contentSize) {
+        int count = checkPointReader.read(content, readSize, contentSize - readSize);
+        if (count < 0) {
+          break;
+        }
+        readSize += count;
+      }
+      if (readSize != contentSize) {
+        throw new IllegalArgumentException("Couldn't read expected number of bytes from checkpoint file. expected=" + contentSize + ", read="
+          + readSize + ", checkPointFile=" + checkPointFile);
+      }
+      // NOTE(review): decodes with the platform default charset, as before - confirm the writer does the same
+      String jsonCheckPointStr = new String(content, 0, readSize);
+      Gson gson = new GsonBuilder().create();
+      Type type = new TypeToken<Map<String, String>>() {}.getType();
+      return gson.fromJson(jsonCheckPointStr, type);
+    }
+  }
+
+}
diff --git
a/ambari-logsearch/ambari-logsearch-logfeeder/src/main/java/org/apache/ambari/logfeeder/input/file/FileCheckInHelper.java
b/ambari-logsearch/ambari-logsearch-logfeeder/src/main/java/org/apache/ambari/logfeeder/input/file/checkpoint/util/FileCheckInHelper.java
similarity index 98%
rename from
ambari-logsearch/ambari-logsearch-logfeeder/src/main/java/org/apache/ambari/logfeeder/input/file/FileCheckInHelper.java
rename to
ambari-logsearch/ambari-logsearch-logfeeder/src/main/java/org/apache/ambari/logfeeder/input/file/checkpoint/util/FileCheckInHelper.java
index 7b8f0cd..b217e34 100644
---
a/ambari-logsearch/ambari-logsearch-logfeeder/src/main/java/org/apache/ambari/logfeeder/input/file/FileCheckInHelper.java
+++
b/ambari-logsearch/ambari-logsearch-logfeeder/src/main/java/org/apache/ambari/logfeeder/input/file/checkpoint/util/FileCheckInHelper.java
@@ -16,7 +16,7 @@
* specific language governing permissions and limitations
* under the License.
*/
-package org.apache.ambari.logfeeder.input.file;
+package org.apache.ambari.logfeeder.input.file.checkpoint.util;
import org.apache.ambari.logfeeder.input.InputFile;
import org.apache.ambari.logfeeder.input.InputFileMarker;
diff --git
a/ambari-logsearch/ambari-logsearch-logfeeder/src/main/java/org/apache/ambari/logfeeder/input/file/checkpoint/util/FileCheckpointCleanupHelper.java
b/ambari-logsearch/ambari-logsearch-logfeeder/src/main/java/org/apache/ambari/logfeeder/input/file/checkpoint/util/FileCheckpointCleanupHelper.java
new file mode 100644
index 0000000..91b5383
--- /dev/null
+++
b/ambari-logsearch/ambari-logsearch-logfeeder/src/main/java/org/apache/ambari/logfeeder/input/file/checkpoint/util/FileCheckpointCleanupHelper.java
@@ -0,0 +1,132 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.ambari.logfeeder.input.file.checkpoint.util;
+
+import org.apache.ambari.logfeeder.util.FileUtil;
+import org.apache.ambari.logfeeder.util.LogFeederUtil;
+import org.apache.solr.common.util.Base64;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.io.EOFException;
+import java.io.File;
+import java.io.RandomAccessFile;
+import java.util.Map;
+
+/**
+ * Static helper that deletes checkpoint files whose log file is gone, was replaced or became too old.
+ */
+public class FileCheckpointCleanupHelper {
+
+  private static final Logger LOG = LoggerFactory.getLogger(FileCheckpointCleanupHelper.class);
+
+  private FileCheckpointCleanupHelper() {
+  }
+
+  /**
+   * Checks every checkpoint file of a folder and deletes the obsolete ones. Never throws: failures are logged.
+   *
+   * @param checkPointFolderFile folder holding the checkpoint files; nothing happens when null
+   * @param checkPointExtension file name suffix of checkpoint files
+   */
+  public static void cleanCheckPointFiles(File checkPointFolderFile, String checkPointExtension) {
+    if (checkPointFolderFile == null) {
+      LOG.info("Will not clean checkPoint files. checkPointFolderFile=null");
+      return;
+    }
+    LOG.info("Cleaning checkPoint files. checkPointFolderFile=" + checkPointFolderFile.getAbsolutePath());
+    try {
+      // Inspect each checkpoint file and delete it when its log file no longer justifies keeping it
+      File[] checkPointFiles = CheckpointFileReader.getFiles(checkPointFolderFile, checkPointExtension);
+      int totalCheckFilesDeleted = 0;
+      if (checkPointFiles != null) {
+        for (File checkPointFile : checkPointFiles) {
+          if (checkCheckPointFile(checkPointFile)) {
+            totalCheckFilesDeleted++;
+          }
+        }
+        LOG.info("Deleted " + totalCheckFilesDeleted + " checkPoint file(s). checkPointFolderFile=" +
+          checkPointFolderFile.getAbsolutePath());
+      }
+    } catch (Throwable t) {
+      LOG.error("Error while cleaning checkPointFiles", t);
+    }
+  }
+
+  /**
+   * Decides whether one checkpoint file is obsolete and deletes it if so.
+   *
+   * @return true only when the checkpoint file was actually removed from disk
+   */
+  private static boolean checkCheckPointFile(File checkPointFile) {
+    try {
+      // the reader is closed before any delete attempt
+      String jsonCheckPointStr = readCheckPointContent(checkPointFile);
+      if (jsonCheckPointStr == null) {
+        return false;
+      }
+      Map<String, Object> jsonCheckPoint = LogFeederUtil.toJSONObject(jsonCheckPointStr);
+
+      String logFilePath = (String) jsonCheckPoint.get("file_path");
+      String logFileKey = (String) jsonCheckPoint.get("file_key");
+      Integer maxAgeMin = null;
+      if (jsonCheckPoint.containsKey("max_age_min")) {
+        maxAgeMin = Integer.parseInt(jsonCheckPoint.get("max_age_min").toString());
+      }
+      if (logFilePath == null || logFileKey == null) {
+        return false;
+      }
+
+      boolean deleteCheckPointFile = false;
+      File logFile = new File(logFilePath);
+      if (logFile.exists()) {
+        Object fileKeyObj = FileUtil.getFileKey(logFile);
+        String fileBase64 = Base64.byteArrayToBase64(fileKeyObj.toString().getBytes());
+        if (!logFileKey.equals(fileBase64)) {
+          LOG.info("CheckPoint clean: File key has changed. old=" + logFileKey + ", new=" + fileBase64 + ", filePath=" +
+            logFilePath + ", checkPointFile=" + checkPointFile.getAbsolutePath());
+          deleteCheckPointFile = !wasFileRenamed(logFile.getParentFile(), logFileKey);
+        } else if (maxAgeMin != null && maxAgeMin != 0 && FileUtil.isFileTooOld(logFile, maxAgeMin)) {
+          deleteCheckPointFile = true;
+          LOG.info("Checkpoint clean: File reached max age minutes (" + maxAgeMin + "):" + logFilePath);
+        }
+      } else {
+        LOG.info("CheckPoint clean: Log file doesn't exist. filePath=" + logFilePath + ", checkPointFile=" +
+          checkPointFile.getAbsolutePath());
+        deleteCheckPointFile = !wasFileRenamed(logFile.getParentFile(), logFileKey);
+      }
+
+      if (!deleteCheckPointFile) {
+        return false;
+      }
+      LOG.info("Deleting CheckPoint file=" + checkPointFile.getAbsolutePath() + ", logFile=" + logFilePath);
+      // report the real outcome, a failed delete must not be counted as a deleted checkpoint
+      if (checkPointFile.delete()) {
+        return true;
+      }
+      LOG.warn("Couldn't delete CheckPoint file=" + checkPointFile.getAbsolutePath());
+    } catch (EOFException eof) {
+      LOG.warn("Caught EOFException. Ignoring reading existing checkPoint file. " + checkPointFile);
+    } catch (Throwable t) {
+      LOG.error("Error while checking checkPoint file. " + checkPointFile, t);
+    }
+    return false;
+  }
+
+  /**
+   * Reads the JSON payload of a checkpoint file: an int length prefix followed by that many bytes.
+   *
+   * @return the payload, or null (after logging) when the length prefix is invalid or the payload is incomplete
+   * @throws java.io.IOException on I/O failure, including an EOFException when the length prefix is missing
+   */
+  private static String readCheckPointContent(File checkPointFile) throws java.io.IOException {
+    try (RandomAccessFile checkPointReader = new RandomAccessFile(checkPointFile, "r")) {
+      int contentSize = checkPointReader.readInt();
+      long remaining = checkPointReader.length() - checkPointReader.getFilePointer();
+      // validate before allocating, so a corrupt prefix cannot trigger a negative or huge allocation
+      if (contentSize < 0 || contentSize > remaining) {
+        LOG.error("Couldn't read expected number of bytes from checkpoint file. expected=" + contentSize + ", read="
+          + remaining + ", checkPointFile=" + checkPointFile);
+        return null;
+      }
+      byte[] content = new byte[contentSize];
+      int readSize = 0;
+      // a single read() may deliver fewer bytes than requested, keep reading until done or EOF
+      while (readSize < contentSize) {
+        int count = checkPointReader.read(content, readSize, contentSize - readSize);
+        if (count < 0) {
+          break;
+        }
+        readSize += count;
+      }
+      if (readSize != contentSize) {
+        LOG.error("Couldn't read expected number of bytes from checkpoint file. expected=" + contentSize + ", read="
+          + readSize + ", checkPointFile=" + checkPointFile);
+        return null;
+      }
+      return new String(content, 0, readSize);
+    }
+  }
+
+  /**
+   * Looks for a file with the given Base64 file key inside a folder.
+   *
+   * @param folder folder the log file lived in; may be null or may no longer exist
+   * @param searchFileBase64 Base64 encoded file key stored in the checkpoint
+   * @return true when a file with that key is found, or when the folder cannot be inspected
+   *         (the checkpoint is then kept); false when the folder is gone or holds no such file
+   */
+  private static boolean wasFileRenamed(File folder, String searchFileBase64) {
+    if (folder != null && !folder.exists()) {
+      // the whole folder is gone, so the log file cannot have been renamed inside it
+      return false;
+    }
+    File[] candidates = folder == null ? null : folder.listFiles();
+    if (candidates == null) {
+      // listFiles() returns null on I/O errors or for non-directories; keep the checkpoint when unsure
+      LOG.warn("CheckPoint clean: Couldn't list files of folder " + folder + ", keeping checkpoint");
+      return true;
+    }
+    for (File file : candidates) {
+      Object fileKeyObj = FileUtil.getFileKey(file);
+      if (fileKeyObj == null) {
+        // defensive: a file without a key cannot match; whether getFileKey can return null is not visible here
+        continue;
+      }
+      String fileBase64 = Base64.byteArrayToBase64(fileKeyObj.toString().getBytes());
+      if (searchFileBase64.equals(fileBase64)) {
+        // even though the file name in the checkpoint file is different from the one it was renamed to, checkpoint files are
+        // identified by their name, which is generated from the file key, which would be the same for the renamed file
+        LOG.info("CheckPoint clean: File key matches file " + file.getAbsolutePath() + ", it must have been renamed");
+        return true;
+      }
+    }
+    return false;
+  }
+
+}
diff --git
a/ambari-logsearch/ambari-logsearch-logfeeder/src/main/java/org/apache/ambari/logfeeder/input/file/ResumeLineNumberHelper.java
b/ambari-logsearch/ambari-logsearch-logfeeder/src/main/java/org/apache/ambari/logfeeder/input/file/checkpoint/util/ResumeLineNumberHelper.java
similarity index 89%
rename from
ambari-logsearch/ambari-logsearch-logfeeder/src/main/java/org/apache/ambari/logfeeder/input/file/ResumeLineNumberHelper.java
rename to
ambari-logsearch/ambari-logsearch-logfeeder/src/main/java/org/apache/ambari/logfeeder/input/file/checkpoint/util/ResumeLineNumberHelper.java
index 9350200..664fa4f 100644
---
a/ambari-logsearch/ambari-logsearch-logfeeder/src/main/java/org/apache/ambari/logfeeder/input/file/ResumeLineNumberHelper.java
+++
b/ambari-logsearch/ambari-logsearch-logfeeder/src/main/java/org/apache/ambari/logfeeder/input/file/checkpoint/util/ResumeLineNumberHelper.java
@@ -16,7 +16,7 @@
* specific language governing permissions and limitations
* under the License.
*/
-package org.apache.ambari.logfeeder.input.file;
+package org.apache.ambari.logfeeder.input.file.checkpoint.util;
import org.apache.ambari.logfeeder.input.InputFile;
import org.apache.ambari.logfeeder.util.LogFeederUtil;
@@ -36,15 +36,14 @@ public class ResumeLineNumberHelper {
private ResumeLineNumberHelper() {
}
- public static int getResumeFromLineNumber(InputFile inputFile) {
+ public static int getResumeFromLineNumber(InputFile inputFile, File
checkPointFolder) {
int resumeFromLineNumber = 0;
File checkPointFile = null;
try {
LOG.info("Checking existing checkpoint file. " +
inputFile.getShortDescription());
- String checkPointFileName = inputFile.getBase64FileKey() +
inputFile.getCheckPointExtension();
- File checkPointFolder =
inputFile.getInputManager().getCheckPointFolderFile();
+ String checkPointFileName = getCheckpointFileName(inputFile);
checkPointFile = new File(checkPointFolder, checkPointFileName);
inputFile.getCheckPointFiles().put(inputFile.getBase64FileKey(),
checkPointFile);
Map<String, Object> jsonCheckPoint = null;
@@ -88,4 +87,9 @@ public class ResumeLineNumberHelper {
return resumeFromLineNumber;
}
+  /**
+   * Builds the checkpoint file name of an input as log type, '-', Base64 file key and checkpoint extension.
+   */
+  private static String getCheckpointFileName(InputFile inputFile) {
+    return inputFile.getLogType() + "-" + inputFile.getBase64FileKey() + inputFile.getCheckPointExtension();
+  }
+
}
diff --git
a/ambari-logsearch/ambari-logsearch-logfeeder/src/main/java/org/apache/ambari/logfeeder/input/monitor/CheckpointCleanupMonitor.java
b/ambari-logsearch/ambari-logsearch-logfeeder/src/main/java/org/apache/ambari/logfeeder/input/monitor/CheckpointCleanupMonitor.java
index 65a314e..45404c4 100644
---
a/ambari-logsearch/ambari-logsearch-logfeeder/src/main/java/org/apache/ambari/logfeeder/input/monitor/CheckpointCleanupMonitor.java
+++
b/ambari-logsearch/ambari-logsearch-logfeeder/src/main/java/org/apache/ambari/logfeeder/input/monitor/CheckpointCleanupMonitor.java
@@ -18,7 +18,7 @@
*/
package org.apache.ambari.logfeeder.input.monitor;
-import org.apache.ambari.logfeeder.plugin.manager.InputManager;
+import org.apache.ambari.logfeeder.plugin.manager.CheckpointManager;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@@ -27,11 +27,11 @@ public class CheckpointCleanupMonitor implements Runnable {
private static final Logger LOG =
LoggerFactory.getLogger(CheckpointCleanupMonitor.class);
private long waitIntervalMin;
- private InputManager inputManager;
+ private CheckpointManager checkpointHandler;
- public CheckpointCleanupMonitor(InputManager inputManager, long
waitIntervalMin) {
+ public CheckpointCleanupMonitor(CheckpointManager checkpointHandler, long
waitIntervalMin) {
this.waitIntervalMin = waitIntervalMin;
- this.inputManager = inputManager;
+ this.checkpointHandler = checkpointHandler;
}
@Override
@@ -39,7 +39,7 @@ public class CheckpointCleanupMonitor implements Runnable {
while (!Thread.currentThread().isInterrupted()) {
try {
Thread.sleep(1000 * 60 * waitIntervalMin);
- inputManager.cleanCheckPointFiles();
+ checkpointHandler.cleanupCheckpoints();
} catch (Exception e) {
LOG.error("Cleanup checkpoint files thread interrupted.", e);
}
diff --git
a/ambari-logsearch/ambari-logsearch-logfeeder/src/main/scripts/logfeeder.sh
b/ambari-logsearch/ambari-logsearch-logfeeder/src/main/scripts/logfeeder.sh
index abe062f..7f1d8ec 100755
--- a/ambari-logsearch/ambari-logsearch-logfeeder/src/main/scripts/logfeeder.sh
+++ b/ambari-logsearch/ambari-logsearch-logfeeder/src/main/scripts/logfeeder.sh
@@ -91,6 +91,7 @@ function print_usage() {
start Start Log Feeder
stop Stop Log Feeder
status Check Log Feeder status (pid file)
+ checkpoints Checkpoint operations
test Test Log Feeder shipper configs
help Print usage
@@ -106,6 +107,14 @@ function print_usage() {
-tgc, --test-global-config Global configuration files (comma separated
list) for testing if log entry is parseable
-tli, --test-log-id The id of the log to test
+ checkpoints command arguments:
+ -l, --list Print checkpoints
+ -cf, --checkpoints-folder Checkpoints folder location
+ -c, --clean Remove a checkpoint file (by key/log type
or use on all)
+ -k, --file-key Filter on file key (for list and clean)
+ -lt, --log-type Filter on log type (for list and clean)
+ -a, --all Flag all checkpoints to be deleted by clean
command
+
EOF
}
@@ -263,6 +272,11 @@ function test() {
$JVM -cp "$LOGFEEDER_CONF_DIR:$LOGFEEDER_LIBS_DIR/*" $LOGFEEDER_JAVA_OPTS
org.apache.ambari.logfeeder.LogFeederCommandLine --test ${@}
}
+# Runs the checkpoint operations of the Log Feeder command line tool with the given arguments.
+function checkpoints() {
+  echo "Running command: $JVM -cp "$LOGFEEDER_CONF_DIR:$LOGFEEDER_LIBS_DIR/*" org.apache.ambari.logfeeder.LogFeederCommandLine --checkpoints ${@}"
+  # "$@" is quoted so arguments containing spaces (e.g. a checkpoints folder path) reach the JVM as single arguments;
+  # LOGFEEDER_JAVA_OPTS is left unquoted as before so that it can expand to several JVM options.
+  $JVM -cp "$LOGFEEDER_CONF_DIR:$LOGFEEDER_LIBS_DIR/*" $LOGFEEDER_JAVA_OPTS org.apache.ambari.logfeeder.LogFeederCommandLine --checkpoints "$@"
+}
+
if [ $# -gt 0 ]; then
SCRIPT_CMD="$1"
shift
@@ -284,6 +298,9 @@ case $SCRIPT_CMD in
test)
test ${1+"$@"}
;;
+ checkpoints)
+ checkpoints ${1+"$@"}
+ ;;
help)
print_usage
exit 0
diff --git
a/ambari-logsearch/ambari-logsearch-logfeeder/src/test/java/org/apache/ambari/logfeeder/input/InputFileTest.java
b/ambari-logsearch/ambari-logsearch-logfeeder/src/test/java/org/apache/ambari/logfeeder/input/InputFileTest.java
index 2219be1..0a95342 100644
---
a/ambari-logsearch/ambari-logsearch-logfeeder/src/test/java/org/apache/ambari/logfeeder/input/InputFileTest.java
+++
b/ambari-logsearch/ambari-logsearch-logfeeder/src/test/java/org/apache/ambari/logfeeder/input/InputFileTest.java
@@ -26,8 +26,10 @@ import java.util.List;
import org.apache.ambari.logfeeder.conf.LogEntryCacheConfig;
import org.apache.ambari.logfeeder.conf.LogFeederProps;
+import org.apache.ambari.logfeeder.input.file.checkpoint.FileCheckpointManager;
import org.apache.ambari.logfeeder.plugin.filter.Filter;
import org.apache.ambari.logfeeder.plugin.input.InputMarker;
+import org.apache.ambari.logfeeder.plugin.manager.CheckpointManager;
import org.apache.ambari.logfeeder.plugin.manager.InputManager;
import
org.apache.ambari.logsearch.config.json.model.inputconfig.impl.InputFileDescriptorImpl;
import org.apache.commons.io.FileUtils;
@@ -77,6 +79,13 @@ public class InputFileTest {
FileUtils.cleanDirectory(TEST_DIR);
}
+  /**
+   * Removes the test directory once all tests of the class have run, if it still exists.
+   */
+  @AfterClass
+  public static void deleteDir() throws IOException {
+    if (!TEST_DIR.exists()) {
+      return;
+    }
+    FileUtils.deleteDirectory(TEST_DIR);
+  }
+
@Before
public void setUp() throws Exception {
logFeederProps = new LogFeederProps();
@@ -85,6 +94,7 @@ public class InputFileTest {
logEntryCacheConfig.setCacheLastDedupEnabled(false);
logEntryCacheConfig.setCacheSize(10);
logFeederProps.setLogEntryCacheConfig(logEntryCacheConfig);
+ logFeederProps.setCheckpointFolder("process3_checkpoint");
testInputMarker = new InputFileMarker(inputFile, "", 0);
}
@@ -125,13 +135,13 @@ public class InputFileTest {
public void testInputFile_process3Rows() throws Exception {
LOG.info("testInputFile_process3Rows()");
- File checkPointDir = createCheckpointDir("process3_checkpoint");
File testFile = createFile("process3.log");
init(testFile.getAbsolutePath());
InputManager inputManager = EasyMock.createStrictMock(InputManager.class);
-
EasyMock.expect(inputManager.getCheckPointFolderFile()).andReturn(checkPointDir);
+ CheckpointManager checkpointManager = new FileCheckpointManager();
+
EasyMock.expect(inputManager.getCheckpointHandler()).andReturn(checkpointManager);
EasyMock.replay(inputManager);
inputFile.setInputManager(inputManager);