Repository: flume
Updated Branches:
  refs/heads/trunk 2fe393898 -> 2252fb193


http://git-wip-us.apache.org/repos/asf/flume/blob/2252fb19/flume-ng-sources/flume-taildir-source/src/main/java/org/apache/flume/source/taildir/TailFile.java
----------------------------------------------------------------------
diff --git 
a/flume-ng-sources/flume-taildir-source/src/main/java/org/apache/flume/source/taildir/TailFile.java
 
b/flume-ng-sources/flume-taildir-source/src/main/java/org/apache/flume/source/taildir/TailFile.java
index cb36e41..42474c4 100644
--- 
a/flume-ng-sources/flume-taildir-source/src/main/java/org/apache/flume/source/taildir/TailFile.java
+++ 
b/flume-ng-sources/flume-taildir-source/src/main/java/org/apache/flume/source/taildir/TailFile.java
@@ -19,21 +19,19 @@
 
 package org.apache.flume.source.taildir;
 
-import static 
org.apache.flume.source.taildir.TaildirSourceConfigurationConstants.*;
+import com.google.common.collect.Lists;
+import org.apache.flume.Event;
+import org.apache.flume.event.EventBuilder;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
 
 import java.io.File;
 import java.io.IOException;
 import java.io.RandomAccessFile;
-import java.util.Comparator;
 import java.util.List;
 import java.util.Map;
 
