Repository: flume
Updated Branches:
  refs/heads/trunk bb5e374df -> dad828acb


FLUME-3246 Validate flume configuration to prevent larger source batchsize than
the channel transaction capacity

The loadSources() method seemed like an appropriate place to check this.
Added 2 new interfaces for getting the transaction capacity and the batch size
fields. The check is only done for channels that implement the
TransactioCapacitySupported interface and sources and sinks that implement
the BatchSizeSupported interface.

This closes #212

Reviewers: Ferenc Szabo, Peter Turcsanyi

(Endre Major via Ferenc Szabo)


Project: http://git-wip-us.apache.org/repos/asf/flume/repo
Commit: http://git-wip-us.apache.org/repos/asf/flume/commit/dad828ac
Tree: http://git-wip-us.apache.org/repos/asf/flume/tree/dad828ac
Diff: http://git-wip-us.apache.org/repos/asf/flume/diff/dad828ac

Branch: refs/heads/trunk
Commit: dad828acbe40b218dc9c5bc741f5a6c7fa8b338f
Parents: bb5e374
Author: Endre Major <[email protected]>
Authored: Tue Jun 19 14:54:50 2018 +0200
Committer: Ferenc Szabo <[email protected]>
Committed: Wed Aug 29 07:57:21 2018 +0200

----------------------------------------------------------------------
 .../apache/flume/channel/file/FileChannel.java  |   8 +-
 .../org/apache/flume/channel/MemoryChannel.java |   7 +-
 .../apache/flume/conf/BatchSizeSupported.java   |  32 +++
 .../conf/TransactionCapacitySupported.java      |  32 +++
 .../org/apache/flume/sink/AbstractRpcSink.java  |  17 +-
 .../java/org/apache/flume/sink/NullSink.java    |   8 +-
 .../org/apache/flume/sink/RollingFileSink.java  |   7 +-
 .../org/apache/flume/source/ExecSource.java     |   9 +-
 .../flume/source/MultiportSyslogTCPSource.java  |   8 +-
 .../flume/source/SequenceGeneratorSource.java   |   7 +-
 .../flume/source/SpoolDirectorySource.java      |   8 +-
 .../org/apache/flume/source/StressSource.java   |   9 +-
 .../apache/flume/sink/TestRollingFileSink.java  |  42 ++++
 flume-ng-doc/sphinx/FlumeUserGuide.rst          |   2 +-
 .../node/AbstractConfigurationProvider.java     |  71 +++++--
 .../node/TestAbstractConfigurationProvider.java |  50 +++++
 .../org/apache/flume/api/AbstractRpcClient.java |  30 +++
 .../org/apache/flume/api/FailoverRpcClient.java |  17 +-
 .../flume/api/LoadBalancingRpcClient.java       |   1 +
 .../apache/flume/api/NettyAvroRpcClient.java    |  19 +-
 .../org/apache/flume/api/ThriftRpcClient.java   |  11 +-
 .../org/apache/flume/sink/kite/DatasetSink.java |   8 +-
 .../apache/flume/sink/hdfs/HDFSEventSink.java   |   9 +-
 .../org/apache/flume/sink/hive/HiveSink.java    |   7 +-
 .../sink/elasticsearch/ElasticSearchSink.java   |   8 +-
 .../apache/flume/sink/hbase/AsyncHBaseSink.java |   8 +-
 .../org/apache/flume/sink/hbase/HBaseSink.java  |   8 +-
 .../apache/flume/sink/hbase2/HBase2Sink.java    |   8 +-
 .../org/apache/flume/sink/kafka/KafkaSink.java  |   5 +-
 .../sink/solr/morphline/MorphlineSink.java      |  10 +-
 .../org/apache/flume/source/jms/JMSSource.java  |   8 +-
 .../apache/flume/source/kafka/KafkaSource.java  |   8 +-
 .../flume/source/taildir/TaildirSource.java     |   8 +-
 .../flume/source/twitter/TwitterSource.java     |   8 +-
 .../org/apache/flume/test/agent/TestConfig.java | 209 +++++++++++++++++++
 .../flume/test/agent/TestConfigFilters.java     | 176 ----------------
 36 files changed, 623 insertions(+), 260 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/flume/blob/dad828ac/flume-ng-channels/flume-file-channel/src/main/java/org/apache/flume/channel/file/FileChannel.java
----------------------------------------------------------------------
diff --git 
a/flume-ng-channels/flume-file-channel/src/main/java/org/apache/flume/channel/file/FileChannel.java
 
b/flume-ng-channels/flume-file-channel/src/main/java/org/apache/flume/channel/file/FileChannel.java
index 3194592..fe7e80c 100644
--- 
a/flume-ng-channels/flume-file-channel/src/main/java/org/apache/flume/channel/file/FileChannel.java
+++ 
b/flume-ng-channels/flume-file-channel/src/main/java/org/apache/flume/channel/file/FileChannel.java
@@ -40,6 +40,7 @@ import 
org.apache.flume.channel.file.encryption.EncryptionConfiguration;
 import org.apache.flume.channel.file.encryption.KeyProvider;
 import org.apache.flume.channel.file.encryption.KeyProviderFactory;
 import org.apache.flume.channel.file.instrumentation.FileChannelCounter;
+import org.apache.flume.conf.TransactionCapacitySupported;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
@@ -73,7 +74,7 @@ import java.util.concurrent.TimeUnit;
 @InterfaceAudience.Private
 @InterfaceStability.Stable
 @Disposable
