Repository: ambari Updated Branches: refs/heads/trunk bac456060 -> c63b34cc2
AMBARI-17788. Refactor spooler code in OutputHDFSFile to be reusable for OutputS3File (Hemanth Yamijala via oleewere) Project: http://git-wip-us.apache.org/repos/asf/ambari/repo Commit: http://git-wip-us.apache.org/repos/asf/ambari/commit/c63b34cc Tree: http://git-wip-us.apache.org/repos/asf/ambari/tree/c63b34cc Diff: http://git-wip-us.apache.org/repos/asf/ambari/diff/c63b34cc Branch: refs/heads/trunk Commit: c63b34cc2d02d58e4b76ac29c50e812a10eb3e6c Parents: bac4560 Author: oleewere <[email protected]> Authored: Fri Aug 5 14:15:48 2016 +0200 Committer: oleewere <[email protected]> Committed: Fri Aug 5 14:15:48 2016 +0200 ---------------------------------------------------------------------- .../ambari/logfeeder/output/OutputHDFSFile.java | 154 ++++------- .../logfeeder/output/spool/LogSpooler.java | 137 ++++++++++ .../output/spool/LogSpoolerContext.java | 85 ++++++ .../output/spool/LogSpoolerException.java | 29 +++ .../output/spool/RolloverCondition.java | 36 +++ .../logfeeder/output/spool/RolloverHandler.java | 40 +++ .../logfeeder/output/spool/LogSpoolerTest.java | 258 +++++++++++++++++++ 7 files changed, 640 insertions(+), 99 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/ambari/blob/c63b34cc/ambari-logsearch/ambari-logsearch-logfeeder/src/main/java/org/apache/ambari/logfeeder/output/OutputHDFSFile.java ---------------------------------------------------------------------- diff --git a/ambari-logsearch/ambari-logsearch-logfeeder/src/main/java/org/apache/ambari/logfeeder/output/OutputHDFSFile.java b/ambari-logsearch/ambari-logsearch-logfeeder/src/main/java/org/apache/ambari/logfeeder/output/OutputHDFSFile.java index 87cc0eb..f711a5f 100644 --- a/ambari-logsearch/ambari-logsearch-logfeeder/src/main/java/org/apache/ambari/logfeeder/output/OutputHDFSFile.java +++ b/ambari-logsearch/ambari-logsearch-logfeeder/src/main/java/org/apache/ambari/logfeeder/output/OutputHDFSFile.java @@ -19,28 
+19,32 @@ package org.apache.ambari.logfeeder.output; -import java.io.BufferedWriter; -import java.io.File; -import java.io.FileNotFoundException; -import java.io.FileWriter; -import java.io.IOException; -import java.io.PrintWriter; -import java.util.Date; -import java.util.HashMap; -import java.util.Iterator; -import java.util.concurrent.ConcurrentLinkedQueue; - import org.apache.ambari.logfeeder.LogFeederUtil; import org.apache.ambari.logfeeder.input.InputMarker; -import org.apache.ambari.logfeeder.util.DateUtil; +import org.apache.ambari.logfeeder.output.spool.LogSpooler; +import org.apache.ambari.logfeeder.output.spool.LogSpoolerContext; +import org.apache.ambari.logfeeder.output.spool.RolloverCondition; +import org.apache.ambari.logfeeder.output.spool.RolloverHandler; import org.apache.ambari.logfeeder.util.LogfeederHDFSUtil; import org.apache.ambari.logfeeder.util.PlaceholderUtil; import org.apache.commons.lang3.StringUtils; import org.apache.hadoop.fs.FileSystem; import org.apache.log4j.Logger; -public class OutputHDFSFile extends Output { +import java.io.File; +import java.util.Date; +import java.util.HashMap; +import java.util.Iterator; +import java.util.concurrent.ConcurrentLinkedQueue; + +/** + * An {@link Output} that records logs to HDFS. + * + * The events are spooled on the local file system and uploaded in batches asynchronously. 
+ */ +public class OutputHDFSFile extends Output implements RolloverHandler, RolloverCondition { private final static Logger logger = Logger.getLogger(OutputHDFSFile.class); + private static final long DEFAULT_ROLLOVER_THRESHOLD_TIME_SECONDS = 5 * 60L;// 5 min by default private ConcurrentLinkedQueue<File> localReadyFiles = new ConcurrentLinkedQueue<File>(); @@ -48,21 +52,15 @@ public class OutputHDFSFile extends Output { private Thread hdfsCopyThread = null; - private PrintWriter outWriter = null; - // local writer variables - private String localFilePath = null; private String filenamePrefix = "service-logs-"; - private String localFileDir = null; - private File localcurrentFile = null; - private Date localFileCreateTime = null; - private long localFileRolloverSec = 5 * 1 * 60;// 5 min by default + private long rolloverThresholdTimeMillis; private String hdfsOutDir = null; private String hdfsHost = null; private String hdfsPort = null; private FileSystem fileSystem = null; - private String fileDateFormat = "yyyy-MM-dd-HH-mm-ss"; + private LogSpooler logSpooler; @Override public void init() throws Exception { @@ -70,7 +68,8 @@ public class OutputHDFSFile extends Output { hdfsOutDir = getStringValue("hdfs_out_dir"); hdfsHost = getStringValue("hdfs_host"); hdfsPort = getStringValue("hdfs_port"); - localFileRolloverSec = getLongValue("rollover_sec", localFileRolloverSec); + long rolloverThresholdTimeSeconds = getLongValue("rollover_sec", DEFAULT_ROLLOVER_THRESHOLD_TIME_SECONDS); + rolloverThresholdTimeMillis = rolloverThresholdTimeSeconds * 1000L; filenamePrefix = getStringValue("file_name_prefix", filenamePrefix); if (StringUtils.isEmpty(hdfsOutDir)) { logger @@ -90,23 +89,15 @@ public class OutputHDFSFile extends Output { HashMap<String, String> contextParam = buildContextParam(); hdfsOutDir = PlaceholderUtil.replaceVariables(hdfsOutDir, contextParam); logger.info("hdfs Output dir=" + hdfsOutDir); - localFileDir = LogFeederUtil.getLogfeederTempDir() + 
"hdfs/service/"; - localFilePath = localFileDir; + String localFileDir = LogFeederUtil.getLogfeederTempDir() + "hdfs/service/"; + logSpooler = new LogSpooler(localFileDir, filenamePrefix, this, this); this.startHDFSCopyThread(); } @Override public void close() { logger.info("Closing file." + getShortDescription()); - if (outWriter != null) { - try { - outWriter.flush(); - outWriter.close(); - addFileInReadyList(localcurrentFile); - } catch (Throwable t) { - logger.error(t.getLocalizedMessage(),t); - } - } + logSpooler.rollover(); this.stopHDFSCopyThread(); isClosed = true; } @@ -115,12 +106,8 @@ public class OutputHDFSFile extends Output { synchronized public void write(String block, InputMarker inputMarker) throws Exception { if (block != null) { - buildOutWriter(); - if (outWriter != null) { - statMetric.count++; - outWriter.println(block); - closeFileIfNeeded(); - } + logSpooler.add(block); + statMetric.count++; } } @@ -130,59 +117,6 @@ public class OutputHDFSFile extends Output { return "output:destination=hdfs,hdfsOutDir=" + hdfsOutDir; } - private synchronized void closeFileIfNeeded() throws FileNotFoundException, - IOException { - if (outWriter == null) { - return; - } - // TODO: Close the file on absolute time. Currently it is implemented as - // relative time - if (System.currentTimeMillis() - localFileCreateTime.getTime() > localFileRolloverSec * 1000) { - logger.info("Closing file. Rolling over. name=" - + localcurrentFile.getName() + ", filePath=" - + localcurrentFile.getAbsolutePath()); - try { - outWriter.flush(); - outWriter.close(); - addFileInReadyList(localcurrentFile); - } catch (Throwable t) { - logger - .error("Error on closing output writter. Exception will be ignored. 
name=" - + localcurrentFile.getName() - + ", filePath=" - + localcurrentFile.getAbsolutePath()); - } - - outWriter = null; - localcurrentFile = null; - } - } - - private synchronized void buildOutWriter() { - if (outWriter == null) { - String currentFilePath = localFilePath + getCurrentFileName(); - localcurrentFile = new File(currentFilePath); - if (localcurrentFile.getParentFile() != null) { - File parentDir = localcurrentFile.getParentFile(); - if (!parentDir.isDirectory()) { - parentDir.mkdirs(); - } - } - try { - outWriter = new PrintWriter(new BufferedWriter(new FileWriter( - localcurrentFile, true))); - } catch (IOException e) { - logger.error("= OutputHDFSFile.buidOutWriter failed for file : " - + localcurrentFile.getAbsolutePath() + " Desc: " - + getShortDescription() + " errorMsg: " + e.getLocalizedMessage(), - e); - } - localFileCreateTime = new Date(); - logger.info("Create file is successful. localFilePath=" - + localcurrentFile.getAbsolutePath()); - } - } - private void startHDFSCopyThread() { hdfsCopyThread = new Thread("hdfsCopyThread") { @@ -261,13 +195,6 @@ public class OutputHDFSFile extends Output { } } - private String getCurrentFileName() { - Date currentDate = new Date(); - String dateStr = DateUtil.dateToString(currentDate, fileDateFormat); - String fileName = filenamePrefix + dateStr; - return fileName; - } - private HashMap<String, String> buildContextParam() { HashMap<String, String> contextParam = new HashMap<String, String>(); contextParam.put("host", LogFeederUtil.hostName); @@ -291,4 +218,33 @@ public class OutputHDFSFile extends Output { throw new UnsupportedOperationException( "copyFile method is not yet supported for output=hdfs"); } + + /** + * Add the rollover file to a daemon thread for uploading to HDFS + * @param rolloverFile the file to be uploaded to HDFS + */ + @Override + public void handleRollover(File rolloverFile) { + addFileInReadyList(rolloverFile); + } + + /** + * Determines whether it is time to handleRollover the 
current spool file. + * + * The file will handleRollover if the time since creation of the file is more than + * the timeout specified in rollover_sec configuration. + * @param currentSpoolerContext {@link LogSpoolerContext} that holds state of active Spool file + * @return true if time since creation is greater than value specified in rollover_sec, + * false otherwise. + */ + @Override + public boolean shouldRollover(LogSpoolerContext currentSpoolerContext) { + long timeSinceCreation = new Date().getTime() - currentSpoolerContext.getActiveLogCreationTime().getTime(); + boolean shouldRollover = timeSinceCreation > rolloverThresholdTimeMillis; + if (shouldRollover) { + logger.info("Detecting that time since file creation time " + currentSpoolerContext.getActiveLogCreationTime() + + " has crossed threshold (msecs) " + rolloverThresholdTimeMillis); + } + return shouldRollover; + } } http://git-wip-us.apache.org/repos/asf/ambari/blob/c63b34cc/ambari-logsearch/ambari-logsearch-logfeeder/src/main/java/org/apache/ambari/logfeeder/output/spool/LogSpooler.java ---------------------------------------------------------------------- diff --git a/ambari-logsearch/ambari-logsearch-logfeeder/src/main/java/org/apache/ambari/logfeeder/output/spool/LogSpooler.java b/ambari-logsearch/ambari-logsearch-logfeeder/src/main/java/org/apache/ambari/logfeeder/output/spool/LogSpooler.java new file mode 100644 index 0000000..306326a --- /dev/null +++ b/ambari-logsearch/ambari-logsearch-logfeeder/src/main/java/org/apache/ambari/logfeeder/output/spool/LogSpooler.java @@ -0,0 +1,137 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. 
You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.ambari.logfeeder.output.spool; + +import com.google.common.annotations.VisibleForTesting; +import org.apache.ambari.logfeeder.output.Output; +import org.apache.ambari.logfeeder.util.DateUtil; +import org.apache.log4j.Logger; + +import java.io.*; +import java.util.Date; + +/** + * A class that manages local storage of log events before they are uploaded to the output destinations. + * + * This class should be used by any {@link Output}s that wish to upload log files to an + * output destination on a periodic batched basis. Log events should be added to an instance + * of this class to be stored locally. This class determines when to + * rollover using calls to an interface {@link RolloverCondition}. Likewise, it uses an interface + * {@link RolloverHandler} to trigger the handling of the rolled over file. + */ +public class LogSpooler { + static private Logger logger = Logger.getLogger(LogSpooler.class); + static final String fileDateFormat = "yyyy-MM-dd-HH-mm-ss"; + + private String spoolDirectory; + private String sourceFileNamePrefix; + private RolloverCondition rolloverCondition; + private RolloverHandler rolloverHandler; + private PrintWriter currentSpoolBufferedWriter; + private File currentSpoolFile; + private LogSpoolerContext currentSpoolerContext; + + /** + * Create an instance of the LogSpooler. + * @param spoolDirectory The directory under which spooler files are created. + * Should be unique per instance of {@link Output} + * @param sourceFileNamePrefix The prefix with which the locally spooled files are created. 
+ * @param rolloverCondition An object of type {@link RolloverCondition} that will be used to + * determine when to rollover. + * @param rolloverHandler An object of type {@link RolloverHandler} that will be called when + * there should be a rollover. + */ + public LogSpooler(String spoolDirectory, String sourceFileNamePrefix, RolloverCondition rolloverCondition, + RolloverHandler rolloverHandler) { + this.spoolDirectory = spoolDirectory; + this.sourceFileNamePrefix = sourceFileNamePrefix; + this.rolloverCondition = rolloverCondition; + this.rolloverHandler = rolloverHandler; + initializeSpoolFile(); + } + + private void initializeSpoolDirectory() { + File spoolDir = new File(spoolDirectory); + if (!spoolDir.exists()) { + logger.info("Creating spool directory: " + spoolDir); + boolean result = spoolDir.mkdirs(); + if (!result) { + throw new LogSpoolerException("Could not create spool directory: " + spoolDirectory); + } + } + } + + private void initializeSpoolFile() { + initializeSpoolDirectory(); + currentSpoolFile = new File(spoolDirectory, getCurrentFileName()); + try { + currentSpoolBufferedWriter = initializeSpoolWriter(currentSpoolFile); + } catch (IOException e) { + throw new LogSpoolerException("Could not create buffered writer for spool file: " + currentSpoolFile + + ", error message: " + e.getLocalizedMessage(), e); + } + currentSpoolerContext = new LogSpoolerContext(currentSpoolFile); + logger.info("Initialized spool file at path: " + currentSpoolFile.getAbsolutePath()); + } + + @VisibleForTesting + protected PrintWriter initializeSpoolWriter(File spoolFile) throws IOException { + return new PrintWriter(new BufferedWriter(new FileWriter(spoolFile))); + } + + /** + * Add an event for spooling. + * + * This method adds the event to the current spool file's buffer. On completion, it + * calls the {@link RolloverCondition#shouldRollover(LogSpoolerContext)} method to determine if + * it is ready to rollover the file. + * @param logEvent The log event to spool. 
+ */ + public void add(String logEvent) { + currentSpoolBufferedWriter.println(logEvent); + currentSpoolerContext.logEventSpooled(); + if (rolloverCondition.shouldRollover(currentSpoolerContext)) { + rollover(); + } + } + + /** + * Trigger a rollover of the current spool file. + * + * This method manages the rollover of the spool file, and then invokes the + * {@link RolloverHandler#handleRollover(File)} to handle what should be done with the + * rolled over file. + */ + public void rollover() { + logger.info("Rollover condition detected, rolling over file: " + currentSpoolFile); + currentSpoolBufferedWriter.flush(); + currentSpoolBufferedWriter.close(); + rolloverHandler.handleRollover(currentSpoolFile); + logger.info("Invoked rollover handler with file: " + currentSpoolFile); + initializeSpoolFile(); + } + + @VisibleForTesting + protected String getCurrentFileName() { + Date currentDate = new Date(); + String dateStr = DateUtil.dateToString(currentDate, fileDateFormat); + return sourceFileNamePrefix + dateStr; + } + +} http://git-wip-us.apache.org/repos/asf/ambari/blob/c63b34cc/ambari-logsearch/ambari-logsearch-logfeeder/src/main/java/org/apache/ambari/logfeeder/output/spool/LogSpoolerContext.java ---------------------------------------------------------------------- diff --git a/ambari-logsearch/ambari-logsearch-logfeeder/src/main/java/org/apache/ambari/logfeeder/output/spool/LogSpoolerContext.java b/ambari-logsearch/ambari-logsearch-logfeeder/src/main/java/org/apache/ambari/logfeeder/output/spool/LogSpoolerContext.java new file mode 100644 index 0000000..084d6a2 --- /dev/null +++ b/ambari-logsearch/ambari-logsearch-logfeeder/src/main/java/org/apache/ambari/logfeeder/output/spool/LogSpoolerContext.java @@ -0,0 +1,85 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. 
The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.ambari.logfeeder.output.spool; + +import java.io.File; +import java.util.Date; + +/** + * A class that holds the state of a spool file. + * + * The state in this class can be used by a {@link RolloverCondition} to determine + * if an active spool file should be rolled over. + */ +public class LogSpoolerContext { + + private File activeSpoolFile; + private long numEventsSpooled; + private Date activeLogCreationTime; + + /** + * Create a new LogSpoolerContext + * @param activeSpoolFile the spool file for which to hold state + */ + public LogSpoolerContext(File activeSpoolFile) { + this.activeSpoolFile = activeSpoolFile; + this.numEventsSpooled = 0; + this.activeLogCreationTime = new Date(); + } + + /** + * Increment number of spooled events by one. 
+ */ + public void logEventSpooled() { + numEventsSpooled++; + } + + public File getActiveSpoolFile() { + return activeSpoolFile; + } + + public long getNumEventsSpooled() { + return numEventsSpooled; + } + + public Date getActiveLogCreationTime() { + return activeLogCreationTime; + } + + @Override + public boolean equals(Object o) { + if (this == o) return true; + if (o == null || getClass() != o.getClass()) return false; + + LogSpoolerContext that = (LogSpoolerContext) o; + + if (numEventsSpooled != that.numEventsSpooled) return false; + if (!activeSpoolFile.equals(that.activeSpoolFile)) return false; + return activeLogCreationTime.equals(that.activeLogCreationTime); + + } + + @Override + public int hashCode() { + int result = activeSpoolFile.hashCode(); + result = 31 * result + (int) (numEventsSpooled ^ (numEventsSpooled >>> 32)); + result = 31 * result + activeLogCreationTime.hashCode(); + return result; + } +} http://git-wip-us.apache.org/repos/asf/ambari/blob/c63b34cc/ambari-logsearch/ambari-logsearch-logfeeder/src/main/java/org/apache/ambari/logfeeder/output/spool/LogSpoolerException.java ---------------------------------------------------------------------- diff --git a/ambari-logsearch/ambari-logsearch-logfeeder/src/main/java/org/apache/ambari/logfeeder/output/spool/LogSpoolerException.java b/ambari-logsearch/ambari-logsearch-logfeeder/src/main/java/org/apache/ambari/logfeeder/output/spool/LogSpoolerException.java new file mode 100644 index 0000000..1e12fb7 --- /dev/null +++ b/ambari-logsearch/ambari-logsearch-logfeeder/src/main/java/org/apache/ambari/logfeeder/output/spool/LogSpoolerException.java @@ -0,0 +1,29 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. 
The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * <p/> + * http://www.apache.org/licenses/LICENSE-2.0 + * <p/> + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.ambari.logfeeder.output.spool; + +public class LogSpoolerException extends RuntimeException { + public LogSpoolerException(String message, Exception cause) { + super(message, cause); + } + + public LogSpoolerException(String message) { + super(message); + } +} http://git-wip-us.apache.org/repos/asf/ambari/blob/c63b34cc/ambari-logsearch/ambari-logsearch-logfeeder/src/main/java/org/apache/ambari/logfeeder/output/spool/RolloverCondition.java ---------------------------------------------------------------------- diff --git a/ambari-logsearch/ambari-logsearch-logfeeder/src/main/java/org/apache/ambari/logfeeder/output/spool/RolloverCondition.java b/ambari-logsearch/ambari-logsearch-logfeeder/src/main/java/org/apache/ambari/logfeeder/output/spool/RolloverCondition.java new file mode 100644 index 0000000..8279645 --- /dev/null +++ b/ambari-logsearch/ambari-logsearch-logfeeder/src/main/java/org/apache/ambari/logfeeder/output/spool/RolloverCondition.java @@ -0,0 +1,36 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. 
You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.ambari.logfeeder.output.spool; + +/** + * An interface that is used to determine whether a rollover of a locally spooled log file should be triggered. + */ +public interface RolloverCondition { + + /** + * Check if the active spool file should be rolled over. + * + * If this returns true, the {@link LogSpooler} will initiate activities related + * to rollover of the file + * @param currentSpoolerContext {@link LogSpoolerContext} that holds state about the file being checked + * for rollover. + * @return true if active spool file should be rolled over, false otherwise + */ + boolean shouldRollover(LogSpoolerContext currentSpoolerContext); +} http://git-wip-us.apache.org/repos/asf/ambari/blob/c63b34cc/ambari-logsearch/ambari-logsearch-logfeeder/src/main/java/org/apache/ambari/logfeeder/output/spool/RolloverHandler.java ---------------------------------------------------------------------- diff --git a/ambari-logsearch/ambari-logsearch-logfeeder/src/main/java/org/apache/ambari/logfeeder/output/spool/RolloverHandler.java b/ambari-logsearch/ambari-logsearch-logfeeder/src/main/java/org/apache/ambari/logfeeder/output/spool/RolloverHandler.java new file mode 100644 index 0000000..11308e4 --- /dev/null +++ b/ambari-logsearch/ambari-logsearch-logfeeder/src/main/java/org/apache/ambari/logfeeder/output/spool/RolloverHandler.java @@ -0,0 +1,40 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. 
See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.ambari.logfeeder.output.spool; + +import java.io.File; + +/** + * An interface that is used to trigger the handling of a rolled over file. + * + * Implementations of this interface will typically upload the rolled over file to + * a target destination, like HDFS. + */ +public interface RolloverHandler { + /** + * Handle a rolled over file. + * + * This method is called inline from the {@link LogSpooler#rollover()} method. + * Hence implementations should either complete the handling fast, or do so + * asynchronously. The cleanup of the file is left to implementors, but should + * typically be done once the upload of the file to the target destination is complete. + * @param rolloverFile The file that has been rolled over. 
+ */ + void handleRollover(File rolloverFile); +} http://git-wip-us.apache.org/repos/asf/ambari/blob/c63b34cc/ambari-logsearch/ambari-logsearch-logfeeder/src/test/java/org/apache/ambari/logfeeder/output/spool/LogSpoolerTest.java ---------------------------------------------------------------------- diff --git a/ambari-logsearch/ambari-logsearch-logfeeder/src/test/java/org/apache/ambari/logfeeder/output/spool/LogSpoolerTest.java b/ambari-logsearch/ambari-logsearch-logfeeder/src/test/java/org/apache/ambari/logfeeder/output/spool/LogSpoolerTest.java new file mode 100644 index 0000000..7d9d78a --- /dev/null +++ b/ambari-logsearch/ambari-logsearch-logfeeder/src/test/java/org/apache/ambari/logfeeder/output/spool/LogSpoolerTest.java @@ -0,0 +1,258 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.ambari.logfeeder.output.spool; + +import org.easymock.EasyMockRule; +import org.easymock.LogicalOperator; +import org.easymock.Mock; +import org.junit.Before; +import org.junit.Rule; +import org.junit.Test; +import org.junit.rules.TemporaryFolder; + +import java.io.File; +import java.io.IOException; +import java.io.PrintWriter; +import java.util.Comparator; + +import static org.easymock.EasyMock.*; + +public class LogSpoolerTest { + + @Rule + public TemporaryFolder testFolder = new TemporaryFolder(); + + @Rule + public EasyMockRule mocks = new EasyMockRule(this); + + private String spoolDirectory; + private static final String SOURCE_FILENAME_PREFIX = "hdfs-namenode.log"; + private static final String FILE_SUFFIX = "currentFile"; + + @Mock + private RolloverCondition rolloverCondition; + + @Mock + private RolloverHandler rolloverHandler; + + @Before + public void setup() { + spoolDirectory = testFolder.getRoot().getAbsolutePath(); + } + + @Test + public void shouldSpoolEventToFile() { + final PrintWriter spoolWriter = mock(PrintWriter.class); + spoolWriter.println("log event"); + + final File spoolFile = new File(spoolDirectory, SOURCE_FILENAME_PREFIX + FILE_SUFFIX); + LogSpoolerContext logSpoolerContext = new LogSpoolerContext(spoolFile); + expect(rolloverCondition.shouldRollover( + cmp(logSpoolerContext, new LogSpoolerFileComparator(), LogicalOperator.EQUAL))). 
+ andReturn(false); + + replay(spoolWriter, rolloverCondition); + + LogSpooler logSpooler = new LogSpooler(spoolDirectory, SOURCE_FILENAME_PREFIX, + rolloverCondition, rolloverHandler) { + @Override + protected PrintWriter initializeSpoolWriter(File spoolFile) throws IOException { + return spoolWriter; + } + + @Override + protected String getCurrentFileName() { + return SOURCE_FILENAME_PREFIX + FILE_SUFFIX; + } + }; + logSpooler.add("log event"); + + verify(spoolWriter); + } + + @Test + public void shouldIncrementSpooledEventsCount() { + + final PrintWriter spoolWriter = mock(PrintWriter.class); + spoolWriter.println("log event"); + + final File spoolFile = new File(spoolDirectory, SOURCE_FILENAME_PREFIX + FILE_SUFFIX); + LogSpoolerContext logSpoolerContext = new LogSpoolerContext(spoolFile); + logSpoolerContext.logEventSpooled(); + expect(rolloverCondition.shouldRollover( + cmp(logSpoolerContext, new LogSpoolerEventCountComparator(), LogicalOperator.EQUAL))). + andReturn(false); + + replay(spoolWriter, rolloverCondition); + + LogSpooler logSpooler = new LogSpooler(spoolDirectory, SOURCE_FILENAME_PREFIX, + rolloverCondition, rolloverHandler) { + @Override + protected PrintWriter initializeSpoolWriter(File spoolFile) throws IOException { + return spoolWriter; + } + + @Override + protected String getCurrentFileName() { + return SOURCE_FILENAME_PREFIX + FILE_SUFFIX; + } + }; + logSpooler.add("log event"); + + verify(rolloverCondition); + } + + @Test + public void shouldCloseCurrentSpoolFileOnRollOver() { + final PrintWriter spoolWriter = mock(PrintWriter.class); + spoolWriter.println("log event"); + spoolWriter.flush(); + spoolWriter.close(); + + File spoolFile = new File(spoolDirectory, SOURCE_FILENAME_PREFIX + FILE_SUFFIX); + LogSpoolerContext logSpoolerContext = new LogSpoolerContext(spoolFile); + expect(rolloverCondition.shouldRollover( + cmp(logSpoolerContext, new LogSpoolerFileComparator(), LogicalOperator.EQUAL))). 
+ andReturn(true); + rolloverHandler.handleRollover(spoolFile); + + replay(spoolWriter, rolloverCondition, rolloverHandler); + + LogSpooler logSpooler = new LogSpooler(spoolDirectory, SOURCE_FILENAME_PREFIX, + rolloverCondition, rolloverHandler) { + + @Override + protected PrintWriter initializeSpoolWriter(File spoolFile) throws IOException { + return spoolWriter; + } + + @Override + protected String getCurrentFileName() { + return SOURCE_FILENAME_PREFIX + FILE_SUFFIX; + } + }; + logSpooler.add("log event"); + + verify(spoolWriter); + } + + @Test + public void shouldReinitializeFileOnRollover() { + final PrintWriter spoolWriter1 = mock(PrintWriter.class); + final PrintWriter spoolWriter2 = mock(PrintWriter.class); + spoolWriter1.println("log event1"); + spoolWriter2.println("log event2"); + spoolWriter1.flush(); + spoolWriter1.close(); + + File spoolFile1 = new File(spoolDirectory, SOURCE_FILENAME_PREFIX + FILE_SUFFIX + "_1"); + File spoolFile2 = new File(spoolDirectory, SOURCE_FILENAME_PREFIX + FILE_SUFFIX + "_2"); + + LogSpoolerContext logSpoolerContext1 = new LogSpoolerContext(spoolFile1); + expect(rolloverCondition.shouldRollover( + cmp(logSpoolerContext1, new LogSpoolerFileComparator(), LogicalOperator.EQUAL)) + ).andReturn(true); + + LogSpoolerContext logSpoolerContext2 = new LogSpoolerContext(spoolFile2); + expect(rolloverCondition.shouldRollover( + cmp(logSpoolerContext2, new LogSpoolerFileComparator(), LogicalOperator.EQUAL)) + ).andReturn(false); + + rolloverHandler.handleRollover(spoolFile1); + + replay(spoolWriter1, spoolWriter2, rolloverCondition, rolloverHandler); + + LogSpooler logSpooler = new LogSpooler(spoolDirectory, SOURCE_FILENAME_PREFIX, + rolloverCondition, rolloverHandler) { + private boolean wasRolledOver; + + @Override + protected PrintWriter initializeSpoolWriter(File spoolFile) throws IOException { + if (!wasRolledOver) { + wasRolledOver = true; + return spoolWriter1; + } else { + return spoolWriter2; + } + } + + @Override + protected 
String getCurrentFileName() { + if (!wasRolledOver) { + return SOURCE_FILENAME_PREFIX + FILE_SUFFIX + "_1"; + } else { + return SOURCE_FILENAME_PREFIX + FILE_SUFFIX + "_2"; + } + } + }; + logSpooler.add("log event1"); + logSpooler.add("log event2"); + + verify(spoolWriter1, spoolWriter2, rolloverCondition); + } + + @Test + public void shouldCallRolloverHandlerOnRollover() { + final PrintWriter spoolWriter = mock(PrintWriter.class); + spoolWriter.println("log event"); + spoolWriter.flush(); + spoolWriter.close(); + + File spoolFile = new File(spoolDirectory, SOURCE_FILENAME_PREFIX + FILE_SUFFIX); + LogSpoolerContext logSpoolerContext = new LogSpoolerContext(spoolFile); + expect(rolloverCondition.shouldRollover( + cmp(logSpoolerContext, new LogSpoolerFileComparator(), LogicalOperator.EQUAL)) + ).andReturn(true); + rolloverHandler.handleRollover(spoolFile); + + replay(spoolWriter, rolloverCondition, rolloverHandler); + + LogSpooler logSpooler = new LogSpooler(spoolDirectory, SOURCE_FILENAME_PREFIX, + rolloverCondition, rolloverHandler) { + + @Override + protected PrintWriter initializeSpoolWriter(File spoolFile) throws IOException { + return spoolWriter; + } + + @Override + protected String getCurrentFileName() { + return SOURCE_FILENAME_PREFIX + FILE_SUFFIX; + } + }; + logSpooler.add("log event"); + + verify(rolloverHandler); + } + + class LogSpoolerFileComparator implements Comparator<LogSpoolerContext> { + @Override + public int compare(LogSpoolerContext o1, LogSpoolerContext o2) { + return o1.getActiveSpoolFile().compareTo(o2.getActiveSpoolFile()); + } + } + + class LogSpoolerEventCountComparator implements Comparator<LogSpoolerContext> { + @Override + public int compare(LogSpoolerContext o1, LogSpoolerContext o2) { + return (int)(o1.getNumEventsSpooled()-o2.getNumEventsSpooled()); + } + } + +}
