Repository: flume Updated Branches: refs/heads/trunk 2fe393898 -> 2252fb193
http://git-wip-us.apache.org/repos/asf/flume/blob/2252fb19/flume-ng-sources/flume-taildir-source/src/main/java/org/apache/flume/source/taildir/TailFile.java ---------------------------------------------------------------------- diff --git a/flume-ng-sources/flume-taildir-source/src/main/java/org/apache/flume/source/taildir/TailFile.java b/flume-ng-sources/flume-taildir-source/src/main/java/org/apache/flume/source/taildir/TailFile.java index cb36e41..42474c4 100644 --- a/flume-ng-sources/flume-taildir-source/src/main/java/org/apache/flume/source/taildir/TailFile.java +++ b/flume-ng-sources/flume-taildir-source/src/main/java/org/apache/flume/source/taildir/TailFile.java @@ -19,21 +19,19 @@ package org.apache.flume.source.taildir; -import static org.apache.flume.source.taildir.TaildirSourceConfigurationConstants.*; +import com.google.common.collect.Lists; +import org.apache.flume.Event; +import org.apache.flume.event.EventBuilder; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; import java.io.File; import java.io.IOException; import java.io.RandomAccessFile; -import java.util.Comparator; import java.util.List; import java.util.Map; -import org.apache.flume.Event; -import org.apache.flume.event.EventBuilder; -import org.slf4j.Logger; -import org.slf4j.LoggerFactory; - -import com.google.common.collect.Lists; +import static org.apache.flume.source.taildir.TaildirSourceConfigurationConstants.BYTE_OFFSET_HEADER_KEY; public class TailFile { private static final Logger logger = LoggerFactory.getLogger(TailFile.class); @@ -41,8 +39,8 @@ public class TailFile { private static final byte BYTE_NL = (byte) 10; private static final byte BYTE_CR = (byte) 13; - private final static int BUFFER_SIZE = 8192; - private final static int NEED_READING = -1; + private static final int BUFFER_SIZE = 8192; + private static final int NEED_READING = -1; private RandomAccessFile raf; private final String path; @@ -61,7 +59,7 @@ public class TailFile { this.raf = new RandomAccessFile(file, "r"); if (pos > 0) { raf.seek(pos); - lineReadPos=pos; + lineReadPos = pos; } this.path = file.getAbsolutePath(); this.inode = inode; @@ -70,22 +68,56 @@ public class TailFile { this.needTail = true; this.headers = headers; this.oldBuffer = new byte[0]; - this.bufferPos= NEED_READING; + this.bufferPos = NEED_READING; + } + + public RandomAccessFile getRaf() { + return raf; + } + + public String getPath() { + return path; + } + + public long getInode() { + return inode; } - public RandomAccessFile getRaf() { return raf; } - public String getPath() { return path; } - public long getInode() { return inode; } - public long getPos() { return pos; } - public long getLastUpdated() { return lastUpdated; } - public boolean needTail() { return needTail; } - public Map<String, String> getHeaders() { return headers; } - public long getLineReadPos() { return lineReadPos; } + public long getPos() { + return pos; + } + + public long getLastUpdated() { + return lastUpdated; + } + + public boolean needTail() { + return needTail; + } + + public Map<String, String> getHeaders() { + return headers; + } + + public long getLineReadPos() { + return lineReadPos; + } - public void setPos(long pos) { this.pos = pos; } - public void setLastUpdated(long lastUpdated) { this.lastUpdated = lastUpdated; } - public void setNeedTail(boolean needTail) { this.needTail = needTail; } - public void setLineReadPos(long lineReadPos) { this.lineReadPos = lineReadPos; } + public void setPos(long pos) { + this.pos = pos; + } + + public void setLastUpdated(long lastUpdated) { + this.lastUpdated = lastUpdated; + } + + public void setNeedTail(boolean needTail) { + this.needTail = needTail; + } + + public void setLineReadPos(long lineReadPos) { + this.lineReadPos = lineReadPos; + } public boolean updatePos(String path, long inode, long pos) throws IOException { if (this.inode == inode && this.path.equals(path)) { @@ -99,7 +131,7 @@ public class TailFile { public void updateFilePos(long pos) throws IOException { raf.seek(pos); lineReadPos = pos; - bufferPos= NEED_READING; + bufferPos = NEED_READING; oldBuffer = new byte[0]; } @@ -146,7 +178,8 @@ public class TailFile { bufferPos = 0; } - private byte[] concatByteArrays(byte[] a, int startIdxA, int lenA, byte[] b, int startIdxB, int lenB) { + private byte[] concatByteArrays(byte[] a, int startIdxA, int lenA, + byte[] b, int startIdxB, int lenB) { byte[] c = new byte[lenA + lenB]; System.arraycopy(a, startIdxA, c, 0, lenA); System.arraycopy(b, startIdxB, c, lenA, lenB); @@ -195,7 +228,8 @@ public class TailFile { break; } // NEW_LINE not showed up at the end of the buffer - oldBuffer = concatByteArrays(oldBuffer, 0, oldBuffer.length, buffer, bufferPos, (buffer.length - bufferPos)); + oldBuffer = concatByteArrays(oldBuffer, 0, oldBuffer.length, + buffer, bufferPos, buffer.length - bufferPos); bufferPos = NEED_READING; } return lineResult; http://git-wip-us.apache.org/repos/asf/flume/blob/2252fb19/flume-ng-sources/flume-taildir-source/src/main/java/org/apache/flume/source/taildir/TaildirMatcher.java ---------------------------------------------------------------------- diff --git a/flume-ng-sources/flume-taildir-source/src/main/java/org/apache/flume/source/taildir/TaildirMatcher.java b/flume-ng-sources/flume-taildir-source/src/main/java/org/apache/flume/source/taildir/TaildirMatcher.java index 245aef5..ad9f720 100644 --- a/flume-ng-sources/flume-taildir-source/src/main/java/org/apache/flume/source/taildir/TaildirMatcher.java +++ b/flume-ng-sources/flume-taildir-source/src/main/java/org/apache/flume/source/taildir/TaildirMatcher.java @@ -27,7 +27,6 @@ import org.slf4j.Logger; import org.slf4j.LoggerFactory; import java.io.File; -import java.io.FileFilter; import java.io.IOException; import java.nio.file.DirectoryStream; import java.nio.file.FileSystem; @@ -40,25 +39,25 @@ import java.util.Comparator; import java.util.HashMap; import java.util.List; import java.util.concurrent.TimeUnit; -import java.util.regex.Pattern; /** - * Identifies and caches the files matched by single file pattern for <code>TAILDIR<code/> source. + * Identifies and caches the files matched by single file pattern for {@code TAILDIR} source. * <p></p> - * Since file patterns only apply to the fileNames and not the parent dictionaries, this implementation - * checks the parent directory for modification (additional or removed files update modification time of parent dir) - * If no modification happened to the parent dir that means the underlying files could only be written to but no need - * to rerun the pattern matching on fileNames. + * Since file patterns only apply to the fileNames and not the parent dictionaries, this + * implementation checks the parent directory for modification (additional or removed files + * update modification time of parent dir) + * If no modification happened to the parent dir that means the underlying files could only be + * written to but no need to rerun the pattern matching on fileNames. * <p></p> - * This implementation provides lazy caching or no caching. Instances of this class keep the result - * file list from the last successful execution - * of {@linkplain #getMatchingFiles()} function invocation, - * and may serve the content without hitting the FileSystem for performance optimization. + * This implementation provides lazy caching or no caching. Instances of this class keep the + * result file list from the last successful execution of {@linkplain #getMatchingFiles()} + * function invocation, and may serve the content without hitting the FileSystem for performance + * optimization. * <p></p> - * <b>IMPORTANT:</b> It is assumed that the hosting system provides at least second granularity for both - * <code>System.currentTimeMillis()</code> and <code>File.lastModified()</code>. Also that system clock is used - * for file system timestamps. If it is not the case then configure it as uncached. - * Class is solely for package only usage. Member functions are not thread safe. + * <b>IMPORTANT:</b> It is assumed that the hosting system provides at least second granularity + * for both {@code System.currentTimeMillis()} and {@code File.lastModified()}. Also + * that system clock is used for file system timestamps. If it is not the case then configure it + * as uncached. Class is solely for package only usage. Member functions are not thread safe. * * @see TaildirSource * @see ReliableTaildirEventReader @@ -84,29 +83,33 @@ public class TaildirMatcher { // system time in milliseconds, stores the last modification time of the // parent directory seen by the last check, rounded to seconds - // initial value is used in first check only when it will be replaced instantly (system time is positive) + // initial value is used in first check only when it will be replaced instantly + // (system time is positive) private long lastSeenParentDirMTime = -1; // system time in milliseconds, time of the last check, rounded to seconds - // initial value is used in first check only when it will be replaced instantly (system time is positive) + // initial value is used in first check only when it will be replaced instantly + // (system time is positive) private long lastCheckedTime = -1; // cached content, files which matched the pattern within the parent directory private List<File> lastMatchedFiles = Lists.newArrayList(); /** - * Package accessible constructor. From configuration context it represents a single <code>filegroup</code> - * and encapsulates the corresponding <code>filePattern</code>. - * <code>filePattern</code> consists of two parts: first part has to be a valid path to - * an existing parent directory, second part has to be a - * valid regex {@link java.util.regex.Pattern} that match any non-hidden file names within parent directory. - * A valid example for filePattern is <code>/dir0/dir1/.*</code> given <code>/dir0/dir1</code> - * is an existing directory structure readable by the running user. + * Package accessible constructor. From configuration context it represents a single + * <code>filegroup</code> and encapsulates the corresponding <code>filePattern</code>. + * <code>filePattern</code> consists of two parts: first part has to be a valid path to an + * existing parent directory, second part has to be a valid regex + * {@link java.util.regex.Pattern} that match any non-hidden file names within parent directory + * . A valid example for filePattern is <code>/dir0/dir1/.*</code> given + * <code>/dir0/dir1</code> is an existing directory structure readable by the running user. * <p></p> * An instance of this class is created for each fileGroup * * @param fileGroup arbitrary name of the group given by the config - * @param filePattern parent directory plus regex pattern. No wildcards are allowed in directory name - * @param cachePatternMatching default true, recommended in every setup especially with huge parent directories. - * Don't set when local system clock is not used for stamping mtime (eg: remote filesystems) + * @param filePattern parent directory plus regex pattern. No wildcards are allowed in directory + * name + * @param cachePatternMatching default true, recommended in every setup especially with huge + * parent directories. Don't set when local system clock is not used + * for stamping mtime (eg: remote filesystems) * @see TaildirSourceConfigurationConstants */ TaildirMatcher(String fileGroup, String filePattern, boolean cachePatternMatching) { @@ -133,11 +136,12 @@ public class TaildirMatcher { } /** - * Lists those files within the parentDir that match regex pattern passed in during object instantiation. - * Designed for frequent periodic invocation {@link org.apache.flume.source.PollableSourceRunner}. + * Lists those files within the parentDir that match regex pattern passed in during object + * instantiation. Designed for frequent periodic invocation + * {@link org.apache.flume.source.PollableSourceRunner}. * <p></p> - * Based on the modification of the parentDir this function may trigger cache recalculation by calling - * {@linkplain #getMatchingFilesNoCache()} or + * Based on the modification of the parentDir this function may trigger cache recalculation by + * calling {@linkplain #getMatchingFilesNoCache()} or * return the value stored in {@linkplain #lastMatchedFiles}. * Parentdir is allowed to be a symbolic link. * <p></p> @@ -146,7 +150,8 @@ public class TaildirMatcher { * so it may (or may not) reflect updates to the directory that occur during the call, * In which case next call * will return those files (as mtime is increasing it won't hit cache but trigger recalculation). - * It is guaranteed that invocation reflects every change which was observable at the time of invocation. + * It is guaranteed that invocation reflects every change which was observable at the time of + * invocation. * <p></p> * Matching file list recalculation is triggered when caching was turned off or * if mtime is greater than the previously seen mtime @@ -156,8 +161,8 @@ public class TaildirMatcher { * within the same second so in such case (assuming at least second granularity of reported mtime) * it is impossible to tell whether a change of the dir happened before the check or after * (unless the check happened after that second). - * Having said that implementation also stores system time of the previous invocation and previous invocation has to - * happen strictly after the current mtime to avoid further cache refresh + * Having said that implementation also stores system time of the previous invocation and previous + * invocation has to happen strictly after the current mtime to avoid further cache refresh * (because then it is guaranteed that previous invocation resulted in valid cache content). * If system clock hasn't passed the second of * the current mtime then logic expects more changes as well @@ -166,23 +171,26 @@ public class TaildirMatcher { * hence it recalculates matching files. If system clock finally * passed actual mtime then a subsequent invocation guarantees that it picked up every * change from the passed second so - * any further invocations can be served from cache associated with that second (given mtime is not updated again). + * any further invocations can be served from cache associated with that second + * (given mtime is not updated again). * - * @return List of files matching the pattern sorted by last modification time. No recursion. No directories. - * If nothing matches then returns an empty list. If I/O issue occurred then returns the list collected to the point - * when exception was thrown. + * @return List of files matching the pattern sorted by last modification time. No recursion. + * No directories. If nothing matches then returns an empty list. If I/O issue occurred then + * returns the list collected to the point when exception was thrown. * * @see #getMatchingFilesNoCache() */ List<File> getMatchingFiles() { - long now = TimeUnit.SECONDS.toMillis(TimeUnit.MILLISECONDS.toSeconds(System.currentTimeMillis())); + long now = TimeUnit.SECONDS.toMillis( + TimeUnit.MILLISECONDS.toSeconds(System.currentTimeMillis())); long currentParentDirMTime = parentDir.lastModified(); List<File> result; // calculate matched files if // - we don't want to use cache (recalculate every time) OR // - directory was clearly updated after the last check OR - // - last mtime change wasn't already checked for sure (system clock hasn't passed that second yet) + // - last mtime change wasn't already checked for sure + // (system clock hasn't passed that second yet) if (!cachePatternMatching || lastSeenParentDirMTime < currentParentDirMTime || !(currentParentDirMTime < lastCheckedTime)) { @@ -199,13 +207,13 @@ public class TaildirMatcher { * files are matching the regex pattern. Each invocation uses {@link DirectoryStream} * to identify matching files. * - * Files returned by this call are weakly consistent (see {@link DirectoryStream}). It does not freeze the directory while iterating, - * so it may (or may not) reflect updates to the directory that occur during the call. In which case next call - * will return those files. + * Files returned by this call are weakly consistent (see {@link DirectoryStream}). + * It does not freeze the directory while iterating, so it may (or may not) reflect updates + * to the directory that occur during the call. In which case next call will return those files. * * @return List of files matching the pattern unsorted. No recursion. No directories. - * If nothing matches then returns an empty list. If I/O issue occurred then returns the list collected to the point - * when exception was thrown. + * If nothing matches then returns an empty list. If I/O issue occurred then returns the list + * collected to the point when exception was thrown. * * @see DirectoryStream * @see DirectoryStream.Filter @@ -217,16 +225,16 @@ public class TaildirMatcher { result.add(entry.toFile()); } } catch (IOException e) { - logger.error("I/O exception occurred while listing parent directory. Files already matched will be returned. " + - parentDir.toPath(), e); + logger.error("I/O exception occurred while listing parent directory. " + + "Files already matched will be returned. " + parentDir.toPath(), e); } return result; } /** * Utility function to sort matched files based on last modification time. - * Sorting itself use only a snapshot of last modification times captured before the sorting to keep the - * number of stat system calls to the required minimum. + * Sorting itself use only a snapshot of last modification times captured before the sorting + * to keep the number of stat system calls to the required minimum. * * @param files list of files in any order * @return sorted list http://git-wip-us.apache.org/repos/asf/flume/blob/2252fb19/flume-ng-sources/flume-taildir-source/src/main/java/org/apache/flume/source/taildir/TaildirSource.java ---------------------------------------------------------------------- diff --git a/flume-ng-sources/flume-taildir-source/src/main/java/org/apache/flume/source/taildir/TaildirSource.java b/flume-ng-sources/flume-taildir-source/src/main/java/org/apache/flume/source/taildir/TaildirSource.java index dfb5b29..eae1b1a 100644 --- a/flume-ng-sources/flume-taildir-source/src/main/java/org/apache/flume/source/taildir/TaildirSource.java +++ b/flume-ng-sources/flume-taildir-source/src/main/java/org/apache/flume/source/taildir/TaildirSource.java @@ -150,7 +150,8 @@ public class TaildirSource extends AbstractSource implements String fileGroups = context.getString(FILE_GROUPS); Preconditions.checkState(fileGroups != null, "Missing param: " + FILE_GROUPS); - filePaths = selectByKeys(context.getSubProperties(FILE_GROUPS_PREFIX), fileGroups.split("\\s+")); + filePaths = selectByKeys(context.getSubProperties(FILE_GROUPS_PREFIX), + fileGroups.split("\\s+")); Preconditions.checkState(!filePaths.isEmpty(), "Mapping for tailing files is empty or invalid: '" + FILE_GROUPS_PREFIX + "'"); @@ -168,12 +169,13 @@ public class TaildirSource extends AbstractSource implements byteOffsetHeader = context.getBoolean(BYTE_OFFSET_HEADER, DEFAULT_BYTE_OFFSET_HEADER); idleTimeout = context.getInteger(IDLE_TIMEOUT, DEFAULT_IDLE_TIMEOUT); writePosInterval = context.getInteger(WRITE_POS_INTERVAL, DEFAULT_WRITE_POS_INTERVAL); - cachePatternMatching = context.getBoolean(CACHE_PATTERN_MATCHING, DEFAULT_CACHE_PATTERN_MATCHING); + cachePatternMatching = context.getBoolean(CACHE_PATTERN_MATCHING, + DEFAULT_CACHE_PATTERN_MATCHING); - backoffSleepIncrement = context.getLong(PollableSourceConstants.BACKOFF_SLEEP_INCREMENT - , PollableSourceConstants.DEFAULT_BACKOFF_SLEEP_INCREMENT); - maxBackOffSleepInterval = context.getLong(PollableSourceConstants.MAX_BACKOFF_SLEEP - , PollableSourceConstants.DEFAULT_MAX_BACKOFF_SLEEP); + backoffSleepIncrement = context.getLong(PollableSourceConstants.BACKOFF_SLEEP_INCREMENT, + PollableSourceConstants.DEFAULT_BACKOFF_SLEEP_INCREMENT); + maxBackOffSleepInterval = context.getLong(PollableSourceConstants.MAX_BACKOFF_SLEEP, + PollableSourceConstants.DEFAULT_MAX_BACKOFF_SLEEP); if (sourceCounter == null) { sourceCounter = new SourceCounter(getName()); @@ -254,7 +256,7 @@ public class TaildirSource extends AbstractSource implements reader.commit(); } catch (ChannelException ex) { logger.warn("The channel is full or unexpected failure. " + - "The source will try again after " + retryInterval + " ms"); + "The source will try again after " + retryInterval + " ms"); TimeUnit.MILLISECONDS.sleep(retryInterval); retryInterval = retryInterval << 1; retryInterval = Math.min(retryInterval, maxRetryInterval); @@ -320,7 +322,7 @@ public class TaildirSource extends AbstractSource implements String json = toPosInfoJson(); writer.write(json); } - } catch (Throwable t){ + } catch (Throwable t) { logger.error("Failed writing positionFile", t); } finally { try { http://git-wip-us.apache.org/repos/asf/flume/blob/2252fb19/flume-ng-sources/flume-taildir-source/src/main/java/org/apache/flume/source/taildir/TaildirSourceConfigurationConstants.java ---------------------------------------------------------------------- diff --git a/flume-ng-sources/flume-taildir-source/src/main/java/org/apache/flume/source/taildir/TaildirSourceConfigurationConstants.java b/flume-ng-sources/flume-taildir-source/src/main/java/org/apache/flume/source/taildir/TaildirSourceConfigurationConstants.java index b0c934d..2c49540 100644 --- a/flume-ng-sources/flume-taildir-source/src/main/java/org/apache/flume/source/taildir/TaildirSourceConfigurationConstants.java +++ b/flume-ng-sources/flume-taildir-source/src/main/java/org/apache/flume/source/taildir/TaildirSourceConfigurationConstants.java @@ -50,7 +50,9 @@ public class TaildirSourceConfigurationConstants { public static final String BYTE_OFFSET_HEADER_KEY = "byteoffset"; public static final boolean DEFAULT_BYTE_OFFSET_HEADER = false; - /** Whether to cache the list of files matching the specified file patterns till parent directory is modified. */ + /** Whether to cache the list of files matching the specified file patterns till parent directory + * is modified. + */ public static final String CACHE_PATTERN_MATCHING = "cachePatternMatching"; public static final boolean DEFAULT_CACHE_PATTERN_MATCHING = true; } http://git-wip-us.apache.org/repos/asf/flume/blob/2252fb19/flume-ng-tests/src/test/java/org/apache/flume/test/agent/TestRpcClientCommunicationFailure.java ---------------------------------------------------------------------- diff --git a/flume-ng-tests/src/test/java/org/apache/flume/test/agent/TestRpcClientCommunicationFailure.java b/flume-ng-tests/src/test/java/org/apache/flume/test/agent/TestRpcClientCommunicationFailure.java index f488bae..efb6457 100644 --- a/flume-ng-tests/src/test/java/org/apache/flume/test/agent/TestRpcClientCommunicationFailure.java +++ b/flume-ng-tests/src/test/java/org/apache/flume/test/agent/TestRpcClientCommunicationFailure.java @@ -29,39 +29,39 @@ import org.junit.Test; public class TestRpcClientCommunicationFailure { - public static final String CONFIG_FILE_PRCCLIENT_TEST = - "rpc-client-test.properties"; + public static final String CONFIG_FILE_PRCCLIENT_TEST = + "rpc-client-test.properties"; - @Test - public void testFailure() throws Exception { - try { + @Test + public void testFailure() throws Exception { + try { - StagedInstall.getInstance().startAgent( - "rpccagent", CONFIG_FILE_PRCCLIENT_TEST); - StagedInstall.waitUntilPortOpens("localhost", 12121, 20000); - RpcClient client = RpcClientFactory.getDefaultInstance( - "localhost", 12121); - String[] text = {"foo", "bar", "xyz", "abc"}; - for (String str : text) { - client.append(EventBuilder.withBody(str.getBytes())); - } + StagedInstall.getInstance().startAgent( + "rpccagent", CONFIG_FILE_PRCCLIENT_TEST); + StagedInstall.waitUntilPortOpens("localhost", 12121, 20000); + RpcClient client = RpcClientFactory.getDefaultInstance( + "localhost", 12121); + String[] text = {"foo", "bar", "xyz", "abc"}; + for (String str : text) { + client.append(EventBuilder.withBody(str.getBytes())); + } - // Stop the agent - StagedInstall.getInstance().stopAgent(); + // Stop the agent + StagedInstall.getInstance().stopAgent(); - // Try sending the event which should fail - try { - client.append(EventBuilder.withBody("test".getBytes())); - Assert.fail("EventDeliveryException expected but not raised"); - } catch (EventDeliveryException ex) { - System.out.println("Attempting to close client"); - client.close(); - } - } finally { - if (StagedInstall.getInstance().isRunning()) { - StagedInstall.getInstance().stopAgent(); - } - } - } + // Try sending the event which should fail + try { + client.append(EventBuilder.withBody("test".getBytes())); + Assert.fail("EventDeliveryException expected but not raised"); + } catch (EventDeliveryException ex) { + System.out.println("Attempting to close client"); + client.close(); + } + } finally { + if (StagedInstall.getInstance().isRunning()) { + StagedInstall.getInstance().stopAgent(); + } + } + } } http://git-wip-us.apache.org/repos/asf/flume/blob/2252fb19/flume-tools/src/main/java/org/apache/flume/tools/FileChannelIntegrityTool.java ---------------------------------------------------------------------- diff --git a/flume-tools/src/main/java/org/apache/flume/tools/FileChannelIntegrityTool.java b/flume-tools/src/main/java/org/apache/flume/tools/FileChannelIntegrityTool.java index 7abb7eb..f892d89 100644 --- a/flume-tools/src/main/java/org/apache/flume/tools/FileChannelIntegrityTool.java +++ b/flume-tools/src/main/java/org/apache/flume/tools/FileChannelIntegrityTool.java @@ -49,8 +49,7 @@ import java.util.Properties; import java.util.Set; public class FileChannelIntegrityTool implements FlumeTool { - public static final Logger LOG = LoggerFactory.getLogger - (FileChannelIntegrityTool.class); + public static final Logger LOG = LoggerFactory.getLogger(FileChannelIntegrityTool.class); private final List<File> dataDirs = new ArrayList<File>(); @@ -66,18 +65,18 @@ public class FileChannelIntegrityTool implements FlumeTool { @Override public void run(String[] args) throws IOException, ParseException { boolean shouldContinue = parseCommandLineOpts(args); - if(!shouldContinue) { + if (!shouldContinue) { LOG.error("Could not parse command line options. Exiting ..."); System.exit(1); } - for(File dataDir : dataDirs) { + for (File dataDir : dataDirs) { File[] dataFiles = dataDir.listFiles(new FilenameFilter() { @Override public boolean accept(File dir, String name) { - if(!name.endsWith(Serialization.METADATA_FILENAME) - && !name.endsWith(Serialization.METADATA_TMP_FILENAME) - && !name.endsWith(Serialization.OLD_METADATA_FILENAME) - && !name.equals(Log.FILE_LOCK)) { + if (!name.endsWith(Serialization.METADATA_FILENAME) + && !name.endsWith(Serialization.METADATA_TMP_FILENAME) + && !name.endsWith(Serialization.OLD_METADATA_FILENAME) + && !name.equals(Log.FILE_LOCK)) { return true; } return false; @@ -86,10 +85,8 @@ public class FileChannelIntegrityTool implements FlumeTool { if (dataFiles != null && dataFiles.length > 0) { for (File dataFile : dataFiles) { LOG.info("Checking for corruption in " + dataFile.toString()); - LogFile.SequentialReader reader = - new LogFileV3.SequentialReader(dataFile, null, true); - LogFile.OperationRecordUpdater updater = new LogFile - .OperationRecordUpdater(dataFile); + LogFile.SequentialReader reader = new LogFileV3.SequentialReader(dataFile, null, true); + LogFile.OperationRecordUpdater updater = new LogFile.OperationRecordUpdater(dataFile); boolean fileDone = false; boolean fileBackedup = false; while (!fileDone) { @@ -106,7 +103,7 @@ public class FileChannelIntegrityTool implements FlumeTool { if (record != null) { TransactionEventRecord recordEvent = record.getEvent(); Event event = EventUtils.getEventFromTransactionEvent(recordEvent); - if(event != null) { + if (event != null) { totalPutEvents++; try { if (!eventValidator.validateEvent(event)) { @@ -124,7 +121,8 @@ public class FileChannelIntegrityTool implements FlumeTool { // OOPS, didn't expected an exception // considering as failure case // marking as noop - System.err.println("Encountered Exception while validating event, marking as invalid"); + System.err.println("Encountered Exception while validating event, " + + "marking as invalid"); updater.markRecordAsNoop(eventPosition); eventsWithException++; } @@ -135,11 +133,10 @@ public class FileChannelIntegrityTool implements FlumeTool { } catch (CorruptEventException e) { corruptEvents++; totalChannelEvents++; - LOG.warn("Corruption found in " + dataFile.toString() + " at " - + eventPosition); + LOG.warn("Corruption found in " + dataFile.toString() + " at " + eventPosition); if (!fileBackedup) { Serialization.copyFile(dataFile, new File(dataFile.getParent(), - dataFile.getName() + ".bak")); + dataFile.getName() + ".bak")); fileBackedup = true; } updater.markRecordAsNoop(eventPosition); @@ -155,49 +152,46 @@ public class FileChannelIntegrityTool implements FlumeTool { private boolean parseCommandLineOpts(String[] args) throws ParseException { Options options = new Options(); - options - .addOption("l", "dataDirs", true, "Comma-separated list of data " + - "directories which the tool must verify. This option is mandatory") - .addOption("h", "help", false, "Display help") - .addOption("e", "eventValidator", true, "Fully Qualified Name of Event Validator Implementation");; + options.addOption("l", "dataDirs", true, "Comma-separated list of data " + + "directories which the tool must verify. This option is mandatory") + .addOption("h", "help", false, "Display help") + .addOption("e", "eventValidator", true, + "Fully Qualified Name of Event Validator Implementation"); - - Option property = OptionBuilder.withArgName("property=value") + Option property = OptionBuilder.withArgName("property=value") .hasArgs(2) .withValueSeparator() - .withDescription( "custom properties" ) - .create( "D" ); + .withDescription("custom properties") + .create("D"); options.addOption(property); CommandLineParser parser = new GnuParser(); CommandLine commandLine = parser.parse(options, args); - if(commandLine.hasOption("help")) { - new HelpFormatter().printHelp("bin/flume-ng tool fcintegritytool ", - options, true); + if (commandLine.hasOption("help")) { + new HelpFormatter().printHelp("bin/flume-ng tool fcintegritytool ", options, true); return false; } - if(!commandLine.hasOption("dataDirs")) { + if (!commandLine.hasOption("dataDirs")) { new HelpFormatter().printHelp("bin/flume-ng tool fcintegritytool ", "", - options, "dataDirs is required.", true); + options, "dataDirs is required.", true); return false; } else { - String dataDirStr[] = commandLine.getOptionValue("dataDirs").split(","); - for(String dataDir : dataDirStr) { + String[] dataDirStr = commandLine.getOptionValue("dataDirs").split(","); + for (String dataDir : dataDirStr) { File f = new File(dataDir); - if(!f.exists()) { - throw new FlumeException("Data directory, " + dataDir + " does not " + - "exist."); + if (!f.exists()) { + throw new FlumeException("Data directory, " + dataDir + " does not exist."); } dataDirs.add(f); } } - if(commandLine.hasOption("eventValidator")) { + if (commandLine.hasOption("eventValidator")) { try { Class<? extends EventValidator.Builder> eventValidatorClassName = - (Class<? extends EventValidator.Builder>)Class.forName( - commandLine.getOptionValue("eventValidator")); + (Class<? extends EventValidator.Builder>)Class.forName( + commandLine.getOptionValue("eventValidator")); EventValidator.Builder eventValidatorBuilder = eventValidatorClassName.newInstance(); // Pass on the configuration parameter @@ -226,12 +220,13 @@ public class FileChannelIntegrityTool implements FlumeTool { */ private void printSummary() { System.out.println("---------- Summary --------------------"); - System.out.println("Number of Events in the Channel = "+totalChannelEvents++); - System.out.println("Number of Put Events Processed = "+totalPutEvents); - System.out.println("Number of Valid Put Events = "+validEvents); - System.out.println("Number of Invalid Put Events = "+invalidEvents); - System.out.println("Number of Put Events that threw Exception during validation = "+eventsWithException); - System.out.println("Number of Corrupt Events = "+corruptEvents); + System.out.println("Number of Events in the Channel = " + totalChannelEvents++); + System.out.println("Number of Put Events Processed = " + totalPutEvents); + System.out.println("Number of Valid Put Events = " + validEvents); + System.out.println("Number of Invalid Put Events = " + invalidEvents); + System.out.println("Number of Put Events that threw Exception during validation = " + + eventsWithException); + System.out.println("Number of Corrupt Events = " + corruptEvents); System.out.println("---------------------------------------"); } } http://git-wip-us.apache.org/repos/asf/flume/blob/2252fb19/flume-tools/src/main/java/org/apache/flume/tools/FlumeToolType.java ---------------------------------------------------------------------- diff --git a/flume-tools/src/main/java/org/apache/flume/tools/FlumeToolType.java b/flume-tools/src/main/java/org/apache/flume/tools/FlumeToolType.java index 1bb122d..e9ba27d 100644 --- a/flume-tools/src/main/java/org/apache/flume/tools/FlumeToolType.java +++ b/flume-tools/src/main/java/org/apache/flume/tools/FlumeToolType.java @@ -34,7 +34,7 @@ public enum FlumeToolType { public static String getNames() { StringBuilder builder = new StringBuilder(); - for(FlumeToolType type: values()) { + for (FlumeToolType type: values()) { builder.append(type.name().toLowerCase(Locale.ENGLISH) + "\n"); } return builder.toString(); http://git-wip-us.apache.org/repos/asf/flume/blob/2252fb19/flume-tools/src/main/java/org/apache/flume/tools/FlumeToolsMain.java ---------------------------------------------------------------------- diff --git a/flume-tools/src/main/java/org/apache/flume/tools/FlumeToolsMain.java b/flume-tools/src/main/java/org/apache/flume/tools/FlumeToolsMain.java index 799ce85..426f55f 100644 --- a/flume-tools/src/main/java/org/apache/flume/tools/FlumeToolsMain.java +++ b/flume-tools/src/main/java/org/apache/flume/tools/FlumeToolsMain.java @@ -36,33 +36,32 @@ public class FlumeToolsMain implements FlumeTool { } @Override - public void run(String[] args) throws Exception{ + public void run(String[] args) throws Exception { String error = "Expected name of tool and arguments for" + - " tool to be passed in on the command line. Please pass one of the " + - "following as arguments to this command: \n"; + " tool to be passed in on the command line. Please pass one of the " + + "following as arguments to this command: \n"; StringBuilder builder = new StringBuilder(error); - for(FlumeToolType type : FlumeToolType.values()) { + for (FlumeToolType type : FlumeToolType.values()) { builder.append(type.name()).append("\n"); } - if(args == null || args.length == 0) { + if (args == null || args.length == 0) { System.out.println(builder.toString()); System.exit(1); } String toolName = args[0]; FlumeTool tool = null; - for(FlumeToolType type : FlumeToolType.values()) { - if(toolName.equalsIgnoreCase(type.name())) { + for (FlumeToolType type : FlumeToolType.values()) { + if (toolName.equalsIgnoreCase(type.name())) { tool = type.getClassInstance().newInstance(); break; } } Preconditions.checkNotNull(tool, "Cannot find tool matching " + toolName - + ". Please select one of: \n " + FlumeToolType.getNames()); + + ". Please select one of: \n " + FlumeToolType.getNames()); if (args.length == 1) { tool.run(new String[0]); } else { - tool.run(Arrays.asList(args).subList(1, args.length). - toArray(new String[0])); + tool.run(Arrays.asList(args).subList(1, args.length).toArray(new String[0])); } } } http://git-wip-us.apache.org/repos/asf/flume/blob/2252fb19/pom.xml ---------------------------------------------------------------------- diff --git a/pom.xml b/pom.xml index 9954353..85c0dc8 100644 --- a/pom.xml +++ b/pom.xml @@ -47,6 +47,8 @@ limitations under the License. <stagingDirectory>${project.basedir}/target/docs</stagingDirectory> <avro.version>1.7.4</avro.version> + <checkstyle.plugin.version>2.17</checkstyle.plugin.version> + <checkstyle.tool.version>6.19</checkstyle.tool.version> <elasticsearch.version>0.90.1</elasticsearch.version> <hadoop2.version>2.4.0</hadoop2.version> <thrift.version>0.7.0</thrift.version> @@ -58,6 +60,7 @@ limitations under the License. </properties> <modules> + <module>flume-checkstyle</module> <module>flume-ng-core</module> <module>flume-ng-configuration</module> <module>flume-ng-embedded-agent</module> @@ -638,6 +641,49 @@ limitations under the License. <artifactId>apache-rat-plugin</artifactId> </plugin> + <!-- Run checkstyle as part of the "verify" phase. See + https://maven.apache.org/plugins/maven-checkstyle-plugin/examples/multi-module-config.html + for how to configure plugin for a multi-module project. --> + <plugin> + <groupId>org.apache.maven.plugins</groupId> + <artifactId>maven-checkstyle-plugin</artifactId> + <version>${checkstyle.plugin.version}</version> + <dependencies> + <!-- The flume-checkstyle module adds the checkstyle config files to the classpath. --> + <dependency> + <groupId>org.apache.flume</groupId> + <artifactId>flume-checkstyle</artifactId> + <version>${project.version}</version> + </dependency> + <!-- Newer versions of puppycrawl checkstyle have more features. Pull in a more recent + version than is specified in the maven-checkstyle-plugin pom file. --> + <dependency> + <groupId>com.puppycrawl.tools</groupId> + <artifactId>checkstyle</artifactId> + <version>${checkstyle.tool.version}</version> + </dependency> + </dependencies> + <executions> + <execution> + <id>verify</id> + <phase>verify</phase> + <configuration> + <configLocation>flume/checkstyle.xml</configLocation> + <suppressionsLocation>flume/checkstyle-suppressions.xml</suppressionsLocation> + <suppressionsFileExpression>checkstyle.suppressions.file</suppressionsFileExpression> + <encoding>UTF-8</encoding> + <consoleOutput>true</consoleOutput> + <failsOnError>true</failsOnError> + <includeTestSourceDirectory>false</includeTestSourceDirectory> + <linkXRef>false</linkXRef> + </configuration> + <goals> + <goal>check</goal> + </goals> + </execution> + </executions> + </plugin> + </plugins> <pluginManagement> @@ -776,6 +822,7 @@ limitations under the License. <dependencyManagement> <dependencies> + <!-- Dependencies: build --> <dependency> <groupId>junit</groupId> <artifactId>junit</artifactId> @@ -1053,6 +1100,11 @@ limitations under the License. <dependency> <groupId>org.apache.flume</groupId> + <artifactId>flume-checkstyle</artifactId> + <version>1.7.0-SNAPSHOT</version> + </dependency> + <dependency> + <groupId>org.apache.flume</groupId> <artifactId>flume-tools</artifactId> <version>1.7.0-SNAPSHOT</version> </dependency> @@ -1413,6 +1465,20 @@ limitations under the License. </configuration> </plugin> + <plugin> + <groupId>org.apache.maven.plugins</groupId> + <artifactId>maven-checkstyle-plugin</artifactId> + <version>${checkstyle.plugin.version}</version> + <configuration> + <configLocation>flume/checkstyle.xml</configLocation> + <suppressionsLocation>flume/checkstyle-suppressions.xml</suppressionsLocation> + <suppressionsFileExpression>checkstyle.suppressions.file</suppressionsFileExpression> + <encoding>UTF-8</encoding> + <includeTestSourceDirectory>false</includeTestSourceDirectory> + <linkXRef>false</linkXRef> + </configuration> + </plugin> + </plugins> </reporting>