-public class FileChannel extends BasicChannelSemantics {
+public class FileChannel extends BasicChannelSemantics implements 
TransactionCapacitySupported {
 
   private static final Logger LOG = LoggerFactory.getLogger(FileChannel.class);
 
@@ -444,6 +445,11 @@ public class FileChannel extends BasicChannelSemantics {
     return channelCounter;
   }
 
+  @Override
+  public long getTransactionCapacity() {
+    return transactionCapacity;
+  }
+
   /**
    * Transaction backed by a file. This transaction supports either puts
    * or takes but not both.

http://git-wip-us.apache.org/repos/asf/flume/blob/dad828ac/flume-ng-core/src/main/java/org/apache/flume/channel/MemoryChannel.java
----------------------------------------------------------------------
diff --git 
a/flume-ng-core/src/main/java/org/apache/flume/channel/MemoryChannel.java 
b/flume-ng-core/src/main/java/org/apache/flume/channel/MemoryChannel.java
index add40e9..20da572 100644
--- a/flume-ng-core/src/main/java/org/apache/flume/channel/MemoryChannel.java
+++ b/flume-ng-core/src/main/java/org/apache/flume/channel/MemoryChannel.java
@@ -27,6 +27,7 @@ import org.apache.flume.Event;
 import org.apache.flume.annotations.InterfaceAudience;
 import org.apache.flume.annotations.InterfaceStability;
 import org.apache.flume.annotations.Recyclable;
+import org.apache.flume.conf.TransactionCapacitySupported;
 import org.apache.flume.instrumentation.ChannelCounter;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
@@ -50,7 +51,7 @@ import java.util.concurrent.TimeUnit;
 @InterfaceAudience.Public
 @InterfaceStability.Stable
 @Recyclable
-public class MemoryChannel extends BasicChannelSemantics {
+public class MemoryChannel extends BasicChannelSemantics implements 
TransactionCapacitySupported {
   private static Logger LOGGER = LoggerFactory.getLogger(MemoryChannel.class);
   private static final Integer defaultCapacity = 100;
   private static final Integer defaultTransCapacity = 100;
@@ -379,4 +380,8 @@ public class MemoryChannel extends BasicChannelSemantics {
   int getBytesRemainingValue() {
     return bytesRemaining.availablePermits();
   }
+
+  public long getTransactionCapacity() {
+    return transCapacity;
+  }
 }

http://git-wip-us.apache.org/repos/asf/flume/blob/dad828ac/flume-ng-core/src/main/java/org/apache/flume/conf/BatchSizeSupported.java
----------------------------------------------------------------------
diff --git 
a/flume-ng-core/src/main/java/org/apache/flume/conf/BatchSizeSupported.java 
b/flume-ng-core/src/main/java/org/apache/flume/conf/BatchSizeSupported.java
new file mode 100644
index 0000000..3aa477e
--- /dev/null
+++ b/flume-ng-core/src/main/java/org/apache/flume/conf/BatchSizeSupported.java
@@ -0,0 +1,32 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.flume.conf;
+
+/**
+ * This interface indicates that a component does batching and the batch size
+ * is publicly available.
+ *
+ */
+public interface BatchSizeSupported {
+
+  /**
+   * Returns the batch size
+   */
+  long getBatchSize();
+
+}

http://git-wip-us.apache.org/repos/asf/flume/blob/dad828ac/flume-ng-core/src/main/java/org/apache/flume/conf/TransactionCapacitySupported.java
----------------------------------------------------------------------
diff --git 
a/flume-ng-core/src/main/java/org/apache/flume/conf/TransactionCapacitySupported.java
 
b/flume-ng-core/src/main/java/org/apache/flume/conf/TransactionCapacitySupported.java
new file mode 100644
index 0000000..bb543ac
--- /dev/null
+++ 
b/flume-ng-core/src/main/java/org/apache/flume/conf/TransactionCapacitySupported.java
@@ -0,0 +1,32 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.flume.conf;
+
+/**
+ * This interface indicates that a component has a transaction capacity
+ * and it is publicly available.
+ *
+ */
+public interface TransactionCapacitySupported {
+
+  /**
+   * Returns the transaction capacity
+   */
+  long getTransactionCapacity();
+
+}

http://git-wip-us.apache.org/repos/asf/flume/blob/dad828ac/flume-ng-core/src/main/java/org/apache/flume/sink/AbstractRpcSink.java
----------------------------------------------------------------------
diff --git 
a/flume-ng-core/src/main/java/org/apache/flume/sink/AbstractRpcSink.java 
b/flume-ng-core/src/main/java/org/apache/flume/sink/AbstractRpcSink.java
index 5a5993a..da347e0 100644
--- a/flume-ng-core/src/main/java/org/apache/flume/sink/AbstractRpcSink.java
+++ b/flume-ng-core/src/main/java/org/apache/flume/sink/AbstractRpcSink.java
@@ -30,8 +30,10 @@ import org.apache.flume.Event;
 import org.apache.flume.EventDeliveryException;
 import org.apache.flume.FlumeException;
 import org.apache.flume.Transaction;
+import org.apache.flume.api.AbstractRpcClient;
 import org.apache.flume.api.RpcClient;
 import org.apache.flume.api.RpcClientConfigurationConstants;
+import org.apache.flume.conf.BatchSizeSupported;
 import org.apache.flume.conf.Configurable;
 import org.apache.flume.instrumentation.SinkCounter;
 import org.slf4j.Logger;
@@ -141,7 +143,8 @@ import java.util.concurrent.locks.ReentrantLock;
  * This method will be called whenever this sink needs to create a new
  * connection to the source.
  */
-public abstract class AbstractRpcSink extends AbstractSink implements 
Configurable {
+public abstract class AbstractRpcSink extends AbstractSink implements 
Configurable,
+    BatchSizeSupported {
 
   private static final Logger logger = 
LoggerFactory.getLogger(AbstractRpcSink.class);
   private String hostname;
@@ -156,6 +159,11 @@ public abstract class AbstractRpcSink extends AbstractSink 
implements Configurab
       Executors.newSingleThreadScheduledExecutor(
           new ThreadFactoryBuilder().setNameFormat("Rpc Sink Reset 
Thread").build());
 
+  // batchSize is used in the clients, here it is only used for config 
validation
+  // before the client is configured
+  private int batchSize;
+
+
   @Override
   public void configure(Context context) {
     clientProps = new Properties();
@@ -174,6 +182,8 @@ public abstract class AbstractRpcSink extends AbstractSink 
implements Configurab
       clientProps.setProperty(entry.getKey(), entry.getValue());
     }
 
+    batchSize = AbstractRpcClient.parseBatchSize(clientProps);
+
     if (sinkCounter == null) {
       sinkCounter = new SinkCounter(getName());
     }
@@ -401,4 +411,9 @@ public abstract class AbstractRpcSink extends AbstractSink 
implements Configurab
   RpcClient getUnderlyingClient() {
     return client;
   }
+
+  @Override
+  public long getBatchSize() {
+    return batchSize;
+  }
 }

http://git-wip-us.apache.org/repos/asf/flume/blob/dad828ac/flume-ng-core/src/main/java/org/apache/flume/sink/NullSink.java
----------------------------------------------------------------------
diff --git a/flume-ng-core/src/main/java/org/apache/flume/sink/NullSink.java 
b/flume-ng-core/src/main/java/org/apache/flume/sink/NullSink.java
index eb00e15..9347f8e 100644
--- a/flume-ng-core/src/main/java/org/apache/flume/sink/NullSink.java
+++ b/flume-ng-core/src/main/java/org/apache/flume/sink/NullSink.java
@@ -25,6 +25,7 @@ import org.apache.flume.Event;
 import org.apache.flume.EventDeliveryException;
 import org.apache.flume.Sink;
 import org.apache.flume.Transaction;
+import org.apache.flume.conf.BatchSizeSupported;
 import org.apache.flume.conf.Configurable;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
@@ -47,7 +48,7 @@ import org.slf4j.LoggerFactory;
  * TODO
  * </p>
  */
-public class NullSink extends AbstractSink implements Configurable {
+public class NullSink extends AbstractSink implements Configurable, 
BatchSizeSupported {
 
   private static final Logger logger = LoggerFactory.getLogger(NullSink.class);
 
@@ -137,4 +138,9 @@ public class NullSink extends AbstractSink implements 
Configurable {
     return "NullSink " + getName() + " { batchSize: " + batchSize + " }";
   }
 
+  @Override
+  public long getBatchSize() {
+    return batchSize;
+  }
+
 }

http://git-wip-us.apache.org/repos/asf/flume/blob/dad828ac/flume-ng-core/src/main/java/org/apache/flume/sink/RollingFileSink.java
----------------------------------------------------------------------
diff --git 
a/flume-ng-core/src/main/java/org/apache/flume/sink/RollingFileSink.java 
b/flume-ng-core/src/main/java/org/apache/flume/sink/RollingFileSink.java
index 9b0827a..d48a3c8 100644
--- a/flume-ng-core/src/main/java/org/apache/flume/sink/RollingFileSink.java
+++ b/flume-ng-core/src/main/java/org/apache/flume/sink/RollingFileSink.java
@@ -31,6 +31,7 @@ import org.apache.flume.Context;
 import org.apache.flume.Event;
 import org.apache.flume.EventDeliveryException;
 import org.apache.flume.Transaction;
+import org.apache.flume.conf.BatchSizeSupported;
 import org.apache.flume.conf.Configurable;
 import org.apache.flume.formatter.output.PathManager;
 import org.apache.flume.formatter.output.PathManagerFactory;
@@ -43,7 +44,7 @@ import com.google.common.util.concurrent.ThreadFactoryBuilder;
 import org.apache.flume.serialization.EventSerializer;
 import org.apache.flume.serialization.EventSerializerFactory;
 
-public class RollingFileSink extends AbstractSink implements Configurable {
+public class RollingFileSink extends AbstractSink implements Configurable, 
BatchSizeSupported {
 
   private static final Logger logger = LoggerFactory
       .getLogger(RollingFileSink.class);
@@ -284,4 +285,8 @@ public class RollingFileSink extends AbstractSink 
implements Configurable {
     this.rollInterval = rollInterval;
   }
 
+  @Override
+  public long getBatchSize() {
+    return batchSize;
+  }
 }

http://git-wip-us.apache.org/repos/asf/flume/blob/dad828ac/flume-ng-core/src/main/java/org/apache/flume/source/ExecSource.java
----------------------------------------------------------------------
diff --git 
a/flume-ng-core/src/main/java/org/apache/flume/source/ExecSource.java 
b/flume-ng-core/src/main/java/org/apache/flume/source/ExecSource.java
index eaafbd6..2bb4077 100644
--- a/flume-ng-core/src/main/java/org/apache/flume/source/ExecSource.java
+++ b/flume-ng-core/src/main/java/org/apache/flume/source/ExecSource.java
@@ -38,6 +38,7 @@ import org.apache.flume.EventDrivenSource;
 import org.apache.flume.Source;
 import org.apache.flume.SystemClock;
 import org.apache.flume.channel.ChannelProcessor;
+import org.apache.flume.conf.BatchSizeSupported;
 import org.apache.flume.conf.Configurable;
 import org.apache.flume.event.EventBuilder;
 import org.apache.flume.instrumentation.SourceCounter;
@@ -146,7 +147,8 @@ import java.nio.charset.Charset;
  * TODO
  * </p>
  */
-public class ExecSource extends AbstractSource implements EventDrivenSource, 
Configurable {
+public class ExecSource extends AbstractSource implements EventDrivenSource, 
Configurable,
+        BatchSizeSupported {
 
   private static final Logger logger = 
LoggerFactory.getLogger(ExecSource.class);
 
@@ -248,6 +250,11 @@ public class ExecSource extends AbstractSource implements 
EventDrivenSource, Con
     }
   }
 
+  @Override
+  public long getBatchSize() {
+    return bufferCount;
+  }
+
   private static class ExecRunnable implements Runnable {
 
     public ExecRunnable(String shell, String command, ChannelProcessor 
channelProcessor,

http://git-wip-us.apache.org/repos/asf/flume/blob/dad828ac/flume-ng-core/src/main/java/org/apache/flume/source/MultiportSyslogTCPSource.java
----------------------------------------------------------------------
diff --git 
a/flume-ng-core/src/main/java/org/apache/flume/source/MultiportSyslogTCPSource.java
 
b/flume-ng-core/src/main/java/org/apache/flume/source/MultiportSyslogTCPSource.java
index 3c59b47..82cb2f2 100644
--- 
a/flume-ng-core/src/main/java/org/apache/flume/source/MultiportSyslogTCPSource.java
+++ 
b/flume-ng-core/src/main/java/org/apache/flume/source/MultiportSyslogTCPSource.java
@@ -34,6 +34,7 @@ import org.apache.flume.Context;
 import org.apache.flume.Event;
 import org.apache.flume.EventDrivenSource;
 import org.apache.flume.channel.ChannelProcessor;
+import org.apache.flume.conf.BatchSizeSupported;
 import org.apache.flume.conf.Configurable;
 import org.apache.flume.conf.LogPrivacyUtil;
 import org.apache.flume.event.EventBuilder;
@@ -50,7 +51,7 @@ import org.slf4j.LoggerFactory;
  *
  */
 public class MultiportSyslogTCPSource extends AbstractSource implements
-        EventDrivenSource, Configurable {
+        EventDrivenSource, Configurable, BatchSizeSupported {
 
   public static final Logger logger = LoggerFactory.getLogger(
           MultiportSyslogTCPSource.class);
@@ -208,6 +209,11 @@ public class MultiportSyslogTCPSource extends 
AbstractSource implements
     return "Multiport Syslog TCP source " + getName();
   }
 
+  @Override
+  public long getBatchSize() {
+    return batchSize;
+  }
+
   static class MultiportSyslogHandler extends IoHandlerAdapter {
 
     private static final String SAVED_BUF = "savedBuffer";

http://git-wip-us.apache.org/repos/asf/flume/blob/dad828ac/flume-ng-core/src/main/java/org/apache/flume/source/SequenceGeneratorSource.java
----------------------------------------------------------------------
diff --git 
a/flume-ng-core/src/main/java/org/apache/flume/source/SequenceGeneratorSource.java
 
b/flume-ng-core/src/main/java/org/apache/flume/source/SequenceGeneratorSource.java
index e494bfb..6719606 100644
--- 
a/flume-ng-core/src/main/java/org/apache/flume/source/SequenceGeneratorSource.java
+++ 
b/flume-ng-core/src/main/java/org/apache/flume/source/SequenceGeneratorSource.java
@@ -25,6 +25,7 @@ import org.apache.flume.Context;
 import org.apache.flume.Event;
 import org.apache.flume.EventDeliveryException;
 import org.apache.flume.FlumeException;
+import org.apache.flume.conf.BatchSizeSupported;
 import org.apache.flume.conf.Configurable;
 import org.apache.flume.event.EventBuilder;
 import org.apache.flume.instrumentation.SourceCounter;
@@ -35,7 +36,7 @@ import java.util.ArrayList;
 import java.util.List;
 
 public class SequenceGeneratorSource extends AbstractPollableSource implements
-        Configurable {
+        Configurable, BatchSizeSupported {
 
   private static final Logger logger = LoggerFactory
       .getLogger(SequenceGeneratorSource.class);
@@ -115,4 +116,8 @@ public class SequenceGeneratorSource extends 
AbstractPollableSource implements
     logger.info("Sequence generator source do stopped. Metrics:{}",getName(), 
sourceCounter);
   }
 
+  @Override
+  public long getBatchSize() {
+    return batchSize;
+  }
 }

http://git-wip-us.apache.org/repos/asf/flume/blob/dad828ac/flume-ng-core/src/main/java/org/apache/flume/source/SpoolDirectorySource.java
----------------------------------------------------------------------
diff --git 
a/flume-ng-core/src/main/java/org/apache/flume/source/SpoolDirectorySource.java 
b/flume-ng-core/src/main/java/org/apache/flume/source/SpoolDirectorySource.java
index 305ca3b..db4e0a3 100644
--- 
a/flume-ng-core/src/main/java/org/apache/flume/source/SpoolDirectorySource.java
+++ 
b/flume-ng-core/src/main/java/org/apache/flume/source/SpoolDirectorySource.java
@@ -27,6 +27,7 @@ import org.apache.flume.Event;
 import org.apache.flume.EventDrivenSource;
 import org.apache.flume.FlumeException;
 import org.apache.flume.client.avro.ReliableSpoolingFileEventReader;
+import org.apache.flume.conf.BatchSizeSupported;
 import org.apache.flume.conf.Configurable;
 import org.apache.flume.instrumentation.SourceCounter;
 import org.apache.flume.serialization.DecodeErrorPolicy;
@@ -45,7 +46,7 @@ import java.util.concurrent.TimeUnit;
 import static 
org.apache.flume.source.SpoolDirectorySourceConfigurationConstants.*;
 
 public class SpoolDirectorySource extends AbstractSource
-    implements Configurable, EventDrivenSource {
+    implements Configurable, EventDrivenSource, BatchSizeSupported {
 
   private static final Logger logger = 
LoggerFactory.getLogger(SpoolDirectorySource.class);
 
@@ -236,6 +237,11 @@ public class SpoolDirectorySource extends AbstractSource
     return recursiveDirectorySearch;
   }
 
+  @Override
+  public long getBatchSize() {
+    return batchSize;
+  }
+
   @VisibleForTesting
   protected class SpoolDirectoryRunnable implements Runnable {
     private ReliableSpoolingFileEventReader reader;

http://git-wip-us.apache.org/repos/asf/flume/blob/dad828ac/flume-ng-core/src/main/java/org/apache/flume/source/StressSource.java
----------------------------------------------------------------------
diff --git 
a/flume-ng-core/src/main/java/org/apache/flume/source/StressSource.java 
b/flume-ng-core/src/main/java/org/apache/flume/source/StressSource.java
index f37174a..6c3c330 100644
--- a/flume-ng-core/src/main/java/org/apache/flume/source/StressSource.java
+++ b/flume-ng-core/src/main/java/org/apache/flume/source/StressSource.java
@@ -30,6 +30,7 @@ import org.apache.flume.CounterGroup;
 import org.apache.flume.Event;
 import org.apache.flume.EventDeliveryException;
 import org.apache.flume.FlumeException;
+import org.apache.flume.conf.BatchSizeSupported;
 import org.apache.flume.conf.Configurable;
 import org.apache.flume.event.EventBuilder;
 import org.slf4j.Logger;
@@ -54,7 +55,8 @@ import org.slf4j.LoggerFactory;
  *
  * See {@link StressSource#configure(Context)} for configuration options.
  */
-public class StressSource extends AbstractPollableSource implements 
Configurable {
+public class StressSource extends AbstractPollableSource
+        implements Configurable, BatchSizeSupported {
 
   private static final Logger logger = 
LoggerFactory.getLogger(StressSource.class);
 
@@ -171,4 +173,9 @@ public class StressSource extends AbstractPollableSource 
implements Configurable
   protected void doStop() throws FlumeException {
     logger.info("Stress source do stop. Metrics:{}", counterGroup);
   }
+
+  @Override
+  public long getBatchSize() {
+    return batchSize;
+  }
 }

http://git-wip-us.apache.org/repos/asf/flume/blob/dad828ac/flume-ng-core/src/test/java/org/apache/flume/sink/TestRollingFileSink.java
----------------------------------------------------------------------
diff --git 
a/flume-ng-core/src/test/java/org/apache/flume/sink/TestRollingFileSink.java 
b/flume-ng-core/src/test/java/org/apache/flume/sink/TestRollingFileSink.java
index 6b74e2d..c88b541 100644
--- a/flume-ng-core/src/test/java/org/apache/flume/sink/TestRollingFileSink.java
+++ b/flume-ng-core/src/test/java/org/apache/flume/sink/TestRollingFileSink.java
@@ -218,4 +218,46 @@ public class TestRollingFileSink {
       file.delete();
     }
   }
+
+  /**
+   * This test is to reproduce batch size and
+   * transaction capacity related configuration
+   * problems
+   */
+  @Test(expected = EventDeliveryException.class)
+  public void testTransCapBatchSizeCompatibility() throws 
EventDeliveryException {
+
+    Context context = new Context();
+
+    context.put("sink.directory", tmpDir.getPath());
+    context.put("sink.rollInterval", "0");
+    context.put("sink.batchSize", "1000");
+
+    Configurables.configure(sink, context);
+
+    context.put("capacity", "50");
+    context.put("transactionCapacity", "5");
+    Channel channel = new MemoryChannel();
+    Configurables.configure(channel, context);
+
+    sink.setChannel(channel);
+    sink.start();
+
+    try {
+      for (int j = 0; j < 10; j++) {
+        Transaction tx = channel.getTransaction();
+        tx.begin();
+        for (int i = 0; i < 5; i++) {
+          Event event = new SimpleEvent();
+          event.setBody(("Test event " + i).getBytes());
+          channel.put(event);
+        }
+        tx.commit();
+        tx.close();
+      }
+      sink.process();
+    } finally {
+      sink.stop();
+    }
+  }
 }

http://git-wip-us.apache.org/repos/asf/flume/blob/dad828ac/flume-ng-doc/sphinx/FlumeUserGuide.rst
----------------------------------------------------------------------
diff --git a/flume-ng-doc/sphinx/FlumeUserGuide.rst 
b/flume-ng-doc/sphinx/FlumeUserGuide.rst
index 3ec2b68..3f3ab46 100644
--- a/flume-ng-doc/sphinx/FlumeUserGuide.rst
+++ b/flume-ng-doc/sphinx/FlumeUserGuide.rst
@@ -2439,7 +2439,7 @@ sink.pathManager.extension  --       The file extension 
if the default PathManag
 sink.pathManager.prefix     --       A character string to add to the 
beginning of the file name if the default PathManager is used
 sink.rollInterval           30       Roll the file every 30 seconds. 
Specifying 0 will disable rolling and cause all events to be written to a 
single file.
 sink.serializer             TEXT     Other possible options include 
``avro_event`` or the FQCN of an implementation of EventSerializer.Builder 
interface.
-batchSize                   100
+sink.batchSize              100
 ==========================  =======  
======================================================================================================================
 
 Example for agent named a1:

http://git-wip-us.apache.org/repos/asf/flume/blob/dad828ac/flume-ng-node/src/main/java/org/apache/flume/node/AbstractConfigurationProvider.java
----------------------------------------------------------------------
diff --git 
a/flume-ng-node/src/main/java/org/apache/flume/node/AbstractConfigurationProvider.java
 
b/flume-ng-node/src/main/java/org/apache/flume/node/AbstractConfigurationProvider.java
index 130bc64..caf4522 100644
--- 
a/flume-ng-node/src/main/java/org/apache/flume/node/AbstractConfigurationProvider.java
+++ 
b/flume-ng-node/src/main/java/org/apache/flume/node/AbstractConfigurationProvider.java
@@ -18,6 +18,8 @@
 package org.apache.flume.node;
 
 import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.Collection;
 import java.util.Enumeration;
 import java.util.HashMap;
 import java.util.HashSet;
@@ -44,10 +46,12 @@ import org.apache.flume.channel.ChannelProcessor;
 import org.apache.flume.channel.ChannelSelectorFactory;
 import org.apache.flume.channel.DefaultChannelFactory;
 import org.apache.flume.conf.BasicConfigurationConstants;
+import org.apache.flume.conf.BatchSizeSupported;
 import org.apache.flume.conf.ComponentConfiguration;
 import org.apache.flume.conf.Configurables;
 import org.apache.flume.conf.FlumeConfiguration;
 import org.apache.flume.conf.FlumeConfiguration.AgentConfiguration;
+import org.apache.flume.conf.TransactionCapacitySupported;
 import org.apache.flume.conf.channel.ChannelSelectorConfiguration;
 import org.apache.flume.conf.sink.SinkConfiguration;
 import org.apache.flume.conf.sink.SinkGroupConfiguration;
@@ -274,13 +278,8 @@ public abstract class AbstractConfigurationProvider 
implements ConfigurationProv
         try {
           Configurables.configure(source, config);
           Set<String> channelNames = config.getChannels();
-          List<Channel> sourceChannels = new ArrayList<Channel>();
-          for (String chName : channelNames) {
-            ChannelComponent channelComponent = 
channelComponentMap.get(chName);
-            if (channelComponent != null) {
-              sourceChannels.add(channelComponent.channel);
-            }
-          }
+          List<Channel> sourceChannels =
+                  getSourceChannels(channelComponentMap, source, channelNames);
           if (sourceChannels.isEmpty()) {
             String msg = String.format("Source %s is not connected to a " +
                 "channel",  sourceName);
@@ -324,15 +323,10 @@ public abstract class AbstractConfigurationProvider 
implements ConfigurationProv
                                  
context.getString(BasicConfigurationConstants.CONFIG_TYPE));
         try {
           Configurables.configure(source, context);
-          List<Channel> sourceChannels = new ArrayList<Channel>();
           String[] channelNames = context.getString(
               BasicConfigurationConstants.CONFIG_CHANNELS).split("\\s+");
-          for (String chName : channelNames) {
-            ChannelComponent channelComponent = 
channelComponentMap.get(chName);
-            if (channelComponent != null) {
-              sourceChannels.add(channelComponent.channel);
-            }
-          }
+          List<Channel> sourceChannels =
+                  getSourceChannels(channelComponentMap, source, 
Arrays.asList(channelNames));
           if (sourceChannels.isEmpty()) {
             String msg = String.format("Source %s is not connected to a " +
                 "channel",  sourceName);
@@ -364,6 +358,53 @@ public abstract class AbstractConfigurationProvider 
implements ConfigurationProv
     }
   }
 
+  private List<Channel> getSourceChannels(Map<String, ChannelComponent> 
channelComponentMap,
+                  Source source, Collection<String> channelNames) throws 
InstantiationException {
+    List<Channel> sourceChannels = new ArrayList<Channel>();
+    for (String chName : channelNames) {
+      ChannelComponent channelComponent = channelComponentMap.get(chName);
+      if (channelComponent != null) {
+        checkSourceChannelCompatibility(source, channelComponent.channel);
+        sourceChannels.add(channelComponent.channel);
+      }
+    }
+    return sourceChannels;
+  }
+
+  private void checkSourceChannelCompatibility(Source source, Channel channel)
+      throws InstantiationException {
+    if (source instanceof BatchSizeSupported && channel instanceof 
TransactionCapacitySupported) {
+      long transCap = ((TransactionCapacitySupported) 
channel).getTransactionCapacity();
+      long batchSize = ((BatchSizeSupported) source).getBatchSize();
+      if (transCap < batchSize) {
+        String msg = String.format(
+            "Incompatible source and channel settings defined. " +
+                "source's batch size is greater than the channels transaction 
capacity. " +
+                "Source: %s, batch size = %d, channel %s, transaction capacity 
= %d",
+            source.getName(), batchSize,
+            channel.getName(), transCap);
+        throw new InstantiationException(msg);
+      }
+    }
+  }
+
+  private void checkSinkChannelCompatibility(Sink sink, Channel channel)
+      throws InstantiationException {
+    if (sink instanceof BatchSizeSupported && channel instanceof 
TransactionCapacitySupported) {
+      long transCap = ((TransactionCapacitySupported) 
channel).getTransactionCapacity();
+      long batchSize = ((BatchSizeSupported) sink).getBatchSize();
+      if (transCap < batchSize) {
+        String msg = String.format(
+            "Incompatible sink and channel settings defined. " +
+                "sink's batch size is greater than the channels transaction 
capacity. " +
+                "Sink: %s, batch size = %d, channel %s, transaction capacity = 
%d",
+            sink.getName(), batchSize,
+            channel.getName(), transCap);
+        throw new InstantiationException(msg);
+      }
+    }
+  }
+
   private void loadSinks(AgentConfiguration agentConf,
       Map<String, ChannelComponent> channelComponentMap, Map<String, 
SinkRunner> sinkRunnerMap)
       throws InstantiationException {
@@ -387,6 +428,7 @@ public abstract class AbstractConfigurationProvider 
implements ConfigurationProv
                 "channel",  sinkName);
             throw new IllegalStateException(msg);
           }
+          checkSinkChannelCompatibility(sink, channelComponent.channel);
           sink.setChannel(channelComponent.channel);
           sinks.put(comp.getComponentName(), sink);
           channelComponent.components.add(sinkName);
@@ -417,6 +459,7 @@ public abstract class AbstractConfigurationProvider 
implements ConfigurationProv
                 "channel",  sinkName);
             throw new IllegalStateException(msg);
           }
+          checkSinkChannelCompatibility(sink, channelComponent.channel);
           sink.setChannel(channelComponent.channel);
           sinks.put(sinkName, sink);
           channelComponent.components.add(sinkName);

http://git-wip-us.apache.org/repos/asf/flume/blob/dad828ac/flume-ng-node/src/test/java/org/apache/flume/node/TestAbstractConfigurationProvider.java
----------------------------------------------------------------------
diff --git 
a/flume-ng-node/src/test/java/org/apache/flume/node/TestAbstractConfigurationProvider.java
 
b/flume-ng-node/src/test/java/org/apache/flume/node/TestAbstractConfigurationProvider.java
index e27d8f7..7810c5b 100644
--- 
a/flume-ng-node/src/test/java/org/apache/flume/node/TestAbstractConfigurationProvider.java
+++ 
b/flume-ng-node/src/test/java/org/apache/flume/node/TestAbstractConfigurationProvider.java
@@ -185,6 +185,56 @@ public class TestAbstractConfigurationProvider {
     Assert.assertTrue(config.getSinkRunners().size() == 0);
   }
 
+  @Test
+  public void testSinkSourceMismatchDuringConfiguration() throws Exception {
+    String agentName = "agent1";
+    String sourceType = "seq";
+    String channelType = "memory";
+    String sinkType = "avro";
+    Map<String, String> properties = getProperties(agentName, sourceType,
+        channelType, sinkType);
+    properties.put(agentName + ".channels.channel1.capacity", "1000");
+    properties.put(agentName + ".channels.channel1.transactionCapacity", 
"1000");
+    properties.put(agentName + ".sources.source1.batchSize", "1000");
+    properties.put(agentName + ".sinks.sink1.batch-size", "1000");
+    properties.put(agentName + ".sinks.sink1.hostname", "10.10.10.10");
+    properties.put(agentName + ".sinks.sink1.port", "1010");
+
+    MemoryConfigurationProvider provider =
+        new MemoryConfigurationProvider(agentName, properties);
+    MaterializedConfiguration config = provider.getConfiguration();
+    Assert.assertTrue(config.getSourceRunners().size() == 1);
+    Assert.assertTrue(config.getChannels().size() == 1);
+    Assert.assertTrue(config.getSinkRunners().size() == 1);
+
+    properties.put(agentName + ".sources.source1.batchSize", "1001");
+    properties.put(agentName + ".sinks.sink1.batch-size", "1000");
+
+    provider = new MemoryConfigurationProvider(agentName, properties);
+    config = provider.getConfiguration();
+    Assert.assertTrue(config.getSourceRunners().size() == 0);
+    Assert.assertTrue(config.getChannels().size() == 1);
+    Assert.assertTrue(config.getSinkRunners().size() == 1);
+
+    properties.put(agentName + ".sources.source1.batchSize", "1000");
+    properties.put(agentName + ".sinks.sink1.batch-size", "1001");
+
+    provider = new MemoryConfigurationProvider(agentName, properties);
+    config = provider.getConfiguration();
+    Assert.assertTrue(config.getSourceRunners().size() == 1);
+    Assert.assertTrue(config.getChannels().size() == 1);
+    Assert.assertTrue(config.getSinkRunners().size() == 0);
+
+    properties.put(agentName + ".sources.source1.batchSize", "1001");
+    properties.put(agentName + ".sinks.sink1.batch-size", "1001");
+
+    provider = new MemoryConfigurationProvider(agentName, properties);
+    config = provider.getConfiguration();
+    Assert.assertTrue(config.getSourceRunners().size() == 0);
+    Assert.assertTrue(config.getChannels().size() == 0);
+    Assert.assertTrue(config.getSinkRunners().size() == 0);
+  }
+
   private Map<String, String> getProperties(String agentName,
                                             String sourceType, String 
channelType,
                                             String sinkType) {

http://git-wip-us.apache.org/repos/asf/flume/blob/dad828ac/flume-ng-sdk/src/main/java/org/apache/flume/api/AbstractRpcClient.java
----------------------------------------------------------------------
diff --git 
a/flume-ng-sdk/src/main/java/org/apache/flume/api/AbstractRpcClient.java 
b/flume-ng-sdk/src/main/java/org/apache/flume/api/AbstractRpcClient.java
index f20462b..b28d4e5 100644
--- a/flume-ng-sdk/src/main/java/org/apache/flume/api/AbstractRpcClient.java
+++ b/flume-ng-sdk/src/main/java/org/apache/flume/api/AbstractRpcClient.java
@@ -24,8 +24,11 @@ import java.util.Properties;
 import org.apache.flume.Event;
 import org.apache.flume.EventDeliveryException;
 import org.apache.flume.FlumeException;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
 
 public abstract class AbstractRpcClient implements RpcClient {
+  private static Logger logger = 
LoggerFactory.getLogger(AbstractRpcClient.class);
 
   protected int batchSize =
       RpcClientConfigurationConstants.DEFAULT_BATCH_SIZE;
@@ -61,4 +64,31 @@ public abstract class AbstractRpcClient implements RpcClient 
{
   protected abstract void configure(Properties properties)
       throws FlumeException;
 
+  /**
+   * Parses the batch size configuration for RPC clients.
+   * @param properties the client configuration properties
+   * @return the configured batch size, or the default when the value is missing or invalid
+   */
+  public static int parseBatchSize(Properties properties) {
+    String strBatchSize = properties.getProperty(
+        RpcClientConfigurationConstants.CONFIG_BATCH_SIZE);
+    logger.debug("Batch size string = " + strBatchSize);
+    int batchSize = RpcClientConfigurationConstants.DEFAULT_BATCH_SIZE;
+    if (strBatchSize != null && !strBatchSize.isEmpty()) {
+      try {
+        int parsedBatch = Integer.parseInt(strBatchSize);
+        if (parsedBatch < 1) {
+          logger.warn("Invalid value for batchSize: {}; Using default value.", 
parsedBatch);
+        } else {
+          batchSize = parsedBatch;
+        }
+      } catch (NumberFormatException e) {
+        logger.warn("Batchsize is not valid for RpcClient: " + strBatchSize +
+            ". Default value assigned.", e);
+      }
+    }
+
+    return batchSize;
+  }
+
 }

http://git-wip-us.apache.org/repos/asf/flume/blob/dad828ac/flume-ng-sdk/src/main/java/org/apache/flume/api/FailoverRpcClient.java
----------------------------------------------------------------------
diff --git 
a/flume-ng-sdk/src/main/java/org/apache/flume/api/FailoverRpcClient.java 
b/flume-ng-sdk/src/main/java/org/apache/flume/api/FailoverRpcClient.java
index 9d82acb..5b9cef5 100644
--- a/flume-ng-sdk/src/main/java/org/apache/flume/api/FailoverRpcClient.java
+++ b/flume-ng-sdk/src/main/java/org/apache/flume/api/FailoverRpcClient.java
@@ -89,23 +89,8 @@ public class FailoverRpcClient extends AbstractRpcClient 
implements RpcClient {
       }
     }
 
-    String strBatchSize = properties.getProperty(
-        RpcClientConfigurationConstants.CONFIG_BATCH_SIZE);
+    batchSize = parseBatchSize(properties);
 
-    if (strBatchSize != null && strBatchSize.trim().length() > 0) {
-      try {
-        batchSize = Integer.parseInt(strBatchSize);
-        if (batchSize < 1) {
-          logger.warn("A batch-size less than 1 was specified: " + batchSize
-              + ". Using default instead.");
-          batchSize = RpcClientConfigurationConstants.DEFAULT_BATCH_SIZE;
-        }
-      } catch (NumberFormatException ex) {
-        logger.warn("Invalid batch size specified: " + strBatchSize
-            + ". Using default instead.");
-      }
-
-    }
     isActive = true;
   }
 

http://git-wip-us.apache.org/repos/asf/flume/blob/dad828ac/flume-ng-sdk/src/main/java/org/apache/flume/api/LoadBalancingRpcClient.java
----------------------------------------------------------------------
diff --git 
a/flume-ng-sdk/src/main/java/org/apache/flume/api/LoadBalancingRpcClient.java 
b/flume-ng-sdk/src/main/java/org/apache/flume/api/LoadBalancingRpcClient.java
index d3ccf74..a025612 100644
--- 
a/flume-ng-sdk/src/main/java/org/apache/flume/api/LoadBalancingRpcClient.java
+++ 
b/flume-ng-sdk/src/main/java/org/apache/flume/api/LoadBalancingRpcClient.java
@@ -185,6 +185,7 @@ public class LoadBalancingRpcClient extends 
AbstractRpcClient {
     }
 
     selector.setHosts(hosts);
+    batchSize = parseBatchSize(properties);
     isOpen = true;
   }
 

http://git-wip-us.apache.org/repos/asf/flume/blob/dad828ac/flume-ng-sdk/src/main/java/org/apache/flume/api/NettyAvroRpcClient.java
----------------------------------------------------------------------
diff --git 
a/flume-ng-sdk/src/main/java/org/apache/flume/api/NettyAvroRpcClient.java 
b/flume-ng-sdk/src/main/java/org/apache/flume/api/NettyAvroRpcClient.java
index 21a9553..b61eb79 100644
--- a/flume-ng-sdk/src/main/java/org/apache/flume/api/NettyAvroRpcClient.java
+++ b/flume-ng-sdk/src/main/java/org/apache/flume/api/NettyAvroRpcClient.java
@@ -492,24 +492,7 @@ public class NettyAvroRpcClient extends AbstractRpcClient 
implements RpcClient {
       stateLock.unlock();
     }
 
-    // batch size
-    String strBatchSize = properties.getProperty(
-        RpcClientConfigurationConstants.CONFIG_BATCH_SIZE);
-    logger.debug("Batch size string = " + strBatchSize);
-    batchSize = RpcClientConfigurationConstants.DEFAULT_BATCH_SIZE;
-    if (strBatchSize != null && !strBatchSize.isEmpty()) {
-      try {
-        int parsedBatch = Integer.parseInt(strBatchSize);
-        if (parsedBatch < 1) {
-          logger.warn("Invalid value for batchSize: {}; Using default value.", 
parsedBatch);
-        } else {
-          batchSize = parsedBatch;
-        }
-      } catch (NumberFormatException e) {
-        logger.warn("Batchsize is not valid for RpcClient: " + strBatchSize +
-            ". Default value assigned.", e);
-      }
-    }
+    batchSize = parseBatchSize(properties);
 
     // host and port
     String hostNames = properties.getProperty(

http://git-wip-us.apache.org/repos/asf/flume/blob/dad828ac/flume-ng-sdk/src/main/java/org/apache/flume/api/ThriftRpcClient.java
----------------------------------------------------------------------
diff --git 
a/flume-ng-sdk/src/main/java/org/apache/flume/api/ThriftRpcClient.java 
b/flume-ng-sdk/src/main/java/org/apache/flume/api/ThriftRpcClient.java
index 1d21d5f..07d9c90 100644
--- a/flume-ng-sdk/src/main/java/org/apache/flume/api/ThriftRpcClient.java
+++ b/flume-ng-sdk/src/main/java/org/apache/flume/api/ThriftRpcClient.java
@@ -72,7 +72,6 @@ public class ThriftRpcClient extends AbstractRpcClient {
   public static final String BINARY_PROTOCOL = "binary";
   public static final String COMPACT_PROTOCOL = "compact";
 
-  private int batchSize;
   private long requestTimeout;
   private final Lock stateLock;
   private State connState;
@@ -107,12 +106,6 @@ public class ThriftRpcClient extends AbstractRpcClient {
     });
   }
 
-
-  @Override
-  public int getBatchSize() {
-    return batchSize;
-  }
-
   @Override
   public void append(Event event) throws EventDeliveryException {
     // Thrift IPC client is not thread safe, so don't allow state changes or
@@ -299,9 +292,7 @@ public class ThriftRpcClient extends AbstractRpcClient {
             + "choose from. Defaulting to 'compact'.");
         protocol = COMPACT_PROTOCOL;
       }
-      batchSize = Integer.parseInt(properties.getProperty(
-          RpcClientConfigurationConstants.CONFIG_BATCH_SIZE,
-          RpcClientConfigurationConstants.DEFAULT_BATCH_SIZE.toString()));
+      batchSize = parseBatchSize(properties);
       requestTimeout = Long.parseLong(properties.getProperty(
           RpcClientConfigurationConstants.CONFIG_REQUEST_TIMEOUT,
           String.valueOf(

http://git-wip-us.apache.org/repos/asf/flume/blob/dad828ac/flume-ng-sinks/flume-dataset-sink/src/main/java/org/apache/flume/sink/kite/DatasetSink.java
----------------------------------------------------------------------
diff --git 
a/flume-ng-sinks/flume-dataset-sink/src/main/java/org/apache/flume/sink/kite/DatasetSink.java
 
b/flume-ng-sinks/flume-dataset-sink/src/main/java/org/apache/flume/sink/kite/DatasetSink.java
index fa31262..4a44264 100644
--- 
a/flume-ng-sinks/flume-dataset-sink/src/main/java/org/apache/flume/sink/kite/DatasetSink.java
+++ 
b/flume-ng-sinks/flume-dataset-sink/src/main/java/org/apache/flume/sink/kite/DatasetSink.java
@@ -19,6 +19,7 @@ package org.apache.flume.sink.kite;
 
 import org.apache.flume.auth.FlumeAuthenticationUtil;
 import org.apache.flume.auth.PrivilegedExecutor;
+import org.apache.flume.conf.BatchSizeSupported;
 import org.apache.flume.sink.kite.parser.EntityParserFactory;
 import org.apache.flume.sink.kite.parser.EntityParser;
 import org.apache.flume.sink.kite.policy.FailurePolicy;
@@ -69,7 +70,7 @@ import org.kitesdk.data.Formats;
  * and loading a Dataset by name, {@code kite.dataset.name}, and namespace,
  * {@code kite.dataset.namespace}.
  */
-public class DatasetSink extends AbstractSink implements Configurable {
+public class DatasetSink extends AbstractSink implements Configurable, 
BatchSizeSupported {
 
   private static final Logger LOG = LoggerFactory.getLogger(DatasetSink.class);
 
@@ -579,4 +580,9 @@ public class DatasetSink extends AbstractSink implements 
Configurable {
     return Registration.lookupDatasetUri(URI.create(
         uri.getRawSchemeSpecificPart())).second().get("dataset");
   }
+
+  @Override
+  public long getBatchSize() {
+    return batchSize;
+  }
 }

http://git-wip-us.apache.org/repos/asf/flume/blob/dad828ac/flume-ng-sinks/flume-hdfs-sink/src/main/java/org/apache/flume/sink/hdfs/HDFSEventSink.java
----------------------------------------------------------------------
diff --git 
a/flume-ng-sinks/flume-hdfs-sink/src/main/java/org/apache/flume/sink/hdfs/HDFSEventSink.java
 
b/flume-ng-sinks/flume-hdfs-sink/src/main/java/org/apache/flume/sink/hdfs/HDFSEventSink.java
index 22306a0..8189ca8 100644
--- 
a/flume-ng-sinks/flume-hdfs-sink/src/main/java/org/apache/flume/sink/hdfs/HDFSEventSink.java
+++ 
b/flume-ng-sinks/flume-hdfs-sink/src/main/java/org/apache/flume/sink/hdfs/HDFSEventSink.java
@@ -43,6 +43,7 @@ import org.apache.flume.SystemClock;
 import org.apache.flume.Transaction;
 import org.apache.flume.auth.FlumeAuthenticationUtil;
 import org.apache.flume.auth.PrivilegedExecutor;
+import org.apache.flume.conf.BatchSizeSupported;
 import org.apache.flume.conf.Configurable;
 import org.apache.flume.formatter.output.BucketPath;
 import org.apache.flume.instrumentation.SinkCounter;
@@ -58,7 +59,7 @@ import org.slf4j.LoggerFactory;
 import com.google.common.base.Preconditions;
 import com.google.common.util.concurrent.ThreadFactoryBuilder;
 
-public class HDFSEventSink extends AbstractSink implements Configurable {
+public class HDFSEventSink extends AbstractSink implements Configurable, 
BatchSizeSupported {
   public interface WriterCallback {
     public void run(String filePath);
   }
@@ -559,4 +560,10 @@ public class HDFSEventSink extends AbstractSink implements 
Configurable {
   int getTryCount() {
     return tryCount;
   }
+
+  @Override
+  public long getBatchSize() {
+    return batchSize;
+  }
+
 }

http://git-wip-us.apache.org/repos/asf/flume/blob/dad828ac/flume-ng-sinks/flume-hive-sink/src/main/java/org/apache/flume/sink/hive/HiveSink.java
----------------------------------------------------------------------
diff --git 
a/flume-ng-sinks/flume-hive-sink/src/main/java/org/apache/flume/sink/hive/HiveSink.java
 
b/flume-ng-sinks/flume-hive-sink/src/main/java/org/apache/flume/sink/hive/HiveSink.java
index 8db008e..ec1fe62 100644
--- 
a/flume-ng-sinks/flume-hive-sink/src/main/java/org/apache/flume/sink/hive/HiveSink.java
+++ 
b/flume-ng-sinks/flume-hive-sink/src/main/java/org/apache/flume/sink/hive/HiveSink.java
@@ -28,6 +28,7 @@ import org.apache.flume.Context;
 import org.apache.flume.Event;
 import org.apache.flume.EventDeliveryException;
 import org.apache.flume.Transaction;
+import org.apache.flume.conf.BatchSizeSupported;
 import org.apache.flume.conf.Configurable;
 import org.apache.flume.formatter.output.BucketPath;
 import org.apache.flume.instrumentation.SinkCounter;
@@ -50,7 +51,7 @@ import java.util.concurrent.Executors;
 import java.util.concurrent.TimeUnit;
 import java.util.concurrent.atomic.AtomicBoolean;
 
-public class HiveSink extends AbstractSink implements Configurable {
+public class HiveSink extends AbstractSink implements Configurable, 
BatchSizeSupported {
 
   private static final Logger LOG = LoggerFactory.getLogger(HiveSink.class);
 
@@ -513,6 +514,10 @@ public class HiveSink extends AbstractSink implements 
Configurable {
     }
   }
 
+  @Override
+  public long getBatchSize() {
+    return batchSize;
+  }
 
   @Override
   public String toString() {

http://git-wip-us.apache.org/repos/asf/flume/blob/dad828ac/flume-ng-sinks/flume-ng-elasticsearch-sink/src/main/java/org/apache/flume/sink/elasticsearch/ElasticSearchSink.java
----------------------------------------------------------------------
diff --git 
a/flume-ng-sinks/flume-ng-elasticsearch-sink/src/main/java/org/apache/flume/sink/elasticsearch/ElasticSearchSink.java
 
b/flume-ng-sinks/flume-ng-elasticsearch-sink/src/main/java/org/apache/flume/sink/elasticsearch/ElasticSearchSink.java
index ebafb9f..05eb5ff 100644
--- 
a/flume-ng-sinks/flume-ng-elasticsearch-sink/src/main/java/org/apache/flume/sink/elasticsearch/ElasticSearchSink.java
+++ 
b/flume-ng-sinks/flume-ng-elasticsearch-sink/src/main/java/org/apache/flume/sink/elasticsearch/ElasticSearchSink.java
@@ -38,6 +38,7 @@ import org.apache.flume.CounterGroup;
 import org.apache.flume.Event;
 import org.apache.flume.EventDeliveryException;
 import org.apache.flume.Transaction;
+import org.apache.flume.conf.BatchSizeSupported;
 import org.apache.flume.formatter.output.BucketPath;
 import org.apache.flume.conf.Configurable;
 import org.apache.flume.instrumentation.SinkCounter;
@@ -83,7 +84,7 @@ import static 
org.apache.flume.sink.elasticsearch.ElasticSearchSinkConstants.IND
  *      ://www.elasticsearch.org/guide/reference/api/admin-indices-templates.
  *      html
  */
-public class ElasticSearchSink extends AbstractSink implements Configurable {
+public class ElasticSearchSink extends AbstractSink implements Configurable, 
BatchSizeSupported {
 
   private static final Logger logger = LoggerFactory
       .getLogger(ElasticSearchSink.class);
@@ -172,6 +173,11 @@ public class ElasticSearchSink extends AbstractSink 
implements Configurable {
   }
 
   @Override
+  public long getBatchSize() {
+    return batchSize;
+  }
+
+  @Override
   public Status process() throws EventDeliveryException {
     logger.debug("processing...");
     Status status = Status.READY;

http://git-wip-us.apache.org/repos/asf/flume/blob/dad828ac/flume-ng-sinks/flume-ng-hbase-sink/src/main/java/org/apache/flume/sink/hbase/AsyncHBaseSink.java
----------------------------------------------------------------------
diff --git 
a/flume-ng-sinks/flume-ng-hbase-sink/src/main/java/org/apache/flume/sink/hbase/AsyncHBaseSink.java
 
b/flume-ng-sinks/flume-ng-hbase-sink/src/main/java/org/apache/flume/sink/hbase/AsyncHBaseSink.java
index 881f661..bd0efa9 100644
--- 
a/flume-ng-sinks/flume-ng-hbase-sink/src/main/java/org/apache/flume/sink/hbase/AsyncHBaseSink.java
+++ 
b/flume-ng-sinks/flume-ng-hbase-sink/src/main/java/org/apache/flume/sink/hbase/AsyncHBaseSink.java
@@ -33,6 +33,7 @@ import org.apache.flume.Event;
 import org.apache.flume.EventDeliveryException;
 import org.apache.flume.FlumeException;
 import org.apache.flume.Transaction;
+import org.apache.flume.conf.BatchSizeSupported;
 import org.apache.flume.conf.Configurable;
 import org.apache.flume.conf.ConfigurationException;
 import org.apache.flume.instrumentation.SinkCounter;
@@ -101,7 +102,7 @@ import java.util.concurrent.locks.ReentrantLock;
  * multiple increments are returned by the serializer, then HBase failure
  * will cause them to be re-written, when HBase comes back up.
  */
-public class AsyncHBaseSink extends AbstractSink implements Configurable {
+public class AsyncHBaseSink extends AbstractSink implements Configurable, 
BatchSizeSupported {
 
   private String tableName;
   private byte[] columnFamily;
@@ -452,6 +453,11 @@ public class AsyncHBaseSink extends AbstractSink 
implements Configurable {
   }
 
   @Override
+  public long getBatchSize() {
+    return batchSize;
+  }
+
+  @Override
   public void start() {
     Preconditions.checkArgument(client == null, "Please call stop "
         + "before calling start on an old instance.");

http://git-wip-us.apache.org/repos/asf/flume/blob/dad828ac/flume-ng-sinks/flume-ng-hbase-sink/src/main/java/org/apache/flume/sink/hbase/HBaseSink.java
----------------------------------------------------------------------
diff --git 
a/flume-ng-sinks/flume-ng-hbase-sink/src/main/java/org/apache/flume/sink/hbase/HBaseSink.java
 
b/flume-ng-sinks/flume-ng-hbase-sink/src/main/java/org/apache/flume/sink/hbase/HBaseSink.java
index 29969ad..9b9bce9 100644
--- 
a/flume-ng-sinks/flume-ng-hbase-sink/src/main/java/org/apache/flume/sink/hbase/HBaseSink.java
+++ 
b/flume-ng-sinks/flume-ng-hbase-sink/src/main/java/org/apache/flume/sink/hbase/HBaseSink.java
@@ -33,6 +33,7 @@ import org.apache.flume.Transaction;
 import org.apache.flume.annotations.InterfaceAudience;
 import org.apache.flume.auth.FlumeAuthenticationUtil;
 import org.apache.flume.auth.PrivilegedExecutor;
+import org.apache.flume.conf.BatchSizeSupported;
 import org.apache.flume.conf.Configurable;
 import org.apache.flume.conf.ConfigurationException;
 import org.apache.flume.instrumentation.SinkCounter;
@@ -90,7 +91,7 @@ import java.util.NavigableMap;
  * multiple increments are returned by the serializer, then HBase failure
  * will cause them to be re-written, when HBase comes back up.
  */
-public class HBaseSink extends AbstractSink implements Configurable {
+public class HBaseSink extends AbstractSink implements Configurable, 
BatchSizeSupported {
   private String tableName;
   private byte[] columnFamily;
   private HTable table;
@@ -555,6 +556,11 @@ public class HBaseSink extends AbstractSink implements 
Configurable {
     return serializer;
   }
 
+  @Override
+  public long getBatchSize() {
+    return batchSize;
+  }
+
   @VisibleForTesting
   @InterfaceAudience.Private
   interface DebugIncrementsCallback {

http://git-wip-us.apache.org/repos/asf/flume/blob/dad828ac/flume-ng-sinks/flume-ng-hbase2-sink/src/main/java/org/apache/flume/sink/hbase2/HBase2Sink.java
----------------------------------------------------------------------
diff --git 
a/flume-ng-sinks/flume-ng-hbase2-sink/src/main/java/org/apache/flume/sink/hbase2/HBase2Sink.java
 
b/flume-ng-sinks/flume-ng-hbase2-sink/src/main/java/org/apache/flume/sink/hbase2/HBase2Sink.java
index 1c6e285..42db0be 100644
--- 
a/flume-ng-sinks/flume-ng-hbase2-sink/src/main/java/org/apache/flume/sink/hbase2/HBase2Sink.java
+++ 
b/flume-ng-sinks/flume-ng-hbase2-sink/src/main/java/org/apache/flume/sink/hbase2/HBase2Sink.java
@@ -33,6 +33,7 @@ import org.apache.flume.Transaction;
 import org.apache.flume.annotations.InterfaceAudience;
 import org.apache.flume.auth.FlumeAuthenticationUtil;
 import org.apache.flume.auth.PrivilegedExecutor;
+import org.apache.flume.conf.BatchSizeSupported;
 import org.apache.flume.conf.Configurable;
 import org.apache.flume.conf.ConfigurationException;
 import org.apache.flume.instrumentation.SinkCounter;
@@ -96,7 +97,7 @@ import java.util.NavigableMap;
  * multiple increments are returned by the serializer, then HBase failure
  * will cause them to be re-written, when HBase comes back up.
  */
-public class HBase2Sink extends AbstractSink implements Configurable {
+public class HBase2Sink extends AbstractSink implements Configurable, 
BatchSizeSupported {
   private String tableName;
   private byte[] columnFamily;
   private Connection conn;
@@ -541,6 +542,11 @@ public class HBase2Sink extends AbstractSink implements 
Configurable {
     return serializer;
   }
 
+  @Override
+  public long getBatchSize() {
+    return batchSize;
+  }
+
   @VisibleForTesting
   @InterfaceAudience.Private
   interface DebugIncrementsCallback {

http://git-wip-us.apache.org/repos/asf/flume/blob/dad828ac/flume-ng-sinks/flume-ng-kafka-sink/src/main/java/org/apache/flume/sink/kafka/KafkaSink.java
----------------------------------------------------------------------
diff --git 
a/flume-ng-sinks/flume-ng-kafka-sink/src/main/java/org/apache/flume/sink/kafka/KafkaSink.java
 
b/flume-ng-sinks/flume-ng-kafka-sink/src/main/java/org/apache/flume/sink/kafka/KafkaSink.java
index 7f347d8..eaabd6e 100644
--- 
a/flume-ng-sinks/flume-ng-kafka-sink/src/main/java/org/apache/flume/sink/kafka/KafkaSink.java
+++ 
b/flume-ng-sinks/flume-ng-kafka-sink/src/main/java/org/apache/flume/sink/kafka/KafkaSink.java
@@ -29,6 +29,7 @@ import org.apache.flume.Context;
 import org.apache.flume.Event;
 import org.apache.flume.EventDeliveryException;
 import org.apache.flume.Transaction;
+import org.apache.flume.conf.BatchSizeSupported;
 import org.apache.flume.conf.Configurable;
 import org.apache.flume.conf.ConfigurationException;
 import org.apache.flume.conf.LogPrivacyUtil;
@@ -102,7 +103,7 @@ import static 
org.apache.flume.sink.kafka.KafkaSinkConstants.MESSAGE_SERIALIZER_
  * topic
  * key
  */
-public class KafkaSink extends AbstractSink implements Configurable {
+public class KafkaSink extends AbstractSink implements Configurable, 
BatchSizeSupported {
 
   private static final Logger logger = 
LoggerFactory.getLogger(KafkaSink.class);
 
@@ -136,7 +137,7 @@ public class KafkaSink extends AbstractSink implements 
Configurable {
     return topic;
   }
 
-  public int getBatchSize() {
+  public long getBatchSize() {
     return batchSize;
   }
 

http://git-wip-us.apache.org/repos/asf/flume/blob/dad828ac/flume-ng-sinks/flume-ng-morphline-solr-sink/src/main/java/org/apache/flume/sink/solr/morphline/MorphlineSink.java
----------------------------------------------------------------------
diff --git 
a/flume-ng-sinks/flume-ng-morphline-solr-sink/src/main/java/org/apache/flume/sink/solr/morphline/MorphlineSink.java
 
b/flume-ng-sinks/flume-ng-morphline-solr-sink/src/main/java/org/apache/flume/sink/solr/morphline/MorphlineSink.java
index 7d9f807..6cacda0 100644
--- 
a/flume-ng-sinks/flume-ng-morphline-solr-sink/src/main/java/org/apache/flume/sink/solr/morphline/MorphlineSink.java
+++ 
b/flume-ng-sinks/flume-ng-morphline-solr-sink/src/main/java/org/apache/flume/sink/solr/morphline/MorphlineSink.java
@@ -22,6 +22,7 @@ import org.apache.flume.Context;
 import org.apache.flume.Event;
 import org.apache.flume.EventDeliveryException;
 import org.apache.flume.Transaction;
+import org.apache.flume.conf.BatchSizeSupported;
 import org.apache.flume.conf.Configurable;
 import org.apache.flume.conf.ConfigurationException;
 import org.apache.flume.conf.LogPrivacyUtil;
@@ -36,7 +37,7 @@ import org.kitesdk.morphline.api.Command;
  * Flume sink that extracts search documents from Flume events and processes 
them using a morphline
  * {@link Command} chain.
  */
-public class MorphlineSink extends AbstractSink implements Configurable {
+public class MorphlineSink extends AbstractSink implements Configurable, 
BatchSizeSupported {
 
   private int maxBatchSize = 1000;
   private long maxBatchDurationMillis = 1000;
@@ -194,7 +195,12 @@ public class MorphlineSink extends AbstractSink implements 
Configurable {
       txn.close();
     }
   }
-  
+
+  @Override
+  public long getBatchSize() {
+    return getMaxBatchSize();
+  }
+
   @Override
   public String toString() {
     int i = getClass().getName().lastIndexOf('.') + 1;

http://git-wip-us.apache.org/repos/asf/flume/blob/dad828ac/flume-ng-sources/flume-jms-source/src/main/java/org/apache/flume/source/jms/JMSSource.java
----------------------------------------------------------------------
diff --git 
a/flume-ng-sources/flume-jms-source/src/main/java/org/apache/flume/source/jms/JMSSource.java
 
b/flume-ng-sources/flume-jms-source/src/main/java/org/apache/flume/source/jms/JMSSource.java
index 5dd82c9..938d5e2 100644
--- 
a/flume-ng-sources/flume-jms-source/src/main/java/org/apache/flume/source/jms/JMSSource.java
+++ 
b/flume-ng-sources/flume-jms-source/src/main/java/org/apache/flume/source/jms/JMSSource.java
@@ -36,6 +36,7 @@ import org.apache.flume.EventDeliveryException;
 import org.apache.flume.FlumeException;
 import org.apache.flume.annotations.InterfaceAudience;
 import org.apache.flume.annotations.InterfaceStability;
+import org.apache.flume.conf.BatchSizeSupported;
 import org.apache.flume.conf.Configurables;
 import org.apache.flume.instrumentation.SourceCounter;
 import org.apache.flume.source.AbstractPollableSource;
@@ -50,7 +51,7 @@ import com.google.common.io.Files;
 
 @InterfaceAudience.Private
 @InterfaceStability.Unstable
-public class JMSSource extends AbstractPollableSource {
+public class JMSSource extends AbstractPollableSource implements 
BatchSizeSupported {
   private static final Logger logger = 
LoggerFactory.getLogger(JMSSource.class);
 
   // setup by constructor
@@ -358,4 +359,9 @@ public class JMSSource extends AbstractPollableSource {
     jmsExceptionCounter = 0;
     return consumer;
   }
+
+  @Override
+  public long getBatchSize() {
+    return batchSize;
+  }
 }
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/flume/blob/dad828ac/flume-ng-sources/flume-kafka-source/src/main/java/org/apache/flume/source/kafka/KafkaSource.java
----------------------------------------------------------------------
diff --git 
a/flume-ng-sources/flume-kafka-source/src/main/java/org/apache/flume/source/kafka/KafkaSource.java
 
b/flume-ng-sources/flume-kafka-source/src/main/java/org/apache/flume/source/kafka/KafkaSource.java
index 8053b41..da4ec1a 100644
--- 
a/flume-ng-sources/flume-kafka-source/src/main/java/org/apache/flume/source/kafka/KafkaSource.java
+++ 
b/flume-ng-sources/flume-kafka-source/src/main/java/org/apache/flume/source/kafka/KafkaSource.java
@@ -41,6 +41,7 @@ import org.apache.flume.Context;
 import org.apache.flume.Event;
 import org.apache.flume.EventDeliveryException;
 import org.apache.flume.FlumeException;
+import org.apache.flume.conf.BatchSizeSupported;
 import org.apache.flume.conf.Configurable;
 import org.apache.flume.conf.ConfigurationException;
 import org.apache.flume.conf.LogPrivacyUtil;
@@ -94,7 +95,7 @@ import static 
scala.collection.JavaConverters.asJavaListConverter;
  * <p>
  */
 public class KafkaSource extends AbstractPollableSource
-        implements Configurable {
+        implements Configurable, BatchSizeSupported {
   private static final Logger log = LoggerFactory.getLogger(KafkaSource.class);
 
   // Constants used only for offset migration zookeeper connections
@@ -130,6 +131,11 @@ public class KafkaSource extends AbstractPollableSource
   private String topicHeader = null;
   private boolean setTopicHeader;
 
+  @Override
+  public long getBatchSize() {
+    return batchUpperLimit;
+  }
+
   /**
    * This class is a helper to subscribe for topics by using
    * different strategies

http://git-wip-us.apache.org/repos/asf/flume/blob/dad828ac/flume-ng-sources/flume-taildir-source/src/main/java/org/apache/flume/source/taildir/TaildirSource.java
----------------------------------------------------------------------
diff --git 
a/flume-ng-sources/flume-taildir-source/src/main/java/org/apache/flume/source/taildir/TaildirSource.java
 
b/flume-ng-sources/flume-taildir-source/src/main/java/org/apache/flume/source/taildir/TaildirSource.java
index 0c656d6..e121a2b 100644
--- 
a/flume-ng-sources/flume-taildir-source/src/main/java/org/apache/flume/source/taildir/TaildirSource.java
+++ 
b/flume-ng-sources/flume-taildir-source/src/main/java/org/apache/flume/source/taildir/TaildirSource.java
@@ -39,6 +39,7 @@ import org.apache.flume.Context;
 import org.apache.flume.Event;
 import org.apache.flume.FlumeException;
 import org.apache.flume.PollableSource;
+import org.apache.flume.conf.BatchSizeSupported;
 import org.apache.flume.conf.Configurable;
 import org.apache.flume.instrumentation.SourceCounter;
 import org.apache.flume.source.AbstractSource;
@@ -57,7 +58,7 @@ import com.google.common.util.concurrent.ThreadFactoryBuilder;
 import com.google.gson.Gson;
 
 public class TaildirSource extends AbstractSource implements
-    PollableSource, Configurable {
+    PollableSource, Configurable, BatchSizeSupported {
 
  private static final Logger logger = LoggerFactory.getLogger(TaildirSource.class);
 
@@ -190,6 +191,11 @@ public class TaildirSource implements
     }
   }
 
+  @Override
+  public long getBatchSize() {
+    return batchSize;
+  }
+
  private Map<String, String> selectByKeys(Map<String, String> map, String[] keys) {
     Map<String, String> result = Maps.newHashMap();
     for (String key : keys) {

http://git-wip-us.apache.org/repos/asf/flume/blob/dad828ac/flume-ng-sources/flume-twitter-source/src/main/java/org/apache/flume/source/twitter/TwitterSource.java
----------------------------------------------------------------------
diff --git a/flume-ng-sources/flume-twitter-source/src/main/java/org/apache/flume/source/twitter/TwitterSource.java b/flume-ng-sources/flume-twitter-source/src/main/java/org/apache/flume/source/twitter/TwitterSource.java
index d812023..948e6c0 100644
--- a/flume-ng-sources/flume-twitter-source/src/main/java/org/apache/flume/source/twitter/TwitterSource.java
+++ b/flume-ng-sources/flume-twitter-source/src/main/java/org/apache/flume/source/twitter/TwitterSource.java
@@ -39,6 +39,7 @@ import org.apache.flume.Event;
 import org.apache.flume.EventDrivenSource;
 import org.apache.flume.annotations.InterfaceAudience;
 import org.apache.flume.annotations.InterfaceStability;
+import org.apache.flume.conf.BatchSizeSupported;
 import org.apache.flume.conf.Configurable;
 import org.apache.flume.event.EventBuilder;
 import org.apache.flume.source.AbstractSource;
@@ -68,7 +69,7 @@ import twitter4j.auth.AccessToken;
 @InterfaceStability.Unstable
 public class TwitterSource
     extends AbstractSource
-    implements EventDrivenSource, Configurable, StatusListener {
+    implements EventDrivenSource, Configurable, StatusListener, BatchSizeSupported {
 
   private TwitterStream twitterStream;
   private Schema avroSchema;
@@ -325,4 +326,9 @@ public class TwitterSource
   public void onException(Exception e) {
     LOGGER.error("Exception while streaming tweets", e);
   }
+
+  @Override
+  public long getBatchSize() {
+    return maxBatchSize;
+  }
 }

http://git-wip-us.apache.org/repos/asf/flume/blob/dad828ac/flume-ng-tests/src/test/java/org/apache/flume/test/agent/TestConfig.java
----------------------------------------------------------------------
diff --git a/flume-ng-tests/src/test/java/org/apache/flume/test/agent/TestConfig.java b/flume-ng-tests/src/test/java/org/apache/flume/test/agent/TestConfig.java
new file mode 100644
index 0000000..57e720c
--- /dev/null
+++ b/flume-ng-tests/src/test/java/org/apache/flume/test/agent/TestConfig.java
@@ -0,0 +1,209 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.flume.test.agent;
+
+import org.apache.commons.io.FileUtils;
+import org.apache.flume.test.util.StagedInstall;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.security.alias.CredentialShell;
+import org.apache.hadoop.util.ToolRunner;
+import org.junit.After;
+import org.junit.Before;
+import org.junit.ClassRule;
+import org.junit.Test;
+import org.junit.contrib.java.lang.system.EnvironmentVariables;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.io.File;
+import java.io.IOException;
+import java.util.HashMap;
+import java.util.HashSet;
+import java.util.Map;
+import java.util.Properties;
+import java.util.Scanner;
+import java.util.Set;
+import java.util.concurrent.TimeUnit;
+
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertTrue;
+
+public class TestConfig {
+  private static final Logger LOGGER =
+      LoggerFactory.getLogger(TestConfig.class);
+
+  @ClassRule
+  public static final EnvironmentVariables environmentVariables
+      = new EnvironmentVariables();
+
+  private Properties agentProps;
+  private Map<String, String> agentEnv;
+  private File sinkOutputDir1;
+  private File sinkOutputDir2;
+  private File sinkOutputDir3;
+  private File hadoopCredStore;
+
+  @Before
+  public void setup() throws Exception {
+
+    File agentDir = StagedInstall.getInstance().getStageDir();
+    LOGGER.debug("Using agent stage dir: {}", agentDir);
+
+    File testDir = new File(agentDir, TestConfig.class.getName());
+    if (testDir.exists()) {
+      FileUtils.deleteDirectory(testDir);
+    }
+    assertTrue(testDir.mkdirs());
+
+    agentProps = new Properties();
+    agentEnv = new HashMap<>();
+
+    // Create the rest of the properties file
+    agentProps.put("agent.sources.seq-01.type", "seq");
+    agentProps.put("agent.sources.seq-01.totalEvents", "100");
+    agentProps.put("agent.sources.seq-01.channels", "mem-01 mem-02 mem-03");
+    agentProps.put("agent.channels.mem-01.type", "MEMORY");
+    agentProps.put("agent.channels.mem-01.capacity", String.valueOf(100000));
+    agentProps.put("agent.channels.mem-02.type", "MEMORY");
+    agentProps.put("agent.channels.mem-02.capacity", String.valueOf(100000));
+    agentProps.put("agent.channels.mem-03.type", "MEMORY");
+    agentProps.put("agent.channels.mem-04.capacity", String.valueOf(100000));
+
+    sinkOutputDir1 = new File(testDir, "out1");
+    assertTrue("Unable to create sink output dir: " + sinkOutputDir1.getPath(),
+        sinkOutputDir1.mkdir());
+    sinkOutputDir2 = new File(testDir, "out2");
+    assertTrue("Unable to create sink output dir: " + sinkOutputDir2.getPath(),
+        sinkOutputDir2.mkdir());
+    sinkOutputDir3 = new File(testDir, "out3");
+    assertTrue("Unable to create sink output dir: " + sinkOutputDir3.getPath(),
+        sinkOutputDir3.mkdir());
+
+    environmentVariables.set("HADOOP_CREDSTORE_PASSWORD", "envSecret");
+
+    agentEnv.put("dirname_env", sinkOutputDir1.getAbsolutePath());
+    agentEnv.put("HADOOP_CREDSTORE_PASSWORD", "envSecret");
+
+    hadoopCredStore = new File(testDir, "credstore.jceks");
+    String providerPath = "jceks://file/" + hadoopCredStore.getAbsolutePath();
+
+    ToolRunner.run(
+        new Configuration(), new CredentialShell(),
+        ("create dirname_hadoop -value " + sinkOutputDir3.getAbsolutePath()
+            + " -provider " + providerPath).split(" "));
+
+
+    agentProps.put("agent.sinks.roll-01.channel", "mem-01");
+    agentProps.put("agent.sinks.roll-01.type", "FILE_ROLL");
+    agentProps.put("agent.sinks.roll-01.sink.directory", "${filter-01[\"dirname_env\"]}");
+    agentProps.put("agent.sinks.roll-01.sink.rollInterval", "0");
+    agentProps.put("agent.sinks.roll-02.channel", "mem-02");
+    agentProps.put("agent.sinks.roll-02.type", "FILE_ROLL");
+    agentProps.put("agent.sinks.roll-02.sink.directory",
+        sinkOutputDir2.getParentFile().getAbsolutePath() + "/${filter-02['out2']}");
+    agentProps.put("agent.sinks.roll-02.sink.rollInterval", "0");
+    agentProps.put("agent.sinks.roll-03.channel", "mem-03");
+    agentProps.put("agent.sinks.roll-03.type", "FILE_ROLL");
+    agentProps.put("agent.sinks.roll-03.sink.directory", "${filter-03[dirname_hadoop]}");
+    agentProps.put("agent.sinks.roll-03.sink.rollInterval", "0");
+
+    agentProps.put("agent.configfilters.filter-01.type", "env");
+    agentProps.put("agent.configfilters.filter-02.type", "external");
+    agentProps.put("agent.configfilters.filter-02.command", "echo");
+    agentProps.put("agent.configfilters.filter-03.type", "hadoop");
+    agentProps.put("agent.configfilters.filter-03.credential.provider.path", providerPath);
+
+    agentProps.put("agent.sources", "seq-01");
+    agentProps.put("agent.channels", "mem-01 mem-02 mem-03");
+    agentProps.put("agent.sinks", "roll-01 roll-02 roll-03");
+    agentProps.put("agent.configfilters", "filter-01 filter-02 filter-03");
+  }
+
+  @After
+  public void teardown() throws Exception {
+    StagedInstall.getInstance().stopAgent();
+  }
+
+  private void validateSeenEvents(File outDir, int outFiles, int events)
+      throws IOException {
+    File[] sinkOutputDirChildren = outDir.listFiles();
+    assertEquals("Unexpected number of files in output dir",
+        outFiles, sinkOutputDirChildren.length);
+    Set<String> seenEvents = new HashSet<>();
+    for (File outFile : sinkOutputDirChildren) {
+      Scanner scanner = new Scanner(outFile);
+      while (scanner.hasNext()) {
+        seenEvents.add(scanner.nextLine());
+      }
+    }
+    for (int event = 0; event < events; event++) {
+      assertTrue(
+          "Missing event: {" + event + "}",
+          seenEvents.contains(String.valueOf(event))
+      );
+    }
+  }
+
+  @Test
+  public void testConfigReplacement() throws Exception {
+    LOGGER.debug("testConfigReplacement() started.");
+
+    StagedInstall.getInstance().startAgent("agent", agentProps, agentEnv);
+
+    TimeUnit.SECONDS.sleep(10); // Wait for sources and sink to process files
+
+    // Ensure we received all events.
+    validateSeenEvents(sinkOutputDir1, 1, 100);
+    validateSeenEvents(sinkOutputDir2, 1, 100);
+    validateSeenEvents(sinkOutputDir3, 1, 100);
+    LOGGER.debug("Processed all the events!");
+
+    LOGGER.debug("testConfigReplacement() ended.");
+  }
+
+  @Test
+  public void testConfigReload() throws Exception {
+    LOGGER.debug("testConfigReplacement() started.");
+
+    agentProps.put("agent.channels.mem-01.transactionCapacity", "10");
+    agentProps.put("agent.sinks.roll-01.sink.batchSize", "20");
+    StagedInstall.getInstance().startAgent("agent", agentProps, agentEnv);
+
+    TimeUnit.SECONDS.sleep(10); // Wait for sources and sink to process files
+
+    // This directory is empty due to misconfiguration
+    validateSeenEvents(sinkOutputDir1, 0, 0);
+
+    // These are well configured
+    validateSeenEvents(sinkOutputDir2, 1, 100);
+    validateSeenEvents(sinkOutputDir3, 1, 100);
+    LOGGER.debug("Processed all the events!");
+
+    //repair the config
+    agentProps.put("agent.channels.mem-01.transactionCapacity", "20");
+    StagedInstall.getInstance().reconfigure(agentProps);
+
+    TimeUnit.SECONDS.sleep(40); // Wait for sources and sink to process files
+    // Ensure we received all events.
+    validateSeenEvents(sinkOutputDir1, 1, 100);
+
+    LOGGER.debug("testConfigReplacement() ended.");
+  }
+
+}

http://git-wip-us.apache.org/repos/asf/flume/blob/dad828ac/flume-ng-tests/src/test/java/org/apache/flume/test/agent/TestConfigFilters.java
----------------------------------------------------------------------
diff --git a/flume-ng-tests/src/test/java/org/apache/flume/test/agent/TestConfigFilters.java b/flume-ng-tests/src/test/java/org/apache/flume/test/agent/TestConfigFilters.java
deleted file mode 100644
index b82c4b6..0000000
--- a/flume-ng-tests/src/test/java/org/apache/flume/test/agent/TestConfigFilters.java
+++ /dev/null
@@ -1,176 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied.  See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-package org.apache.flume.test.agent;
-
-import org.apache.flume.test.util.StagedInstall;
-import org.apache.hadoop.conf.Configuration;
-import org.apache.hadoop.security.alias.CredentialShell;
-import org.apache.hadoop.util.ToolRunner;
-import org.junit.After;
-import org.junit.Before;
-import org.junit.ClassRule;
-import org.junit.Test;
-import org.junit.contrib.java.lang.system.EnvironmentVariables;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
-import java.io.File;
-import java.io.IOException;
-import java.util.HashMap;
-import java.util.HashSet;
-import java.util.Map;
-import java.util.Properties;
-import java.util.Scanner;
-import java.util.Set;
-import java.util.concurrent.TimeUnit;
-
-import static org.junit.Assert.assertEquals;
-import static org.junit.Assert.assertTrue;
-
-public class TestConfigFilters {
-  private static final Logger LOGGER =
-      LoggerFactory.getLogger(TestConfigFilters.class);
-
-  @ClassRule
-  public static final EnvironmentVariables environmentVariables
-      = new EnvironmentVariables();
-
-  private Properties agentProps;
-  private Map<String, String> agentEnv;
-  private File sinkOutputDir1;
-  private File sinkOutputDir2;
-  private File sinkOutputDir3;
-  private File hadoopCredStore;
-
-  @Before
-  public void setup() throws Exception {
-
-    File agentDir = StagedInstall.getInstance().getStageDir();
-    LOGGER.debug("Using agent stage dir: {}", agentDir);
-
-    File testDir = new File(agentDir, TestConfigFilters.class.getName());
-    assertTrue(testDir.mkdirs());
-
-    agentProps = new Properties();
-    agentEnv = new HashMap<>();
-
-    // Create the rest of the properties file
-    agentProps.put("agent.sources.seq-01.type", "seq");
-    agentProps.put("agent.sources.seq-01.totalEvents", "100");
-    agentProps.put("agent.sources.seq-01.channels", "mem-01 mem-02 mem-03");
-    agentProps.put("agent.channels.mem-01.type", "MEMORY");
-    agentProps.put("agent.channels.mem-01.capacity", String.valueOf(100000));
-    agentProps.put("agent.channels.mem-02.type", "MEMORY");
-    agentProps.put("agent.channels.mem-02.capacity", String.valueOf(100000));
-    agentProps.put("agent.channels.mem-03.type", "MEMORY");
-    agentProps.put("agent.channels.mem-04.capacity", String.valueOf(100000));
-
-    sinkOutputDir1 = new File(testDir, "out1");
-    assertTrue("Unable to create sink output dir: " + sinkOutputDir1.getPath(),
-        sinkOutputDir1.mkdir());
-    sinkOutputDir2 = new File(testDir, "out2");
-    assertTrue("Unable to create sink output dir: " + sinkOutputDir2.getPath(),
-        sinkOutputDir2.mkdir());
-    sinkOutputDir3 = new File(testDir, "out3");
-    assertTrue("Unable to create sink output dir: " + sinkOutputDir3.getPath(),
-        sinkOutputDir3.mkdir());
-
-    environmentVariables.set("HADOOP_CREDSTORE_PASSWORD", "envSecret");
-
-    agentEnv.put("dirname_env", sinkOutputDir1.getAbsolutePath());
-    agentEnv.put("HADOOP_CREDSTORE_PASSWORD", "envSecret");
-
-    hadoopCredStore = new File(testDir, "credstore.jceks");
-    String providerPath = "jceks://file/" + hadoopCredStore.getAbsolutePath();
-
-    ToolRunner.run(
-        new Configuration(), new CredentialShell(),
-        ("create dirname_hadoop -value " + sinkOutputDir3.getAbsolutePath()
-            + " -provider " + providerPath).split(" "));
-
-
-    agentProps.put("agent.sinks.roll-01.channel", "mem-01");
-    agentProps.put("agent.sinks.roll-01.type", "FILE_ROLL");
-    agentProps.put("agent.sinks.roll-01.sink.directory", "${filter-01[\"dirname_env\"]}");
-    agentProps.put("agent.sinks.roll-01.sink.rollInterval", "0");
-    agentProps.put("agent.sinks.roll-02.channel", "mem-02");
-    agentProps.put("agent.sinks.roll-02.type", "FILE_ROLL");
-    agentProps.put("agent.sinks.roll-02.sink.directory",
-        sinkOutputDir2.getParentFile().getAbsolutePath() + "/${filter-02['out2']}");
-    agentProps.put("agent.sinks.roll-02.sink.rollInterval", "0");
-    agentProps.put("agent.sinks.roll-03.channel", "mem-03");
-    agentProps.put("agent.sinks.roll-03.type", "FILE_ROLL");
-    agentProps.put("agent.sinks.roll-03.sink.directory", "${filter-03[dirname_hadoop]}");
-    agentProps.put("agent.sinks.roll-03.sink.rollInterval", "0");
-
-    agentProps.put("agent.configfilters.filter-01.type", "env");
-    agentProps.put("agent.configfilters.filter-02.type", "external");
-    agentProps.put("agent.configfilters.filter-02.command", "echo");
-    agentProps.put("agent.configfilters.filter-03.type", "hadoop");
-    agentProps.put("agent.configfilters.filter-03.credential.provider.path", providerPath);
-
-    agentProps.put("agent.sources", "seq-01");
-    agentProps.put("agent.channels", "mem-01 mem-02 mem-03");
-    agentProps.put("agent.sinks", "roll-01 roll-02 roll-03");
-    agentProps.put("agent.configfilters", "filter-01 filter-02 filter-03");
-  }
-
-  @After
-  public void teardown() throws Exception {
-    StagedInstall.getInstance().stopAgent();
-  }
-
-  private void validateSeenEvents(File outDir, int outFiles, int events)
-      throws IOException {
-    File[] sinkOutputDirChildren = outDir.listFiles();
-    assertEquals("Unexpected number of files in output dir",
-        outFiles, sinkOutputDirChildren.length);
-    Set<String> seenEvents = new HashSet<>();
-    for (File outFile : sinkOutputDirChildren) {
-      Scanner scanner = new Scanner(outFile);
-      while (scanner.hasNext()) {
-        seenEvents.add(scanner.nextLine());
-      }
-    }
-    for (int event = 0; event < events; event++) {
-      assertTrue(
-          "Missing event: {" + event + "}",
-          seenEvents.contains(String.valueOf(event))
-      );
-    }
-  }
-
-  @Test
-  public void testConfigReplacement() throws Exception {
-    LOGGER.debug("testConfigReplacement() started.");
-
-    StagedInstall.getInstance().startAgent("agent", agentProps, agentEnv);
-
-    TimeUnit.SECONDS.sleep(10); // Wait for sources and sink to process files
-
-    // Ensure we received all events.
-    validateSeenEvents(sinkOutputDir1, 1, 100);
-    validateSeenEvents(sinkOutputDir2, 1, 100);
-    validateSeenEvents(sinkOutputDir3, 1, 100);
-    LOGGER.debug("Processed all the events!");
-
-    LOGGER.debug("testConfigReplacement() ended.");
-  }
-
-}

Reply via email to