-import org.apache.flume.Event;
-import org.apache.flume.event.EventBuilder;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
-import com.google.common.collect.Lists;
+import static 
org.apache.flume.source.taildir.TaildirSourceConfigurationConstants.BYTE_OFFSET_HEADER_KEY;
 
 public class TailFile {
   private static final Logger logger = LoggerFactory.getLogger(TailFile.class);
@@ -41,8 +39,8 @@ public class TailFile {
   private static final byte BYTE_NL = (byte) 10;
   private static final byte BYTE_CR = (byte) 13;
 
-  private final static int BUFFER_SIZE = 8192;
-  private final static int NEED_READING = -1;
+  private static final int BUFFER_SIZE = 8192;
+  private static final int NEED_READING = -1;
 
   private RandomAccessFile raf;
   private final String path;
@@ -61,7 +59,7 @@ public class TailFile {
     this.raf = new RandomAccessFile(file, "r");
     if (pos > 0) {
       raf.seek(pos);
-      lineReadPos=pos;
+      lineReadPos = pos;
     }
     this.path = file.getAbsolutePath();
     this.inode = inode;
@@ -70,22 +68,56 @@ public class TailFile {
     this.needTail = true;
     this.headers = headers;
     this.oldBuffer = new byte[0];
-    this.bufferPos= NEED_READING;
+    this.bufferPos = NEED_READING;
+  }
+
+  public RandomAccessFile getRaf() {
+    return raf;
+  }
+
+  public String getPath() {
+    return path;
+  }
+
+  public long getInode() {
+    return inode;
   }
 
-  public RandomAccessFile getRaf() { return raf; }
-  public String getPath() { return path; }
-  public long getInode() { return inode; }
-  public long getPos() { return pos; }
-  public long getLastUpdated() { return lastUpdated; }
-  public boolean needTail() { return needTail; }
-  public Map<String, String> getHeaders() { return headers; }
-  public long getLineReadPos() { return lineReadPos; }
+  public long getPos() {
+    return pos;
+  }
+
+  public long getLastUpdated() {
+    return lastUpdated;
+  }
+
+  public boolean needTail() {
+    return needTail;
+  }
+
+  public Map<String, String> getHeaders() {
+    return headers;
+  }
+
+  public long getLineReadPos() {
+    return lineReadPos;
+  }
 
-  public void setPos(long pos) { this.pos = pos; }
-  public void setLastUpdated(long lastUpdated) { this.lastUpdated = 
lastUpdated; }
-  public void setNeedTail(boolean needTail) { this.needTail = needTail; }
-  public void setLineReadPos(long lineReadPos) { this.lineReadPos = 
lineReadPos; }
+  public void setPos(long pos) {
+    this.pos = pos;
+  }
+
+  public void setLastUpdated(long lastUpdated) {
+    this.lastUpdated = lastUpdated;
+  }
+
+  public void setNeedTail(boolean needTail) {
+    this.needTail = needTail;
+  }
+
+  public void setLineReadPos(long lineReadPos) {
+    this.lineReadPos = lineReadPos;
+  }
 
   public boolean updatePos(String path, long inode, long pos) throws 
IOException {
     if (this.inode == inode && this.path.equals(path)) {
@@ -99,7 +131,7 @@ public class TailFile {
   public void updateFilePos(long pos) throws IOException {
     raf.seek(pos);
     lineReadPos = pos;
-    bufferPos= NEED_READING;
+    bufferPos = NEED_READING;
     oldBuffer = new byte[0];
   }
 
@@ -146,7 +178,8 @@ public class TailFile {
     bufferPos = 0;
   }
 
-  private byte[] concatByteArrays(byte[] a, int startIdxA, int lenA, byte[] b, 
int startIdxB, int lenB) {
+  private byte[] concatByteArrays(byte[] a, int startIdxA, int lenA,
+                                  byte[] b, int startIdxB, int lenB) {
     byte[] c = new byte[lenA + lenB];
     System.arraycopy(a, startIdxA, c, 0, lenA);
     System.arraycopy(b, startIdxB, c, lenA, lenB);
@@ -195,7 +228,8 @@ public class TailFile {
         break;
       }
       // NEW_LINE not showed up at the end of the buffer
-      oldBuffer = concatByteArrays(oldBuffer, 0, oldBuffer.length, buffer, 
bufferPos, (buffer.length - bufferPos));
+      oldBuffer = concatByteArrays(oldBuffer, 0, oldBuffer.length,
+                                   buffer, bufferPos, buffer.length - 
bufferPos);
       bufferPos = NEED_READING;
     }
     return lineResult;

http://git-wip-us.apache.org/repos/asf/flume/blob/2252fb19/flume-ng-sources/flume-taildir-source/src/main/java/org/apache/flume/source/taildir/TaildirMatcher.java
----------------------------------------------------------------------
diff --git 
a/flume-ng-sources/flume-taildir-source/src/main/java/org/apache/flume/source/taildir/TaildirMatcher.java
 
b/flume-ng-sources/flume-taildir-source/src/main/java/org/apache/flume/source/taildir/TaildirMatcher.java
index 245aef5..ad9f720 100644
--- 
a/flume-ng-sources/flume-taildir-source/src/main/java/org/apache/flume/source/taildir/TaildirMatcher.java
+++ 
b/flume-ng-sources/flume-taildir-source/src/main/java/org/apache/flume/source/taildir/TaildirMatcher.java
@@ -27,7 +27,6 @@ import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
 import java.io.File;
-import java.io.FileFilter;
 import java.io.IOException;
 import java.nio.file.DirectoryStream;
 import java.nio.file.FileSystem;
@@ -40,25 +39,25 @@ import java.util.Comparator;
 import java.util.HashMap;
 import java.util.List;
 import java.util.concurrent.TimeUnit;
-import java.util.regex.Pattern;
 
 /**
- * Identifies and caches the files matched by single file pattern for 
<code>TAILDIR<code/> source.
+ * Identifies and caches the files matched by single file pattern for {@code 
TAILDIR} source.
  * <p></p>
- * Since file patterns only apply to the fileNames and not the parent 
dictionaries, this implementation
- * checks the parent directory for modification (additional or removed files 
update modification time of parent dir)
- * If no modification happened to the parent dir that means the underlying 
files could only be written to but no need
- * to rerun the pattern matching on fileNames.
+ * Since file patterns only apply to the fileNames and not the parent 
dictionaries, this
+ * implementation checks the parent directory for modification (additional or 
removed files
+ * update modification time of parent dir)
+ * If no modification happened to the parent dir that means the underlying 
files could only be
+ * written to but no need to rerun the pattern matching on fileNames.
  * <p></p>
- * This implementation provides lazy caching or no caching. Instances of this 
class keep the result
- * file list from the last successful execution
- * of {@linkplain #getMatchingFiles()} function invocation,
- * and may serve the content without hitting the FileSystem for performance 
optimization.
+ * This implementation provides lazy caching or no caching. Instances of this 
class keep the
+ * result file list from the last successful execution of {@linkplain 
#getMatchingFiles()}
+ * function invocation, and may serve the content without hitting the 
FileSystem for performance
+ * optimization.
  * <p></p>
- * <b>IMPORTANT:</b> It is assumed that the hosting system provides at least 
second granularity for both
- * <code>System.currentTimeMillis()</code> and 
<code>File.lastModified()</code>. Also that system clock is used
- * for file system timestamps. If it is not the case then configure it as 
uncached.
- * Class is solely for package only usage. Member functions are not thread 
safe.
+ * <b>IMPORTANT:</b> It is assumed that the hosting system provides at least 
second granularity
+ * for both {@code System.currentTimeMillis()} and {@code 
File.lastModified()}. Also
+ * that system clock is used for file system timestamps. If it is not the case 
then configure it
+ * as uncached. Class is solely for package only usage. Member functions are 
not thread safe.
  *
  * @see TaildirSource
  * @see ReliableTaildirEventReader
@@ -84,29 +83,33 @@ public class TaildirMatcher {
 
   // system time in milliseconds, stores the last modification time of the
   // parent directory seen by the last check, rounded to seconds
-  // initial value is used in first check only when it will be replaced 
instantly (system time is positive)
+  // initial value is used in first check only when it will be replaced 
instantly
+  // (system time is positive)
   private long lastSeenParentDirMTime = -1;
   // system time in milliseconds, time of the last check, rounded to seconds
-  // initial value is used in first check only when it will be replaced 
instantly (system time is positive)
+  // initial value is used in first check only when it will be replaced 
instantly
+  // (system time is positive)
   private long lastCheckedTime = -1;
   // cached content, files which matched the pattern within the parent 
directory
   private List<File> lastMatchedFiles = Lists.newArrayList();
 
   /**
-   * Package accessible constructor. From configuration context it represents 
a single <code>filegroup</code>
-   * and encapsulates the corresponding <code>filePattern</code>.
-   * <code>filePattern</code> consists of two parts: first part has to be a 
valid path to
-   * an existing parent directory, second part has to be a
-   * valid regex {@link java.util.regex.Pattern} that match any non-hidden 
file names within parent directory.
-   * A valid example for filePattern is <code>/dir0/dir1/.*</code> given 
<code>/dir0/dir1</code>
-   * is an existing directory structure readable by the running user.
+   * Package accessible constructor. From configuration context it represents 
a single
+   * <code>filegroup</code> and encapsulates the corresponding 
<code>filePattern</code>.
+   * <code>filePattern</code> consists of two parts: first part has to be a 
valid path to an
+   * existing parent directory, second part has to be a valid regex
+   * {@link java.util.regex.Pattern} that match any non-hidden file names 
within parent directory
+   * . A valid example for filePattern is <code>/dir0/dir1/.*</code> given
+   * <code>/dir0/dir1</code> is an existing directory structure readable by 
the running user.
    * <p></p>
    * An instance of this class is created for each fileGroup
    *
    * @param fileGroup arbitrary name of the group given by the config
-   * @param filePattern parent directory plus regex pattern. No wildcards are 
allowed in directory name
-   * @param cachePatternMatching default true, recommended in every setup 
especially with huge parent directories.
-   *                         Don't set when local system clock is not used for 
stamping mtime (eg: remote filesystems)
+   * @param filePattern parent directory plus regex pattern. No wildcards are 
allowed in directory
+   *                    name
+   * @param cachePatternMatching default true, recommended in every setup 
especially with huge
+   *                             parent directories. Don't set when local 
system clock is not used
+   *                             for stamping mtime (eg: remote filesystems)
    * @see TaildirSourceConfigurationConstants
    */
   TaildirMatcher(String fileGroup, String filePattern, boolean 
cachePatternMatching) {
@@ -133,11 +136,12 @@ public class TaildirMatcher {
   }
 
   /**
-   * Lists those files within the parentDir that match regex pattern passed in 
during object instantiation.
-   * Designed for frequent periodic invocation {@link 
org.apache.flume.source.PollableSourceRunner}.
+   * Lists those files within the parentDir that match regex pattern passed in 
during object
+   * instantiation. Designed for frequent periodic invocation
+   * {@link org.apache.flume.source.PollableSourceRunner}.
    * <p></p>
-   * Based on the modification of the parentDir this function may trigger 
cache recalculation by calling
-   * {@linkplain #getMatchingFilesNoCache()} or
+   * Based on the modification of the parentDir this function may trigger 
cache recalculation by
+   * calling {@linkplain #getMatchingFilesNoCache()} or
    * return the value stored in {@linkplain #lastMatchedFiles}.
    * Parentdir is allowed to be a symbolic link.
    * <p></p>
@@ -146,7 +150,8 @@ public class TaildirMatcher {
    * so it may (or may not) reflect updates to the directory that occur during 
the call,
    * In which case next call
    * will return those files (as mtime is increasing it won't hit cache but 
trigger recalculation).
-   * It is guaranteed that invocation reflects every change which was 
observable at the time of invocation.
+   * It is guaranteed that invocation reflects every change which was 
observable at the time of
+   * invocation.
    * <p></p>
    * Matching file list recalculation is triggered when caching was turned off 
or
    * if mtime is greater than the previously seen mtime
@@ -156,8 +161,8 @@ public class TaildirMatcher {
    * within the same second so in such case (assuming at least second 
granularity of reported mtime)
    * it is impossible to tell whether a change of the dir happened before the 
check or after
    * (unless the check happened after that second).
-   * Having said that implementation also stores system time of the previous 
invocation and previous invocation has to
-   * happen strictly after the current mtime to avoid further cache refresh
+   * Having said that implementation also stores system time of the previous 
invocation and previous
+   * invocation has to happen strictly after the current mtime to avoid 
further cache refresh
    * (because then it is guaranteed that previous invocation resulted in valid 
cache content).
    * If system clock hasn't passed the second of
    * the current mtime then logic expects more changes as well
@@ -166,23 +171,26 @@ public class TaildirMatcher {
    * hence it recalculates matching files. If system clock finally
    * passed actual mtime then a subsequent invocation guarantees that it 
picked up every
    * change from the passed second so
-   * any further invocations can be served from cache associated with that 
second (given mtime is not updated again).
+   * any further invocations can be served from cache associated with that 
second
+   * (given mtime is not updated again).
    *
-   * @return List of files matching the pattern sorted by last modification 
time. No recursion. No directories.
-   * If nothing matches then returns an empty list. If I/O issue occurred then 
returns the list collected to the point
-   * when exception was thrown.
+   * @return List of files matching the pattern sorted by last modification 
time. No recursion.
+   * No directories. If nothing matches then returns an empty list. If I/O 
issue occurred then
+   * returns the list collected to the point when exception was thrown.
    *
    * @see #getMatchingFilesNoCache()
    */
   List<File> getMatchingFiles() {
-    long now = 
TimeUnit.SECONDS.toMillis(TimeUnit.MILLISECONDS.toSeconds(System.currentTimeMillis()));
+    long now = TimeUnit.SECONDS.toMillis(
+        TimeUnit.MILLISECONDS.toSeconds(System.currentTimeMillis()));
     long currentParentDirMTime = parentDir.lastModified();
     List<File> result;
 
     // calculate matched files if
     // - we don't want to use cache (recalculate every time) OR
     // - directory was clearly updated after the last check OR
-    // - last mtime change wasn't already checked for sure (system clock 
hasn't passed that second yet)
+    // - last mtime change wasn't already checked for sure
+    //   (system clock hasn't passed that second yet)
     if (!cachePatternMatching ||
         lastSeenParentDirMTime < currentParentDirMTime ||
         !(currentParentDirMTime < lastCheckedTime)) {
@@ -199,13 +207,13 @@ public class TaildirMatcher {
    * files are matching the regex pattern. Each invocation uses {@link 
DirectoryStream}
    * to identify matching files.
    *
-   * Files returned by this call are weakly consistent (see {@link 
DirectoryStream}). It does not freeze the directory while iterating,
-   * so it may (or may not) reflect updates to the directory that occur during 
the call. In which case next call
-   * will return those files.
+   * Files returned by this call are weakly consistent (see {@link 
DirectoryStream}).
+   * It does not freeze the directory while iterating, so it may (or may not) 
reflect updates
+   * to the directory that occur during the call. In which case next call will 
return those files.
    *
    * @return List of files matching the pattern unsorted. No recursion. No 
directories.
-   * If nothing matches then returns an empty list. If I/O issue occurred then 
returns the list collected to the point
-   * when exception was thrown.
+   * If nothing matches then returns an empty list. If I/O issue occurred then 
returns the list
+   * collected to the point when exception was thrown.
    *
    * @see DirectoryStream
    * @see DirectoryStream.Filter
@@ -217,16 +225,16 @@ public class TaildirMatcher {
         result.add(entry.toFile());
       }
     } catch (IOException e) {
-      logger.error("I/O exception occurred while listing parent directory. 
Files already matched will be returned. " +
-          parentDir.toPath(), e);
+      logger.error("I/O exception occurred while listing parent directory. " +
+                   "Files already matched will be returned. " + 
parentDir.toPath(), e);
     }
     return result;
   }
 
   /**
    * Utility function to sort matched files based on last modification time.
-   * Sorting itself use only a snapshot of last modification times captured 
before the sorting to keep the
-   * number of stat system calls to the required minimum.
+   * Sorting itself use only a snapshot of last modification times captured 
before the sorting
+   * to keep the number of stat system calls to the required minimum.
    *
    * @param files list of files in any order
    * @return sorted list

http://git-wip-us.apache.org/repos/asf/flume/blob/2252fb19/flume-ng-sources/flume-taildir-source/src/main/java/org/apache/flume/source/taildir/TaildirSource.java
----------------------------------------------------------------------
diff --git 
a/flume-ng-sources/flume-taildir-source/src/main/java/org/apache/flume/source/taildir/TaildirSource.java
 
b/flume-ng-sources/flume-taildir-source/src/main/java/org/apache/flume/source/taildir/TaildirSource.java
index dfb5b29..eae1b1a 100644
--- 
a/flume-ng-sources/flume-taildir-source/src/main/java/org/apache/flume/source/taildir/TaildirSource.java
+++ 
b/flume-ng-sources/flume-taildir-source/src/main/java/org/apache/flume/source/taildir/TaildirSource.java
@@ -150,7 +150,8 @@ public class TaildirSource extends AbstractSource implements
     String fileGroups = context.getString(FILE_GROUPS);
     Preconditions.checkState(fileGroups != null, "Missing param: " + 
FILE_GROUPS);
 
-    filePaths = selectByKeys(context.getSubProperties(FILE_GROUPS_PREFIX), 
fileGroups.split("\\s+"));
+    filePaths = selectByKeys(context.getSubProperties(FILE_GROUPS_PREFIX),
+                             fileGroups.split("\\s+"));
     Preconditions.checkState(!filePaths.isEmpty(),
         "Mapping for tailing files is empty or invalid: '" + 
FILE_GROUPS_PREFIX + "'");
 
@@ -168,12 +169,13 @@ public class TaildirSource extends AbstractSource 
implements
     byteOffsetHeader = context.getBoolean(BYTE_OFFSET_HEADER, 
DEFAULT_BYTE_OFFSET_HEADER);
     idleTimeout = context.getInteger(IDLE_TIMEOUT, DEFAULT_IDLE_TIMEOUT);
     writePosInterval = context.getInteger(WRITE_POS_INTERVAL, 
DEFAULT_WRITE_POS_INTERVAL);
-    cachePatternMatching = context.getBoolean(CACHE_PATTERN_MATCHING, 
DEFAULT_CACHE_PATTERN_MATCHING);
+    cachePatternMatching = context.getBoolean(CACHE_PATTERN_MATCHING,
+        DEFAULT_CACHE_PATTERN_MATCHING);
 
-    backoffSleepIncrement = 
context.getLong(PollableSourceConstants.BACKOFF_SLEEP_INCREMENT
-            , PollableSourceConstants.DEFAULT_BACKOFF_SLEEP_INCREMENT);
-    maxBackOffSleepInterval = 
context.getLong(PollableSourceConstants.MAX_BACKOFF_SLEEP
-            , PollableSourceConstants.DEFAULT_MAX_BACKOFF_SLEEP);
+    backoffSleepIncrement = 
context.getLong(PollableSourceConstants.BACKOFF_SLEEP_INCREMENT,
+        PollableSourceConstants.DEFAULT_BACKOFF_SLEEP_INCREMENT);
+    maxBackOffSleepInterval = 
context.getLong(PollableSourceConstants.MAX_BACKOFF_SLEEP,
+        PollableSourceConstants.DEFAULT_MAX_BACKOFF_SLEEP);
 
     if (sourceCounter == null) {
       sourceCounter = new SourceCounter(getName());
@@ -254,7 +256,7 @@ public class TaildirSource extends AbstractSource implements
         reader.commit();
       } catch (ChannelException ex) {
         logger.warn("The channel is full or unexpected failure. " +
-          "The source will try again after " + retryInterval + " ms");
+            "The source will try again after " + retryInterval + " ms");
         TimeUnit.MILLISECONDS.sleep(retryInterval);
         retryInterval = retryInterval << 1;
         retryInterval = Math.min(retryInterval, maxRetryInterval);
@@ -320,7 +322,7 @@ public class TaildirSource extends AbstractSource implements
         String json = toPosInfoJson();
         writer.write(json);
       }
-    } catch (Throwable t){
+    } catch (Throwable t) {
       logger.error("Failed writing positionFile", t);
     } finally {
       try {

http://git-wip-us.apache.org/repos/asf/flume/blob/2252fb19/flume-ng-sources/flume-taildir-source/src/main/java/org/apache/flume/source/taildir/TaildirSourceConfigurationConstants.java
----------------------------------------------------------------------
diff --git 
a/flume-ng-sources/flume-taildir-source/src/main/java/org/apache/flume/source/taildir/TaildirSourceConfigurationConstants.java
 
b/flume-ng-sources/flume-taildir-source/src/main/java/org/apache/flume/source/taildir/TaildirSourceConfigurationConstants.java
index b0c934d..2c49540 100644
--- 
a/flume-ng-sources/flume-taildir-source/src/main/java/org/apache/flume/source/taildir/TaildirSourceConfigurationConstants.java
+++ 
b/flume-ng-sources/flume-taildir-source/src/main/java/org/apache/flume/source/taildir/TaildirSourceConfigurationConstants.java
@@ -50,7 +50,9 @@ public class TaildirSourceConfigurationConstants {
   public static final String BYTE_OFFSET_HEADER_KEY = "byteoffset";
   public static final boolean DEFAULT_BYTE_OFFSET_HEADER = false;
 
-  /** Whether to cache the list of files matching the specified file patterns 
till parent directory is modified. */
+  /** Whether to cache the list of files matching the specified file patterns 
till parent directory
+   * is modified.
+   */
   public static final String CACHE_PATTERN_MATCHING = "cachePatternMatching";
   public static final boolean DEFAULT_CACHE_PATTERN_MATCHING = true;
 }

http://git-wip-us.apache.org/repos/asf/flume/blob/2252fb19/flume-ng-tests/src/test/java/org/apache/flume/test/agent/TestRpcClientCommunicationFailure.java
----------------------------------------------------------------------
diff --git 
a/flume-ng-tests/src/test/java/org/apache/flume/test/agent/TestRpcClientCommunicationFailure.java
 
b/flume-ng-tests/src/test/java/org/apache/flume/test/agent/TestRpcClientCommunicationFailure.java
index f488bae..efb6457 100644
--- 
a/flume-ng-tests/src/test/java/org/apache/flume/test/agent/TestRpcClientCommunicationFailure.java
+++ 
b/flume-ng-tests/src/test/java/org/apache/flume/test/agent/TestRpcClientCommunicationFailure.java
@@ -29,39 +29,39 @@ import org.junit.Test;
 
 public class TestRpcClientCommunicationFailure {
 
-   public static final String CONFIG_FILE_PRCCLIENT_TEST =
-        "rpc-client-test.properties";
+  public static final String CONFIG_FILE_PRCCLIENT_TEST =
+      "rpc-client-test.properties";
 
-   @Test
-   public void testFailure() throws Exception {
-     try {
+  @Test
+  public void testFailure() throws Exception {
+    try {
 
-       StagedInstall.getInstance().startAgent(
-         "rpccagent", CONFIG_FILE_PRCCLIENT_TEST);
-       StagedInstall.waitUntilPortOpens("localhost", 12121, 20000);
-       RpcClient client = RpcClientFactory.getDefaultInstance(
-           "localhost", 12121);
-       String[] text = {"foo", "bar", "xyz", "abc"};
-       for (String str : text) {
-         client.append(EventBuilder.withBody(str.getBytes()));
-       }
+      StagedInstall.getInstance().startAgent(
+          "rpccagent", CONFIG_FILE_PRCCLIENT_TEST);
+      StagedInstall.waitUntilPortOpens("localhost", 12121, 20000);
+      RpcClient client = RpcClientFactory.getDefaultInstance(
+          "localhost", 12121);
+      String[] text = {"foo", "bar", "xyz", "abc"};
+      for (String str : text) {
+        client.append(EventBuilder.withBody(str.getBytes()));
+      }
 
-       // Stop the agent
-       StagedInstall.getInstance().stopAgent();
+      // Stop the agent
+      StagedInstall.getInstance().stopAgent();
 
-       // Try sending the event which should fail
-       try {
-         client.append(EventBuilder.withBody("test".getBytes()));
-         Assert.fail("EventDeliveryException expected but not raised");
-       } catch (EventDeliveryException ex) {
-         System.out.println("Attempting to close client");
-         client.close();
-       }
-     } finally {
-       if (StagedInstall.getInstance().isRunning()) {
-         StagedInstall.getInstance().stopAgent();
-       }
-     }
-   }
+      // Try sending the event which should fail
+      try {
+        client.append(EventBuilder.withBody("test".getBytes()));
+        Assert.fail("EventDeliveryException expected but not raised");
+      } catch (EventDeliveryException ex) {
+        System.out.println("Attempting to close client");
+        client.close();
+      }
+    } finally {
+      if (StagedInstall.getInstance().isRunning()) {
+        StagedInstall.getInstance().stopAgent();
+      }
+    }
+  }
 
 }

http://git-wip-us.apache.org/repos/asf/flume/blob/2252fb19/flume-tools/src/main/java/org/apache/flume/tools/FileChannelIntegrityTool.java
----------------------------------------------------------------------
diff --git 
a/flume-tools/src/main/java/org/apache/flume/tools/FileChannelIntegrityTool.java
 
b/flume-tools/src/main/java/org/apache/flume/tools/FileChannelIntegrityTool.java
index 7abb7eb..f892d89 100644
--- 
a/flume-tools/src/main/java/org/apache/flume/tools/FileChannelIntegrityTool.java
+++ 
b/flume-tools/src/main/java/org/apache/flume/tools/FileChannelIntegrityTool.java
@@ -49,8 +49,7 @@ import java.util.Properties;
 import java.util.Set;
 
 public class FileChannelIntegrityTool implements FlumeTool {
-  public static final Logger LOG = LoggerFactory.getLogger
-    (FileChannelIntegrityTool.class);
+  public static final Logger LOG = 
LoggerFactory.getLogger(FileChannelIntegrityTool.class);
 
   private final List<File> dataDirs = new ArrayList<File>();
 
@@ -66,18 +65,18 @@ public class FileChannelIntegrityTool implements FlumeTool {
   @Override
   public void run(String[] args) throws IOException, ParseException {
     boolean shouldContinue = parseCommandLineOpts(args);
-    if(!shouldContinue) {
+    if (!shouldContinue) {
       LOG.error("Could not parse command line options. Exiting ...");
       System.exit(1);
     }
-    for(File dataDir : dataDirs) {
+    for (File dataDir : dataDirs) {
       File[] dataFiles = dataDir.listFiles(new FilenameFilter() {
         @Override
         public boolean accept(File dir, String name) {
-          if(!name.endsWith(Serialization.METADATA_FILENAME)
-            && !name.endsWith(Serialization.METADATA_TMP_FILENAME)
-            && !name.endsWith(Serialization.OLD_METADATA_FILENAME)
-            && !name.equals(Log.FILE_LOCK)) {
+          if (!name.endsWith(Serialization.METADATA_FILENAME)
+              && !name.endsWith(Serialization.METADATA_TMP_FILENAME)
+              && !name.endsWith(Serialization.OLD_METADATA_FILENAME)
+              && !name.equals(Log.FILE_LOCK)) {
             return true;
           }
           return false;
@@ -86,10 +85,8 @@ public class FileChannelIntegrityTool implements FlumeTool {
       if (dataFiles != null && dataFiles.length > 0) {
         for (File dataFile : dataFiles) {
           LOG.info("Checking for corruption in " + dataFile.toString());
-          LogFile.SequentialReader reader =
-            new LogFileV3.SequentialReader(dataFile, null, true);
-          LogFile.OperationRecordUpdater updater = new LogFile
-            .OperationRecordUpdater(dataFile);
+          LogFile.SequentialReader reader = new 
LogFileV3.SequentialReader(dataFile, null, true);
+          LogFile.OperationRecordUpdater updater = new 
LogFile.OperationRecordUpdater(dataFile);
           boolean fileDone = false;
           boolean fileBackedup = false;
           while (!fileDone) {
@@ -106,7 +103,7 @@ public class FileChannelIntegrityTool implements FlumeTool {
               if (record != null) {
                 TransactionEventRecord recordEvent = record.getEvent();
                 Event event = 
EventUtils.getEventFromTransactionEvent(recordEvent);
-                if(event != null) {
+                if (event != null) {
                   totalPutEvents++;
                   try {
                     if (!eventValidator.validateEvent(event)) {
@@ -124,7 +121,8 @@ public class FileChannelIntegrityTool implements FlumeTool {
                     // OOPS, didn't expected an exception
                     // considering as failure case
                     // marking as noop
-                    System.err.println("Encountered Exception while validating 
event, marking as invalid");
+                    System.err.println("Encountered Exception while validating 
event, " +
+                                       "marking as invalid");
                     updater.markRecordAsNoop(eventPosition);
                     eventsWithException++;
                   }
@@ -135,11 +133,10 @@ public class FileChannelIntegrityTool implements 
FlumeTool {
             } catch (CorruptEventException e) {
               corruptEvents++;
               totalChannelEvents++;
-              LOG.warn("Corruption found in " + dataFile.toString() + " at "
-                + eventPosition);
+              LOG.warn("Corruption found in " + dataFile.toString() + " at " + 
eventPosition);
               if (!fileBackedup) {
                 Serialization.copyFile(dataFile, new File(dataFile.getParent(),
-                  dataFile.getName() + ".bak"));
+                                                          dataFile.getName() + 
".bak"));
                 fileBackedup = true;
               }
               updater.markRecordAsNoop(eventPosition);
@@ -155,49 +152,46 @@ public class FileChannelIntegrityTool implements 
FlumeTool {
 
   private boolean parseCommandLineOpts(String[] args) throws ParseException {
     Options options = new Options();
-    options
-      .addOption("l", "dataDirs", true, "Comma-separated list of data " +
-        "directories which the tool must verify. This option is mandatory")
-      .addOption("h", "help", false, "Display help")
-            .addOption("e", "eventValidator", true, "Fully Qualified Name of 
Event Validator Implementation");;
+    options.addOption("l", "dataDirs", true, "Comma-separated list of data " +
+                      "directories which the tool must verify. This option is 
mandatory")
+           .addOption("h", "help", false, "Display help")
+           .addOption("e", "eventValidator", true,
+                      "Fully Qualified Name of Event Validator 
Implementation");
 
-
-    Option property  = OptionBuilder.withArgName("property=value")
+    Option property = OptionBuilder.withArgName("property=value")
             .hasArgs(2)
             .withValueSeparator()
-            .withDescription( "custom properties" )
-            .create( "D" );
+            .withDescription("custom properties")
+            .create("D");
 
     options.addOption(property);
 
     CommandLineParser parser = new GnuParser();
     CommandLine commandLine = parser.parse(options, args);
-    if(commandLine.hasOption("help")) {
-      new HelpFormatter().printHelp("bin/flume-ng tool fcintegritytool ",
-        options, true);
+    if (commandLine.hasOption("help")) {
+      new HelpFormatter().printHelp("bin/flume-ng tool fcintegritytool ", 
options, true);
       return false;
     }
-    if(!commandLine.hasOption("dataDirs")) {
+    if (!commandLine.hasOption("dataDirs")) {
       new HelpFormatter().printHelp("bin/flume-ng tool fcintegritytool ", "",
-        options, "dataDirs is required.", true);
+          options, "dataDirs is required.", true);
       return false;
     } else {
-      String dataDirStr[] = commandLine.getOptionValue("dataDirs").split(",");
-      for(String dataDir : dataDirStr) {
+      String[] dataDirStr = commandLine.getOptionValue("dataDirs").split(",");
+      for (String dataDir : dataDirStr) {
         File f = new File(dataDir);
-        if(!f.exists()) {
-          throw new FlumeException("Data directory, " + dataDir + " does not " 
+
-            "exist.");
+        if (!f.exists()) {
+          throw new FlumeException("Data directory, " + dataDir + " does not 
exist.");
         }
         dataDirs.add(f);
       }
     }
 
-    if(commandLine.hasOption("eventValidator")) {
+    if (commandLine.hasOption("eventValidator")) {
       try {
         Class<? extends EventValidator.Builder> eventValidatorClassName =
-                (Class<? extends EventValidator.Builder>)Class.forName(
-                  commandLine.getOptionValue("eventValidator"));
+            (Class<? extends EventValidator.Builder>)Class.forName(
+                commandLine.getOptionValue("eventValidator"));
         EventValidator.Builder eventValidatorBuilder = 
eventValidatorClassName.newInstance();
 
         // Pass on the configuration parameter
@@ -226,12 +220,13 @@ public class FileChannelIntegrityTool implements 
FlumeTool {
    */
   private void printSummary() {
     System.out.println("---------- Summary --------------------");
-    System.out.println("Number of Events in the Channel = 
"+totalChannelEvents++);
-    System.out.println("Number of Put Events Processed = "+totalPutEvents);
-    System.out.println("Number of Valid Put Events = "+validEvents);
-    System.out.println("Number of Invalid Put Events = "+invalidEvents);
-    System.out.println("Number of Put Events that threw Exception during 
validation = "+eventsWithException);
-    System.out.println("Number of Corrupt Events = "+corruptEvents);
+    System.out.println("Number of Events in the Channel = " + 
totalChannelEvents++);
+    System.out.println("Number of Put Events Processed = " + totalPutEvents);
+    System.out.println("Number of Valid Put Events = " + validEvents);
+    System.out.println("Number of Invalid Put Events = " + invalidEvents);
+    System.out.println("Number of Put Events that threw Exception during 
validation = "
+        + eventsWithException);
+    System.out.println("Number of Corrupt Events = " + corruptEvents);
     System.out.println("---------------------------------------");
   }
 }

http://git-wip-us.apache.org/repos/asf/flume/blob/2252fb19/flume-tools/src/main/java/org/apache/flume/tools/FlumeToolType.java
----------------------------------------------------------------------
diff --git 
a/flume-tools/src/main/java/org/apache/flume/tools/FlumeToolType.java 
b/flume-tools/src/main/java/org/apache/flume/tools/FlumeToolType.java
index 1bb122d..e9ba27d 100644
--- a/flume-tools/src/main/java/org/apache/flume/tools/FlumeToolType.java
+++ b/flume-tools/src/main/java/org/apache/flume/tools/FlumeToolType.java
@@ -34,7 +34,7 @@ public enum FlumeToolType {
 
   public static String getNames() {
     StringBuilder builder = new StringBuilder();
-    for(FlumeToolType type: values()) {
+    for (FlumeToolType type: values()) {
       builder.append(type.name().toLowerCase(Locale.ENGLISH) + "\n");
     }
     return builder.toString();

http://git-wip-us.apache.org/repos/asf/flume/blob/2252fb19/flume-tools/src/main/java/org/apache/flume/tools/FlumeToolsMain.java
----------------------------------------------------------------------
diff --git 
a/flume-tools/src/main/java/org/apache/flume/tools/FlumeToolsMain.java 
b/flume-tools/src/main/java/org/apache/flume/tools/FlumeToolsMain.java
index 799ce85..426f55f 100644
--- a/flume-tools/src/main/java/org/apache/flume/tools/FlumeToolsMain.java
+++ b/flume-tools/src/main/java/org/apache/flume/tools/FlumeToolsMain.java
@@ -36,33 +36,32 @@ public class FlumeToolsMain implements FlumeTool {
   }
 
   @Override
-  public void run(String[] args) throws Exception{
+  public void run(String[] args) throws Exception {
     String error = "Expected name of tool and arguments for" +
-      " tool to be passed in on the command line. Please pass one of the " +
-      "following as arguments to this command: \n";
+        " tool to be passed in on the command line. Please pass one of the " +
+        "following as arguments to this command: \n";
     StringBuilder builder = new StringBuilder(error);
-    for(FlumeToolType type : FlumeToolType.values()) {
+    for (FlumeToolType type : FlumeToolType.values()) {
       builder.append(type.name()).append("\n");
     }
-    if(args == null || args.length == 0) {
+    if (args == null || args.length == 0) {
       System.out.println(builder.toString());
       System.exit(1);
     }
     String toolName = args[0];
     FlumeTool tool = null;
-    for(FlumeToolType type : FlumeToolType.values()) {
-      if(toolName.equalsIgnoreCase(type.name())) {
+    for (FlumeToolType type : FlumeToolType.values()) {
+      if (toolName.equalsIgnoreCase(type.name())) {
         tool = type.getClassInstance().newInstance();
         break;
       }
     }
     Preconditions.checkNotNull(tool, "Cannot find tool matching " + toolName
-      + ". Please select one of: \n " + FlumeToolType.getNames());
+        + ". Please select one of: \n " + FlumeToolType.getNames());
     if (args.length == 1) {
       tool.run(new String[0]);
     } else {
-      tool.run(Arrays.asList(args).subList(1, args.length).
-        toArray(new String[0]));
+      tool.run(Arrays.asList(args).subList(1, args.length).toArray(new 
String[0]));
     }
   }
 }

http://git-wip-us.apache.org/repos/asf/flume/blob/2252fb19/pom.xml
----------------------------------------------------------------------
diff --git a/pom.xml b/pom.xml
index 9954353..85c0dc8 100644
--- a/pom.xml
+++ b/pom.xml
@@ -47,6 +47,8 @@ limitations under the License.
     <stagingDirectory>${project.basedir}/target/docs</stagingDirectory>
 
     <avro.version>1.7.4</avro.version>
+    <checkstyle.plugin.version>2.17</checkstyle.plugin.version>
+    <checkstyle.tool.version>6.19</checkstyle.tool.version>
     <elasticsearch.version>0.90.1</elasticsearch.version>
     <hadoop2.version>2.4.0</hadoop2.version>
     <thrift.version>0.7.0</thrift.version>
@@ -58,6 +60,7 @@ limitations under the License.
   </properties>
 
   <modules>
+    <module>flume-checkstyle</module>
     <module>flume-ng-core</module>
     <module>flume-ng-configuration</module>
     <module>flume-ng-embedded-agent</module>
@@ -638,6 +641,49 @@ limitations under the License.
         <artifactId>apache-rat-plugin</artifactId>
       </plugin>
 
+      <!-- Run checkstyle as part of the "verify" phase. See
+           
https://maven.apache.org/plugins/maven-checkstyle-plugin/examples/multi-module-config.html
+           for how to configure plugin for a multi-module project. -->
+      <plugin>
+        <groupId>org.apache.maven.plugins</groupId>
+        <artifactId>maven-checkstyle-plugin</artifactId>
+        <version>${checkstyle.plugin.version}</version>
+        <dependencies>
+          <!-- The flume-checkstyle module adds the checkstyle config files to 
the classpath. -->
+          <dependency>
+            <groupId>org.apache.flume</groupId>
+            <artifactId>flume-checkstyle</artifactId>
+            <version>${project.version}</version>
+          </dependency>
+          <!-- Newer versions of puppycrawl checkstyle have more features. 
Pull in a more recent
+               version than is specified in the maven-checkstyle-plugin pom 
file. -->
+          <dependency>
+            <groupId>com.puppycrawl.tools</groupId>
+            <artifactId>checkstyle</artifactId>
+            <version>${checkstyle.tool.version}</version>
+          </dependency>
+        </dependencies>
+        <executions>
+          <execution>
+            <id>verify</id>
+            <phase>verify</phase>
+            <configuration>
+              <configLocation>flume/checkstyle.xml</configLocation>
+              
<suppressionsLocation>flume/checkstyle-suppressions.xml</suppressionsLocation>
+              
<suppressionsFileExpression>checkstyle.suppressions.file</suppressionsFileExpression>
+              <encoding>UTF-8</encoding>
+              <consoleOutput>true</consoleOutput>
+              <failsOnError>true</failsOnError>
+              <includeTestSourceDirectory>false</includeTestSourceDirectory>
+              <linkXRef>false</linkXRef>
+            </configuration>
+            <goals>
+              <goal>check</goal>
+            </goals>
+          </execution>
+        </executions>
+      </plugin>
+
     </plugins>
 
     <pluginManagement>
@@ -776,6 +822,7 @@ limitations under the License.
   <dependencyManagement>
     <dependencies>
 
+      <!-- Dependencies: build -->
       <dependency>
         <groupId>junit</groupId>
         <artifactId>junit</artifactId>
@@ -1053,6 +1100,11 @@ limitations under the License.
 
       <dependency>
         <groupId>org.apache.flume</groupId>
+        <artifactId>flume-checkstyle</artifactId>
+        <version>1.7.0-SNAPSHOT</version>
+      </dependency>
+      <dependency>
+        <groupId>org.apache.flume</groupId>
         <artifactId>flume-tools</artifactId>
         <version>1.7.0-SNAPSHOT</version>
       </dependency>
@@ -1413,6 +1465,20 @@ limitations under the License.
         </configuration>
       </plugin>
 
+      <plugin>
+        <groupId>org.apache.maven.plugins</groupId>
+        <artifactId>maven-checkstyle-plugin</artifactId>
+        <version>${checkstyle.plugin.version}</version>
+        <configuration>
+          <configLocation>flume/checkstyle.xml</configLocation>
+          
<suppressionsLocation>flume/checkstyle-suppressions.xml</suppressionsLocation>
+          
<suppressionsFileExpression>checkstyle.suppressions.file</suppressionsFileExpression>
+          <encoding>UTF-8</encoding>
+          <includeTestSourceDirectory>false</includeTestSourceDirectory>
+          <linkXRef>false</linkXRef>
+        </configuration>
+      </plugin>
+
     </plugins>
   </reporting>
 

Reply via email to