Repository: flume Updated Branches: refs/heads/trunk bb5e374df -> dad828acb
FLUME-3246 Validate flume configuration to prevent larger source batchsize than the channel transaction capacity The loadSources() method seemed like an appropriate place to check this. Added 2 new interfaces for getting the transaction capacity and the batch size fields. The check is only done for channels that implement the TransactionCapacitySupported interface and sources and sinks that implement the BatchSizeSupported interface. This closes #212 Reviewers: Ferenc Szabo, Peter Turcsanyi (Endre Major via Ferenc Szabo) Project: http://git-wip-us.apache.org/repos/asf/flume/repo Commit: http://git-wip-us.apache.org/repos/asf/flume/commit/dad828ac Tree: http://git-wip-us.apache.org/repos/asf/flume/tree/dad828ac Diff: http://git-wip-us.apache.org/repos/asf/flume/diff/dad828ac Branch: refs/heads/trunk Commit: dad828acbe40b218dc9c5bc741f5a6c7fa8b338f Parents: bb5e374 Author: Endre Major <[email protected]> Authored: Tue Jun 19 14:54:50 2018 +0200 Committer: Ferenc Szabo <[email protected]> Committed: Wed Aug 29 07:57:21 2018 +0200 ---------------------------------------------------------------------- .../apache/flume/channel/file/FileChannel.java | 8 +- .../org/apache/flume/channel/MemoryChannel.java | 7 +- .../apache/flume/conf/BatchSizeSupported.java | 32 +++ .../conf/TransactionCapacitySupported.java | 32 +++ .../org/apache/flume/sink/AbstractRpcSink.java | 17 +- .../java/org/apache/flume/sink/NullSink.java | 8 +- .../org/apache/flume/sink/RollingFileSink.java | 7 +- .../org/apache/flume/source/ExecSource.java | 9 +- .../flume/source/MultiportSyslogTCPSource.java | 8 +- .../flume/source/SequenceGeneratorSource.java | 7 +- .../flume/source/SpoolDirectorySource.java | 8 +- .../org/apache/flume/source/StressSource.java | 9 +- .../apache/flume/sink/TestRollingFileSink.java | 42 ++++ flume-ng-doc/sphinx/FlumeUserGuide.rst | 2 +- .../node/AbstractConfigurationProvider.java | 71 +++++-- .../node/TestAbstractConfigurationProvider.java | 50 +++++ 
.../org/apache/flume/api/AbstractRpcClient.java | 30 +++ .../org/apache/flume/api/FailoverRpcClient.java | 17 +- .../flume/api/LoadBalancingRpcClient.java | 1 + .../apache/flume/api/NettyAvroRpcClient.java | 19 +- .../org/apache/flume/api/ThriftRpcClient.java | 11 +- .../org/apache/flume/sink/kite/DatasetSink.java | 8 +- .../apache/flume/sink/hdfs/HDFSEventSink.java | 9 +- .../org/apache/flume/sink/hive/HiveSink.java | 7 +- .../sink/elasticsearch/ElasticSearchSink.java | 8 +- .../apache/flume/sink/hbase/AsyncHBaseSink.java | 8 +- .../org/apache/flume/sink/hbase/HBaseSink.java | 8 +- .../apache/flume/sink/hbase2/HBase2Sink.java | 8 +- .../org/apache/flume/sink/kafka/KafkaSink.java | 5 +- .../sink/solr/morphline/MorphlineSink.java | 10 +- .../org/apache/flume/source/jms/JMSSource.java | 8 +- .../apache/flume/source/kafka/KafkaSource.java | 8 +- .../flume/source/taildir/TaildirSource.java | 8 +- .../flume/source/twitter/TwitterSource.java | 8 +- .../org/apache/flume/test/agent/TestConfig.java | 209 +++++++++++++++++++ .../flume/test/agent/TestConfigFilters.java | 176 ---------------- 36 files changed, 623 insertions(+), 260 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/flume/blob/dad828ac/flume-ng-channels/flume-file-channel/src/main/java/org/apache/flume/channel/file/FileChannel.java ---------------------------------------------------------------------- diff --git a/flume-ng-channels/flume-file-channel/src/main/java/org/apache/flume/channel/file/FileChannel.java b/flume-ng-channels/flume-file-channel/src/main/java/org/apache/flume/channel/file/FileChannel.java index 3194592..fe7e80c 100644 --- a/flume-ng-channels/flume-file-channel/src/main/java/org/apache/flume/channel/file/FileChannel.java +++ b/flume-ng-channels/flume-file-channel/src/main/java/org/apache/flume/channel/file/FileChannel.java @@ -40,6 +40,7 @@ import org.apache.flume.channel.file.encryption.EncryptionConfiguration; import 
org.apache.flume.channel.file.encryption.KeyProvider; import org.apache.flume.channel.file.encryption.KeyProviderFactory; import org.apache.flume.channel.file.instrumentation.FileChannelCounter; +import org.apache.flume.conf.TransactionCapacitySupported; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -73,7 +74,7 @@ import java.util.concurrent.TimeUnit; @InterfaceAudience.Private @InterfaceStability.Stable @Disposable -public class FileChannel extends BasicChannelSemantics { +public class FileChannel extends BasicChannelSemantics implements TransactionCapacitySupported { private static final Logger LOG = LoggerFactory.getLogger(FileChannel.class); @@ -444,6 +445,11 @@ public class FileChannel extends BasicChannelSemantics { return channelCounter; } + @Override + public long getTransactionCapacity() { + return transactionCapacity; + } + /** * Transaction backed by a file. This transaction supports either puts * or takes but not both. http://git-wip-us.apache.org/repos/asf/flume/blob/dad828ac/flume-ng-core/src/main/java/org/apache/flume/channel/MemoryChannel.java ---------------------------------------------------------------------- diff --git a/flume-ng-core/src/main/java/org/apache/flume/channel/MemoryChannel.java b/flume-ng-core/src/main/java/org/apache/flume/channel/MemoryChannel.java index add40e9..20da572 100644 --- a/flume-ng-core/src/main/java/org/apache/flume/channel/MemoryChannel.java +++ b/flume-ng-core/src/main/java/org/apache/flume/channel/MemoryChannel.java @@ -27,6 +27,7 @@ import org.apache.flume.Event; import org.apache.flume.annotations.InterfaceAudience; import org.apache.flume.annotations.InterfaceStability; import org.apache.flume.annotations.Recyclable; +import org.apache.flume.conf.TransactionCapacitySupported; import org.apache.flume.instrumentation.ChannelCounter; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -50,7 +51,7 @@ import java.util.concurrent.TimeUnit; @InterfaceAudience.Public @InterfaceStability.Stable 
@Recyclable -public class MemoryChannel extends BasicChannelSemantics { +public class MemoryChannel extends BasicChannelSemantics implements TransactionCapacitySupported { private static Logger LOGGER = LoggerFactory.getLogger(MemoryChannel.class); private static final Integer defaultCapacity = 100; private static final Integer defaultTransCapacity = 100; @@ -379,4 +380,8 @@ public class MemoryChannel extends BasicChannelSemantics { int getBytesRemainingValue() { return bytesRemaining.availablePermits(); } + + public long getTransactionCapacity() { + return transCapacity; + } } http://git-wip-us.apache.org/repos/asf/flume/blob/dad828ac/flume-ng-core/src/main/java/org/apache/flume/conf/BatchSizeSupported.java ---------------------------------------------------------------------- diff --git a/flume-ng-core/src/main/java/org/apache/flume/conf/BatchSizeSupported.java b/flume-ng-core/src/main/java/org/apache/flume/conf/BatchSizeSupported.java new file mode 100644 index 0000000..3aa477e --- /dev/null +++ b/flume-ng-core/src/main/java/org/apache/flume/conf/BatchSizeSupported.java @@ -0,0 +1,32 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ +package org.apache.flume.conf; + +/** + * This interface indicates that a component does batching and the batch size + * is publicly available. + * + */ +public interface BatchSizeSupported { + + /** + * Returns the batch size + */ + long getBatchSize(); + +} http://git-wip-us.apache.org/repos/asf/flume/blob/dad828ac/flume-ng-core/src/main/java/org/apache/flume/conf/TransactionCapacitySupported.java ---------------------------------------------------------------------- diff --git a/flume-ng-core/src/main/java/org/apache/flume/conf/TransactionCapacitySupported.java b/flume-ng-core/src/main/java/org/apache/flume/conf/TransactionCapacitySupported.java new file mode 100644 index 0000000..bb543ac --- /dev/null +++ b/flume-ng-core/src/main/java/org/apache/flume/conf/TransactionCapacitySupported.java @@ -0,0 +1,32 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.flume.conf; + +/** + * This interface indicates that a component has a transaction capacity + * and it is publicly available. 
+ * + */ +public interface TransactionCapacitySupported { + + /** + * Returns the transaction capacity + */ + long getTransactionCapacity(); + +} http://git-wip-us.apache.org/repos/asf/flume/blob/dad828ac/flume-ng-core/src/main/java/org/apache/flume/sink/AbstractRpcSink.java ---------------------------------------------------------------------- diff --git a/flume-ng-core/src/main/java/org/apache/flume/sink/AbstractRpcSink.java b/flume-ng-core/src/main/java/org/apache/flume/sink/AbstractRpcSink.java index 5a5993a..da347e0 100644 --- a/flume-ng-core/src/main/java/org/apache/flume/sink/AbstractRpcSink.java +++ b/flume-ng-core/src/main/java/org/apache/flume/sink/AbstractRpcSink.java @@ -30,8 +30,10 @@ import org.apache.flume.Event; import org.apache.flume.EventDeliveryException; import org.apache.flume.FlumeException; import org.apache.flume.Transaction; +import org.apache.flume.api.AbstractRpcClient; import org.apache.flume.api.RpcClient; import org.apache.flume.api.RpcClientConfigurationConstants; +import org.apache.flume.conf.BatchSizeSupported; import org.apache.flume.conf.Configurable; import org.apache.flume.instrumentation.SinkCounter; import org.slf4j.Logger; @@ -141,7 +143,8 @@ import java.util.concurrent.locks.ReentrantLock; * This method will be called whenever this sink needs to create a new * connection to the source. 
*/ -public abstract class AbstractRpcSink extends AbstractSink implements Configurable { +public abstract class AbstractRpcSink extends AbstractSink implements Configurable, + BatchSizeSupported { private static final Logger logger = LoggerFactory.getLogger(AbstractRpcSink.class); private String hostname; @@ -156,6 +159,11 @@ public abstract class AbstractRpcSink extends AbstractSink implements Configurab Executors.newSingleThreadScheduledExecutor( new ThreadFactoryBuilder().setNameFormat("Rpc Sink Reset Thread").build()); + // batchSize is used in the clients, here it is only used for config validation + // before the client is configured + private int batchSize; + + @Override public void configure(Context context) { clientProps = new Properties(); @@ -174,6 +182,8 @@ public abstract class AbstractRpcSink extends AbstractSink implements Configurab clientProps.setProperty(entry.getKey(), entry.getValue()); } + batchSize = AbstractRpcClient.parseBatchSize(clientProps); + if (sinkCounter == null) { sinkCounter = new SinkCounter(getName()); } @@ -401,4 +411,9 @@ public abstract class AbstractRpcSink extends AbstractSink implements Configurab RpcClient getUnderlyingClient() { return client; } + + @Override + public long getBatchSize() { + return batchSize; + } } http://git-wip-us.apache.org/repos/asf/flume/blob/dad828ac/flume-ng-core/src/main/java/org/apache/flume/sink/NullSink.java ---------------------------------------------------------------------- diff --git a/flume-ng-core/src/main/java/org/apache/flume/sink/NullSink.java b/flume-ng-core/src/main/java/org/apache/flume/sink/NullSink.java index eb00e15..9347f8e 100644 --- a/flume-ng-core/src/main/java/org/apache/flume/sink/NullSink.java +++ b/flume-ng-core/src/main/java/org/apache/flume/sink/NullSink.java @@ -25,6 +25,7 @@ import org.apache.flume.Event; import org.apache.flume.EventDeliveryException; import org.apache.flume.Sink; import org.apache.flume.Transaction; +import org.apache.flume.conf.BatchSizeSupported; 
import org.apache.flume.conf.Configurable; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -47,7 +48,7 @@ import org.slf4j.LoggerFactory; * TODO * </p> */ -public class NullSink extends AbstractSink implements Configurable { +public class NullSink extends AbstractSink implements Configurable, BatchSizeSupported { private static final Logger logger = LoggerFactory.getLogger(NullSink.class); @@ -137,4 +138,9 @@ public class NullSink extends AbstractSink implements Configurable { return "NullSink " + getName() + " { batchSize: " + batchSize + " }"; } + @Override + public long getBatchSize() { + return batchSize; + } + } http://git-wip-us.apache.org/repos/asf/flume/blob/dad828ac/flume-ng-core/src/main/java/org/apache/flume/sink/RollingFileSink.java ---------------------------------------------------------------------- diff --git a/flume-ng-core/src/main/java/org/apache/flume/sink/RollingFileSink.java b/flume-ng-core/src/main/java/org/apache/flume/sink/RollingFileSink.java index 9b0827a..d48a3c8 100644 --- a/flume-ng-core/src/main/java/org/apache/flume/sink/RollingFileSink.java +++ b/flume-ng-core/src/main/java/org/apache/flume/sink/RollingFileSink.java @@ -31,6 +31,7 @@ import org.apache.flume.Context; import org.apache.flume.Event; import org.apache.flume.EventDeliveryException; import org.apache.flume.Transaction; +import org.apache.flume.conf.BatchSizeSupported; import org.apache.flume.conf.Configurable; import org.apache.flume.formatter.output.PathManager; import org.apache.flume.formatter.output.PathManagerFactory; @@ -43,7 +44,7 @@ import com.google.common.util.concurrent.ThreadFactoryBuilder; import org.apache.flume.serialization.EventSerializer; import org.apache.flume.serialization.EventSerializerFactory; -public class RollingFileSink extends AbstractSink implements Configurable { +public class RollingFileSink extends AbstractSink implements Configurable, BatchSizeSupported { private static final Logger logger = LoggerFactory 
.getLogger(RollingFileSink.class); @@ -284,4 +285,8 @@ public class RollingFileSink extends AbstractSink implements Configurable { this.rollInterval = rollInterval; } + @Override + public long getBatchSize() { + return batchSize; + } } http://git-wip-us.apache.org/repos/asf/flume/blob/dad828ac/flume-ng-core/src/main/java/org/apache/flume/source/ExecSource.java ---------------------------------------------------------------------- diff --git a/flume-ng-core/src/main/java/org/apache/flume/source/ExecSource.java b/flume-ng-core/src/main/java/org/apache/flume/source/ExecSource.java index eaafbd6..2bb4077 100644 --- a/flume-ng-core/src/main/java/org/apache/flume/source/ExecSource.java +++ b/flume-ng-core/src/main/java/org/apache/flume/source/ExecSource.java @@ -38,6 +38,7 @@ import org.apache.flume.EventDrivenSource; import org.apache.flume.Source; import org.apache.flume.SystemClock; import org.apache.flume.channel.ChannelProcessor; +import org.apache.flume.conf.BatchSizeSupported; import org.apache.flume.conf.Configurable; import org.apache.flume.event.EventBuilder; import org.apache.flume.instrumentation.SourceCounter; @@ -146,7 +147,8 @@ import java.nio.charset.Charset; * TODO * </p> */ -public class ExecSource extends AbstractSource implements EventDrivenSource, Configurable { +public class ExecSource extends AbstractSource implements EventDrivenSource, Configurable, + BatchSizeSupported { private static final Logger logger = LoggerFactory.getLogger(ExecSource.class); @@ -248,6 +250,11 @@ public class ExecSource extends AbstractSource implements EventDrivenSource, Con } } + @Override + public long getBatchSize() { + return bufferCount; + } + private static class ExecRunnable implements Runnable { public ExecRunnable(String shell, String command, ChannelProcessor channelProcessor, http://git-wip-us.apache.org/repos/asf/flume/blob/dad828ac/flume-ng-core/src/main/java/org/apache/flume/source/MultiportSyslogTCPSource.java 
---------------------------------------------------------------------- diff --git a/flume-ng-core/src/main/java/org/apache/flume/source/MultiportSyslogTCPSource.java b/flume-ng-core/src/main/java/org/apache/flume/source/MultiportSyslogTCPSource.java index 3c59b47..82cb2f2 100644 --- a/flume-ng-core/src/main/java/org/apache/flume/source/MultiportSyslogTCPSource.java +++ b/flume-ng-core/src/main/java/org/apache/flume/source/MultiportSyslogTCPSource.java @@ -34,6 +34,7 @@ import org.apache.flume.Context; import org.apache.flume.Event; import org.apache.flume.EventDrivenSource; import org.apache.flume.channel.ChannelProcessor; +import org.apache.flume.conf.BatchSizeSupported; import org.apache.flume.conf.Configurable; import org.apache.flume.conf.LogPrivacyUtil; import org.apache.flume.event.EventBuilder; @@ -50,7 +51,7 @@ import org.slf4j.LoggerFactory; * */ public class MultiportSyslogTCPSource extends AbstractSource implements - EventDrivenSource, Configurable { + EventDrivenSource, Configurable, BatchSizeSupported { public static final Logger logger = LoggerFactory.getLogger( MultiportSyslogTCPSource.class); @@ -208,6 +209,11 @@ public class MultiportSyslogTCPSource extends AbstractSource implements return "Multiport Syslog TCP source " + getName(); } + @Override + public long getBatchSize() { + return batchSize; + } + static class MultiportSyslogHandler extends IoHandlerAdapter { private static final String SAVED_BUF = "savedBuffer"; http://git-wip-us.apache.org/repos/asf/flume/blob/dad828ac/flume-ng-core/src/main/java/org/apache/flume/source/SequenceGeneratorSource.java ---------------------------------------------------------------------- diff --git a/flume-ng-core/src/main/java/org/apache/flume/source/SequenceGeneratorSource.java b/flume-ng-core/src/main/java/org/apache/flume/source/SequenceGeneratorSource.java index e494bfb..6719606 100644 --- a/flume-ng-core/src/main/java/org/apache/flume/source/SequenceGeneratorSource.java +++ 
b/flume-ng-core/src/main/java/org/apache/flume/source/SequenceGeneratorSource.java @@ -25,6 +25,7 @@ import org.apache.flume.Context; import org.apache.flume.Event; import org.apache.flume.EventDeliveryException; import org.apache.flume.FlumeException; +import org.apache.flume.conf.BatchSizeSupported; import org.apache.flume.conf.Configurable; import org.apache.flume.event.EventBuilder; import org.apache.flume.instrumentation.SourceCounter; @@ -35,7 +36,7 @@ import java.util.ArrayList; import java.util.List; public class SequenceGeneratorSource extends AbstractPollableSource implements - Configurable { + Configurable, BatchSizeSupported { private static final Logger logger = LoggerFactory .getLogger(SequenceGeneratorSource.class); @@ -115,4 +116,8 @@ public class SequenceGeneratorSource extends AbstractPollableSource implements logger.info("Sequence generator source do stopped. Metrics:{}",getName(), sourceCounter); } + @Override + public long getBatchSize() { + return batchSize; + } } http://git-wip-us.apache.org/repos/asf/flume/blob/dad828ac/flume-ng-core/src/main/java/org/apache/flume/source/SpoolDirectorySource.java ---------------------------------------------------------------------- diff --git a/flume-ng-core/src/main/java/org/apache/flume/source/SpoolDirectorySource.java b/flume-ng-core/src/main/java/org/apache/flume/source/SpoolDirectorySource.java index 305ca3b..db4e0a3 100644 --- a/flume-ng-core/src/main/java/org/apache/flume/source/SpoolDirectorySource.java +++ b/flume-ng-core/src/main/java/org/apache/flume/source/SpoolDirectorySource.java @@ -27,6 +27,7 @@ import org.apache.flume.Event; import org.apache.flume.EventDrivenSource; import org.apache.flume.FlumeException; import org.apache.flume.client.avro.ReliableSpoolingFileEventReader; +import org.apache.flume.conf.BatchSizeSupported; import org.apache.flume.conf.Configurable; import org.apache.flume.instrumentation.SourceCounter; import org.apache.flume.serialization.DecodeErrorPolicy; @@ -45,7 +46,7 
@@ import java.util.concurrent.TimeUnit; import static org.apache.flume.source.SpoolDirectorySourceConfigurationConstants.*; public class SpoolDirectorySource extends AbstractSource - implements Configurable, EventDrivenSource { + implements Configurable, EventDrivenSource, BatchSizeSupported { private static final Logger logger = LoggerFactory.getLogger(SpoolDirectorySource.class); @@ -236,6 +237,11 @@ public class SpoolDirectorySource extends AbstractSource return recursiveDirectorySearch; } + @Override + public long getBatchSize() { + return batchSize; + } + @VisibleForTesting protected class SpoolDirectoryRunnable implements Runnable { private ReliableSpoolingFileEventReader reader; http://git-wip-us.apache.org/repos/asf/flume/blob/dad828ac/flume-ng-core/src/main/java/org/apache/flume/source/StressSource.java ---------------------------------------------------------------------- diff --git a/flume-ng-core/src/main/java/org/apache/flume/source/StressSource.java b/flume-ng-core/src/main/java/org/apache/flume/source/StressSource.java index f37174a..6c3c330 100644 --- a/flume-ng-core/src/main/java/org/apache/flume/source/StressSource.java +++ b/flume-ng-core/src/main/java/org/apache/flume/source/StressSource.java @@ -30,6 +30,7 @@ import org.apache.flume.CounterGroup; import org.apache.flume.Event; import org.apache.flume.EventDeliveryException; import org.apache.flume.FlumeException; +import org.apache.flume.conf.BatchSizeSupported; import org.apache.flume.conf.Configurable; import org.apache.flume.event.EventBuilder; import org.slf4j.Logger; @@ -54,7 +55,8 @@ import org.slf4j.LoggerFactory; * * See {@link StressSource#configure(Context)} for configuration options. 
*/ -public class StressSource extends AbstractPollableSource implements Configurable { +public class StressSource extends AbstractPollableSource + implements Configurable, BatchSizeSupported { private static final Logger logger = LoggerFactory.getLogger(StressSource.class); @@ -171,4 +173,9 @@ public class StressSource extends AbstractPollableSource implements Configurable protected void doStop() throws FlumeException { logger.info("Stress source do stop. Metrics:{}", counterGroup); } + + @Override + public long getBatchSize() { + return batchSize; + } } http://git-wip-us.apache.org/repos/asf/flume/blob/dad828ac/flume-ng-core/src/test/java/org/apache/flume/sink/TestRollingFileSink.java ---------------------------------------------------------------------- diff --git a/flume-ng-core/src/test/java/org/apache/flume/sink/TestRollingFileSink.java b/flume-ng-core/src/test/java/org/apache/flume/sink/TestRollingFileSink.java index 6b74e2d..c88b541 100644 --- a/flume-ng-core/src/test/java/org/apache/flume/sink/TestRollingFileSink.java +++ b/flume-ng-core/src/test/java/org/apache/flume/sink/TestRollingFileSink.java @@ -218,4 +218,46 @@ public class TestRollingFileSink { file.delete(); } } + + /** + * This test is to reproduce batch size and + * transaction capacity related configuration + * problems + */ + @Test(expected = EventDeliveryException.class) + public void testTransCapBatchSizeCompatibility() throws EventDeliveryException { + + Context context = new Context(); + + context.put("sink.directory", tmpDir.getPath()); + context.put("sink.rollInterval", "0"); + context.put("sink.batchSize", "1000"); + + Configurables.configure(sink, context); + + context.put("capacity", "50"); + context.put("transactionCapacity", "5"); + Channel channel = new MemoryChannel(); + Configurables.configure(channel, context); + + sink.setChannel(channel); + sink.start(); + + try { + for (int j = 0; j < 10; j++) { + Transaction tx = channel.getTransaction(); + tx.begin(); + for (int i = 0; i < 
5; i++) { + Event event = new SimpleEvent(); + event.setBody(("Test event " + i).getBytes()); + channel.put(event); + } + tx.commit(); + tx.close(); + } + sink.process(); + } finally { + sink.stop(); + } + } } http://git-wip-us.apache.org/repos/asf/flume/blob/dad828ac/flume-ng-doc/sphinx/FlumeUserGuide.rst ---------------------------------------------------------------------- diff --git a/flume-ng-doc/sphinx/FlumeUserGuide.rst b/flume-ng-doc/sphinx/FlumeUserGuide.rst index 3ec2b68..3f3ab46 100644 --- a/flume-ng-doc/sphinx/FlumeUserGuide.rst +++ b/flume-ng-doc/sphinx/FlumeUserGuide.rst @@ -2439,7 +2439,7 @@ sink.pathManager.extension -- The file extension if the default PathManag sink.pathManager.prefix -- A character string to add to the beginning of the file name if the default PathManager is used sink.rollInterval 30 Roll the file every 30 seconds. Specifying 0 will disable rolling and cause all events to be written to a single file. sink.serializer TEXT Other possible options include ``avro_event`` or the FQCN of an implementation of EventSerializer.Builder interface. 
-batchSize 100 +sink.batchSize 100 ========================== ======= ====================================================================================================================== Example for agent named a1: http://git-wip-us.apache.org/repos/asf/flume/blob/dad828ac/flume-ng-node/src/main/java/org/apache/flume/node/AbstractConfigurationProvider.java ---------------------------------------------------------------------- diff --git a/flume-ng-node/src/main/java/org/apache/flume/node/AbstractConfigurationProvider.java b/flume-ng-node/src/main/java/org/apache/flume/node/AbstractConfigurationProvider.java index 130bc64..caf4522 100644 --- a/flume-ng-node/src/main/java/org/apache/flume/node/AbstractConfigurationProvider.java +++ b/flume-ng-node/src/main/java/org/apache/flume/node/AbstractConfigurationProvider.java @@ -18,6 +18,8 @@ package org.apache.flume.node; import java.util.ArrayList; +import java.util.Arrays; +import java.util.Collection; import java.util.Enumeration; import java.util.HashMap; import java.util.HashSet; @@ -44,10 +46,12 @@ import org.apache.flume.channel.ChannelProcessor; import org.apache.flume.channel.ChannelSelectorFactory; import org.apache.flume.channel.DefaultChannelFactory; import org.apache.flume.conf.BasicConfigurationConstants; +import org.apache.flume.conf.BatchSizeSupported; import org.apache.flume.conf.ComponentConfiguration; import org.apache.flume.conf.Configurables; import org.apache.flume.conf.FlumeConfiguration; import org.apache.flume.conf.FlumeConfiguration.AgentConfiguration; +import org.apache.flume.conf.TransactionCapacitySupported; import org.apache.flume.conf.channel.ChannelSelectorConfiguration; import org.apache.flume.conf.sink.SinkConfiguration; import org.apache.flume.conf.sink.SinkGroupConfiguration; @@ -274,13 +278,8 @@ public abstract class AbstractConfigurationProvider implements ConfigurationProv try { Configurables.configure(source, config); Set<String> channelNames = config.getChannels(); - List<Channel> 
sourceChannels = new ArrayList<Channel>(); - for (String chName : channelNames) { - ChannelComponent channelComponent = channelComponentMap.get(chName); - if (channelComponent != null) { - sourceChannels.add(channelComponent.channel); - } - } + List<Channel> sourceChannels = + getSourceChannels(channelComponentMap, source, channelNames); if (sourceChannels.isEmpty()) { String msg = String.format("Source %s is not connected to a " + "channel", sourceName); @@ -324,15 +323,10 @@ public abstract class AbstractConfigurationProvider implements ConfigurationProv context.getString(BasicConfigurationConstants.CONFIG_TYPE)); try { Configurables.configure(source, context); - List<Channel> sourceChannels = new ArrayList<Channel>(); String[] channelNames = context.getString( BasicConfigurationConstants.CONFIG_CHANNELS).split("\\s+"); - for (String chName : channelNames) { - ChannelComponent channelComponent = channelComponentMap.get(chName); - if (channelComponent != null) { - sourceChannels.add(channelComponent.channel); - } - } + List<Channel> sourceChannels = + getSourceChannels(channelComponentMap, source, Arrays.asList(channelNames)); if (sourceChannels.isEmpty()) { String msg = String.format("Source %s is not connected to a " + "channel", sourceName); @@ -364,6 +358,53 @@ public abstract class AbstractConfigurationProvider implements ConfigurationProv } } + private List<Channel> getSourceChannels(Map<String, ChannelComponent> channelComponentMap, + Source source, Collection<String> channelNames) throws InstantiationException { + List<Channel> sourceChannels = new ArrayList<Channel>(); + for (String chName : channelNames) { + ChannelComponent channelComponent = channelComponentMap.get(chName); + if (channelComponent != null) { + checkSourceChannelCompatibility(source, channelComponent.channel); + sourceChannels.add(channelComponent.channel); + } + } + return sourceChannels; + } + + private void checkSourceChannelCompatibility(Source source, Channel channel) + throws 
InstantiationException { + if (source instanceof BatchSizeSupported && channel instanceof TransactionCapacitySupported) { + long transCap = ((TransactionCapacitySupported) channel).getTransactionCapacity(); + long batchSize = ((BatchSizeSupported) source).getBatchSize(); + if (transCap < batchSize) { + String msg = String.format( + "Incompatible source and channel settings defined. " + + "source's batch size is greater than the channels transaction capacity. " + + "Source: %s, batch size = %d, channel %s, transaction capacity = %d", + source.getName(), batchSize, + channel.getName(), transCap); + throw new InstantiationException(msg); + } + } + } + + private void checkSinkChannelCompatibility(Sink sink, Channel channel) + throws InstantiationException { + if (sink instanceof BatchSizeSupported && channel instanceof TransactionCapacitySupported) { + long transCap = ((TransactionCapacitySupported) channel).getTransactionCapacity(); + long batchSize = ((BatchSizeSupported) sink).getBatchSize(); + if (transCap < batchSize) { + String msg = String.format( + "Incompatible sink and channel settings defined. " + + "sink's batch size is greater than the channels transaction capacity. 
" + + "Sink: %s, batch size = %d, channel %s, transaction capacity = %d", + sink.getName(), batchSize, + channel.getName(), transCap); + throw new InstantiationException(msg); + } + } + } + private void loadSinks(AgentConfiguration agentConf, Map<String, ChannelComponent> channelComponentMap, Map<String, SinkRunner> sinkRunnerMap) throws InstantiationException { @@ -387,6 +428,7 @@ public abstract class AbstractConfigurationProvider implements ConfigurationProv "channel", sinkName); throw new IllegalStateException(msg); } + checkSinkChannelCompatibility(sink, channelComponent.channel); sink.setChannel(channelComponent.channel); sinks.put(comp.getComponentName(), sink); channelComponent.components.add(sinkName); @@ -417,6 +459,7 @@ public abstract class AbstractConfigurationProvider implements ConfigurationProv "channel", sinkName); throw new IllegalStateException(msg); } + checkSinkChannelCompatibility(sink, channelComponent.channel); sink.setChannel(channelComponent.channel); sinks.put(sinkName, sink); channelComponent.components.add(sinkName); http://git-wip-us.apache.org/repos/asf/flume/blob/dad828ac/flume-ng-node/src/test/java/org/apache/flume/node/TestAbstractConfigurationProvider.java ---------------------------------------------------------------------- diff --git a/flume-ng-node/src/test/java/org/apache/flume/node/TestAbstractConfigurationProvider.java b/flume-ng-node/src/test/java/org/apache/flume/node/TestAbstractConfigurationProvider.java index e27d8f7..7810c5b 100644 --- a/flume-ng-node/src/test/java/org/apache/flume/node/TestAbstractConfigurationProvider.java +++ b/flume-ng-node/src/test/java/org/apache/flume/node/TestAbstractConfigurationProvider.java @@ -185,6 +185,56 @@ public class TestAbstractConfigurationProvider { Assert.assertTrue(config.getSinkRunners().size() == 0); } + @Test + public void testSinkSourceMismatchDuringConfiguration() throws Exception { + String agentName = "agent1"; + String sourceType = "seq"; + String channelType = "memory"; 
+ String sinkType = "avro"; + Map<String, String> properties = getProperties(agentName, sourceType, + channelType, sinkType); + properties.put(agentName + ".channels.channel1.capacity", "1000"); + properties.put(agentName + ".channels.channel1.transactionCapacity", "1000"); + properties.put(agentName + ".sources.source1.batchSize", "1000"); + properties.put(agentName + ".sinks.sink1.batch-size", "1000"); + properties.put(agentName + ".sinks.sink1.hostname", "10.10.10.10"); + properties.put(agentName + ".sinks.sink1.port", "1010"); + + MemoryConfigurationProvider provider = + new MemoryConfigurationProvider(agentName, properties); + MaterializedConfiguration config = provider.getConfiguration(); + Assert.assertTrue(config.getSourceRunners().size() == 1); + Assert.assertTrue(config.getChannels().size() == 1); + Assert.assertTrue(config.getSinkRunners().size() == 1); + + properties.put(agentName + ".sources.source1.batchSize", "1001"); + properties.put(agentName + ".sinks.sink1.batch-size", "1000"); + + provider = new MemoryConfigurationProvider(agentName, properties); + config = provider.getConfiguration(); + Assert.assertTrue(config.getSourceRunners().size() == 0); + Assert.assertTrue(config.getChannels().size() == 1); + Assert.assertTrue(config.getSinkRunners().size() == 1); + + properties.put(agentName + ".sources.source1.batchSize", "1000"); + properties.put(agentName + ".sinks.sink1.batch-size", "1001"); + + provider = new MemoryConfigurationProvider(agentName, properties); + config = provider.getConfiguration(); + Assert.assertTrue(config.getSourceRunners().size() == 1); + Assert.assertTrue(config.getChannels().size() == 1); + Assert.assertTrue(config.getSinkRunners().size() == 0); + + properties.put(agentName + ".sources.source1.batchSize", "1001"); + properties.put(agentName + ".sinks.sink1.batch-size", "1001"); + + provider = new MemoryConfigurationProvider(agentName, properties); + config = provider.getConfiguration(); + 
Assert.assertTrue(config.getSourceRunners().size() == 0); + Assert.assertTrue(config.getChannels().size() == 0); + Assert.assertTrue(config.getSinkRunners().size() == 0); + } + private Map<String, String> getProperties(String agentName, String sourceType, String channelType, String sinkType) { http://git-wip-us.apache.org/repos/asf/flume/blob/dad828ac/flume-ng-sdk/src/main/java/org/apache/flume/api/AbstractRpcClient.java ---------------------------------------------------------------------- diff --git a/flume-ng-sdk/src/main/java/org/apache/flume/api/AbstractRpcClient.java b/flume-ng-sdk/src/main/java/org/apache/flume/api/AbstractRpcClient.java index f20462b..b28d4e5 100644 --- a/flume-ng-sdk/src/main/java/org/apache/flume/api/AbstractRpcClient.java +++ b/flume-ng-sdk/src/main/java/org/apache/flume/api/AbstractRpcClient.java @@ -24,8 +24,11 @@ import java.util.Properties; import org.apache.flume.Event; import org.apache.flume.EventDeliveryException; import org.apache.flume.FlumeException; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; public abstract class AbstractRpcClient implements RpcClient { + private static Logger logger = LoggerFactory.getLogger(AbstractRpcClient.class); protected int batchSize = RpcClientConfigurationConstants.DEFAULT_BATCH_SIZE; @@ -61,4 +64,31 @@ public abstract class AbstractRpcClient implements RpcClient { protected abstract void configure(Properties properties) throws FlumeException; + /** + * This is to parse the batch size config for rpc clients + * @param properties config + * @return batch size + */ + public static int parseBatchSize(Properties properties) { + String strBatchSize = properties.getProperty( + RpcClientConfigurationConstants.CONFIG_BATCH_SIZE); + logger.debug("Batch size string = " + strBatchSize); + int batchSize = RpcClientConfigurationConstants.DEFAULT_BATCH_SIZE; + if (strBatchSize != null && !strBatchSize.isEmpty()) { + try { + int parsedBatch = Integer.parseInt(strBatchSize); + if (parsedBatch < 1) { 
+ logger.warn("Invalid value for batchSize: {}; Using default value.", parsedBatch); + } else { + batchSize = parsedBatch; + } + } catch (NumberFormatException e) { + logger.warn("Batchsize is not valid for RpcClient: " + strBatchSize + + ". Default value assigned.", e); + } + } + + return batchSize; + } + } http://git-wip-us.apache.org/repos/asf/flume/blob/dad828ac/flume-ng-sdk/src/main/java/org/apache/flume/api/FailoverRpcClient.java ---------------------------------------------------------------------- diff --git a/flume-ng-sdk/src/main/java/org/apache/flume/api/FailoverRpcClient.java b/flume-ng-sdk/src/main/java/org/apache/flume/api/FailoverRpcClient.java index 9d82acb..5b9cef5 100644 --- a/flume-ng-sdk/src/main/java/org/apache/flume/api/FailoverRpcClient.java +++ b/flume-ng-sdk/src/main/java/org/apache/flume/api/FailoverRpcClient.java @@ -89,23 +89,8 @@ public class FailoverRpcClient extends AbstractRpcClient implements RpcClient { } } - String strBatchSize = properties.getProperty( - RpcClientConfigurationConstants.CONFIG_BATCH_SIZE); + batchSize = parseBatchSize(properties); - if (strBatchSize != null && strBatchSize.trim().length() > 0) { - try { - batchSize = Integer.parseInt(strBatchSize); - if (batchSize < 1) { - logger.warn("A batch-size less than 1 was specified: " + batchSize - + ". Using default instead."); - batchSize = RpcClientConfigurationConstants.DEFAULT_BATCH_SIZE; - } - } catch (NumberFormatException ex) { - logger.warn("Invalid batch size specified: " + strBatchSize - + ". 
Using default instead."); - } - - } isActive = true; } http://git-wip-us.apache.org/repos/asf/flume/blob/dad828ac/flume-ng-sdk/src/main/java/org/apache/flume/api/LoadBalancingRpcClient.java ---------------------------------------------------------------------- diff --git a/flume-ng-sdk/src/main/java/org/apache/flume/api/LoadBalancingRpcClient.java b/flume-ng-sdk/src/main/java/org/apache/flume/api/LoadBalancingRpcClient.java index d3ccf74..a025612 100644 --- a/flume-ng-sdk/src/main/java/org/apache/flume/api/LoadBalancingRpcClient.java +++ b/flume-ng-sdk/src/main/java/org/apache/flume/api/LoadBalancingRpcClient.java @@ -185,6 +185,7 @@ public class LoadBalancingRpcClient extends AbstractRpcClient { } selector.setHosts(hosts); + batchSize = parseBatchSize(properties); isOpen = true; } http://git-wip-us.apache.org/repos/asf/flume/blob/dad828ac/flume-ng-sdk/src/main/java/org/apache/flume/api/NettyAvroRpcClient.java ---------------------------------------------------------------------- diff --git a/flume-ng-sdk/src/main/java/org/apache/flume/api/NettyAvroRpcClient.java b/flume-ng-sdk/src/main/java/org/apache/flume/api/NettyAvroRpcClient.java index 21a9553..b61eb79 100644 --- a/flume-ng-sdk/src/main/java/org/apache/flume/api/NettyAvroRpcClient.java +++ b/flume-ng-sdk/src/main/java/org/apache/flume/api/NettyAvroRpcClient.java @@ -492,24 +492,7 @@ public class NettyAvroRpcClient extends AbstractRpcClient implements RpcClient { stateLock.unlock(); } - // batch size - String strBatchSize = properties.getProperty( - RpcClientConfigurationConstants.CONFIG_BATCH_SIZE); - logger.debug("Batch size string = " + strBatchSize); - batchSize = RpcClientConfigurationConstants.DEFAULT_BATCH_SIZE; - if (strBatchSize != null && !strBatchSize.isEmpty()) { - try { - int parsedBatch = Integer.parseInt(strBatchSize); - if (parsedBatch < 1) { - logger.warn("Invalid value for batchSize: {}; Using default value.", parsedBatch); - } else { - batchSize = parsedBatch; - } - } catch 
(NumberFormatException e) { - logger.warn("Batchsize is not valid for RpcClient: " + strBatchSize + - ". Default value assigned.", e); - } - } + batchSize = parseBatchSize(properties); // host and port String hostNames = properties.getProperty( http://git-wip-us.apache.org/repos/asf/flume/blob/dad828ac/flume-ng-sdk/src/main/java/org/apache/flume/api/ThriftRpcClient.java ---------------------------------------------------------------------- diff --git a/flume-ng-sdk/src/main/java/org/apache/flume/api/ThriftRpcClient.java b/flume-ng-sdk/src/main/java/org/apache/flume/api/ThriftRpcClient.java index 1d21d5f..07d9c90 100644 --- a/flume-ng-sdk/src/main/java/org/apache/flume/api/ThriftRpcClient.java +++ b/flume-ng-sdk/src/main/java/org/apache/flume/api/ThriftRpcClient.java @@ -72,7 +72,6 @@ public class ThriftRpcClient extends AbstractRpcClient { public static final String BINARY_PROTOCOL = "binary"; public static final String COMPACT_PROTOCOL = "compact"; - private int batchSize; private long requestTimeout; private final Lock stateLock; private State connState; @@ -107,12 +106,6 @@ public class ThriftRpcClient extends AbstractRpcClient { }); } - - @Override - public int getBatchSize() { - return batchSize; - } - @Override public void append(Event event) throws EventDeliveryException { // Thrift IPC client is not thread safe, so don't allow state changes or @@ -299,9 +292,7 @@ public class ThriftRpcClient extends AbstractRpcClient { + "choose from. 
Defaulting to 'compact'."); protocol = COMPACT_PROTOCOL; } - batchSize = Integer.parseInt(properties.getProperty( - RpcClientConfigurationConstants.CONFIG_BATCH_SIZE, - RpcClientConfigurationConstants.DEFAULT_BATCH_SIZE.toString())); + batchSize = parseBatchSize(properties); requestTimeout = Long.parseLong(properties.getProperty( RpcClientConfigurationConstants.CONFIG_REQUEST_TIMEOUT, String.valueOf( http://git-wip-us.apache.org/repos/asf/flume/blob/dad828ac/flume-ng-sinks/flume-dataset-sink/src/main/java/org/apache/flume/sink/kite/DatasetSink.java ---------------------------------------------------------------------- diff --git a/flume-ng-sinks/flume-dataset-sink/src/main/java/org/apache/flume/sink/kite/DatasetSink.java b/flume-ng-sinks/flume-dataset-sink/src/main/java/org/apache/flume/sink/kite/DatasetSink.java index fa31262..4a44264 100644 --- a/flume-ng-sinks/flume-dataset-sink/src/main/java/org/apache/flume/sink/kite/DatasetSink.java +++ b/flume-ng-sinks/flume-dataset-sink/src/main/java/org/apache/flume/sink/kite/DatasetSink.java @@ -19,6 +19,7 @@ package org.apache.flume.sink.kite; import org.apache.flume.auth.FlumeAuthenticationUtil; import org.apache.flume.auth.PrivilegedExecutor; +import org.apache.flume.conf.BatchSizeSupported; import org.apache.flume.sink.kite.parser.EntityParserFactory; import org.apache.flume.sink.kite.parser.EntityParser; import org.apache.flume.sink.kite.policy.FailurePolicy; @@ -69,7 +70,7 @@ import org.kitesdk.data.Formats; * and loading a Dataset by name, {@code kite.dataset.name}, and namespace, * {@code kite.dataset.namespace}. 
*/ -public class DatasetSink extends AbstractSink implements Configurable { +public class DatasetSink extends AbstractSink implements Configurable, BatchSizeSupported { private static final Logger LOG = LoggerFactory.getLogger(DatasetSink.class); @@ -579,4 +580,9 @@ public class DatasetSink extends AbstractSink implements Configurable { return Registration.lookupDatasetUri(URI.create( uri.getRawSchemeSpecificPart())).second().get("dataset"); } + + @Override + public long getBatchSize() { + return batchSize; + } } http://git-wip-us.apache.org/repos/asf/flume/blob/dad828ac/flume-ng-sinks/flume-hdfs-sink/src/main/java/org/apache/flume/sink/hdfs/HDFSEventSink.java ---------------------------------------------------------------------- diff --git a/flume-ng-sinks/flume-hdfs-sink/src/main/java/org/apache/flume/sink/hdfs/HDFSEventSink.java b/flume-ng-sinks/flume-hdfs-sink/src/main/java/org/apache/flume/sink/hdfs/HDFSEventSink.java index 22306a0..8189ca8 100644 --- a/flume-ng-sinks/flume-hdfs-sink/src/main/java/org/apache/flume/sink/hdfs/HDFSEventSink.java +++ b/flume-ng-sinks/flume-hdfs-sink/src/main/java/org/apache/flume/sink/hdfs/HDFSEventSink.java @@ -43,6 +43,7 @@ import org.apache.flume.SystemClock; import org.apache.flume.Transaction; import org.apache.flume.auth.FlumeAuthenticationUtil; import org.apache.flume.auth.PrivilegedExecutor; +import org.apache.flume.conf.BatchSizeSupported; import org.apache.flume.conf.Configurable; import org.apache.flume.formatter.output.BucketPath; import org.apache.flume.instrumentation.SinkCounter; @@ -58,7 +59,7 @@ import org.slf4j.LoggerFactory; import com.google.common.base.Preconditions; import com.google.common.util.concurrent.ThreadFactoryBuilder; -public class HDFSEventSink extends AbstractSink implements Configurable { +public class HDFSEventSink extends AbstractSink implements Configurable, BatchSizeSupported { public interface WriterCallback { public void run(String filePath); } @@ -559,4 +560,10 @@ public class 
HDFSEventSink extends AbstractSink implements Configurable { int getTryCount() { return tryCount; } + + @Override + public long getBatchSize() { + return batchSize; + } + } http://git-wip-us.apache.org/repos/asf/flume/blob/dad828ac/flume-ng-sinks/flume-hive-sink/src/main/java/org/apache/flume/sink/hive/HiveSink.java ---------------------------------------------------------------------- diff --git a/flume-ng-sinks/flume-hive-sink/src/main/java/org/apache/flume/sink/hive/HiveSink.java b/flume-ng-sinks/flume-hive-sink/src/main/java/org/apache/flume/sink/hive/HiveSink.java index 8db008e..ec1fe62 100644 --- a/flume-ng-sinks/flume-hive-sink/src/main/java/org/apache/flume/sink/hive/HiveSink.java +++ b/flume-ng-sinks/flume-hive-sink/src/main/java/org/apache/flume/sink/hive/HiveSink.java @@ -28,6 +28,7 @@ import org.apache.flume.Context; import org.apache.flume.Event; import org.apache.flume.EventDeliveryException; import org.apache.flume.Transaction; +import org.apache.flume.conf.BatchSizeSupported; import org.apache.flume.conf.Configurable; import org.apache.flume.formatter.output.BucketPath; import org.apache.flume.instrumentation.SinkCounter; @@ -50,7 +51,7 @@ import java.util.concurrent.Executors; import java.util.concurrent.TimeUnit; import java.util.concurrent.atomic.AtomicBoolean; -public class HiveSink extends AbstractSink implements Configurable { +public class HiveSink extends AbstractSink implements Configurable, BatchSizeSupported { private static final Logger LOG = LoggerFactory.getLogger(HiveSink.class); @@ -513,6 +514,10 @@ public class HiveSink extends AbstractSink implements Configurable { } } + @Override + public long getBatchSize() { + return batchSize; + } @Override public String toString() { http://git-wip-us.apache.org/repos/asf/flume/blob/dad828ac/flume-ng-sinks/flume-ng-elasticsearch-sink/src/main/java/org/apache/flume/sink/elasticsearch/ElasticSearchSink.java ---------------------------------------------------------------------- diff --git 
a/flume-ng-sinks/flume-ng-elasticsearch-sink/src/main/java/org/apache/flume/sink/elasticsearch/ElasticSearchSink.java b/flume-ng-sinks/flume-ng-elasticsearch-sink/src/main/java/org/apache/flume/sink/elasticsearch/ElasticSearchSink.java index ebafb9f..05eb5ff 100644 --- a/flume-ng-sinks/flume-ng-elasticsearch-sink/src/main/java/org/apache/flume/sink/elasticsearch/ElasticSearchSink.java +++ b/flume-ng-sinks/flume-ng-elasticsearch-sink/src/main/java/org/apache/flume/sink/elasticsearch/ElasticSearchSink.java @@ -38,6 +38,7 @@ import org.apache.flume.CounterGroup; import org.apache.flume.Event; import org.apache.flume.EventDeliveryException; import org.apache.flume.Transaction; +import org.apache.flume.conf.BatchSizeSupported; import org.apache.flume.formatter.output.BucketPath; import org.apache.flume.conf.Configurable; import org.apache.flume.instrumentation.SinkCounter; @@ -83,7 +84,7 @@ import static org.apache.flume.sink.elasticsearch.ElasticSearchSinkConstants.IND * ://www.elasticsearch.org/guide/reference/api/admin-indices-templates. 
* html */ -public class ElasticSearchSink extends AbstractSink implements Configurable { +public class ElasticSearchSink extends AbstractSink implements Configurable, BatchSizeSupported { private static final Logger logger = LoggerFactory .getLogger(ElasticSearchSink.class); @@ -172,6 +173,11 @@ public class ElasticSearchSink extends AbstractSink implements Configurable { } @Override + public long getBatchSize() { + return batchSize; + } + + @Override public Status process() throws EventDeliveryException { logger.debug("processing..."); Status status = Status.READY; http://git-wip-us.apache.org/repos/asf/flume/blob/dad828ac/flume-ng-sinks/flume-ng-hbase-sink/src/main/java/org/apache/flume/sink/hbase/AsyncHBaseSink.java ---------------------------------------------------------------------- diff --git a/flume-ng-sinks/flume-ng-hbase-sink/src/main/java/org/apache/flume/sink/hbase/AsyncHBaseSink.java b/flume-ng-sinks/flume-ng-hbase-sink/src/main/java/org/apache/flume/sink/hbase/AsyncHBaseSink.java index 881f661..bd0efa9 100644 --- a/flume-ng-sinks/flume-ng-hbase-sink/src/main/java/org/apache/flume/sink/hbase/AsyncHBaseSink.java +++ b/flume-ng-sinks/flume-ng-hbase-sink/src/main/java/org/apache/flume/sink/hbase/AsyncHBaseSink.java @@ -33,6 +33,7 @@ import org.apache.flume.Event; import org.apache.flume.EventDeliveryException; import org.apache.flume.FlumeException; import org.apache.flume.Transaction; +import org.apache.flume.conf.BatchSizeSupported; import org.apache.flume.conf.Configurable; import org.apache.flume.conf.ConfigurationException; import org.apache.flume.instrumentation.SinkCounter; @@ -101,7 +102,7 @@ import java.util.concurrent.locks.ReentrantLock; * multiple increments are returned by the serializer, then HBase failure * will cause them to be re-written, when HBase comes back up. 
*/ -public class AsyncHBaseSink extends AbstractSink implements Configurable { +public class AsyncHBaseSink extends AbstractSink implements Configurable, BatchSizeSupported { private String tableName; private byte[] columnFamily; @@ -452,6 +453,11 @@ public class AsyncHBaseSink extends AbstractSink implements Configurable { } @Override + public long getBatchSize() { + return batchSize; + } + + @Override public void start() { Preconditions.checkArgument(client == null, "Please call stop " + "before calling start on an old instance."); http://git-wip-us.apache.org/repos/asf/flume/blob/dad828ac/flume-ng-sinks/flume-ng-hbase-sink/src/main/java/org/apache/flume/sink/hbase/HBaseSink.java ---------------------------------------------------------------------- diff --git a/flume-ng-sinks/flume-ng-hbase-sink/src/main/java/org/apache/flume/sink/hbase/HBaseSink.java b/flume-ng-sinks/flume-ng-hbase-sink/src/main/java/org/apache/flume/sink/hbase/HBaseSink.java index 29969ad..9b9bce9 100644 --- a/flume-ng-sinks/flume-ng-hbase-sink/src/main/java/org/apache/flume/sink/hbase/HBaseSink.java +++ b/flume-ng-sinks/flume-ng-hbase-sink/src/main/java/org/apache/flume/sink/hbase/HBaseSink.java @@ -33,6 +33,7 @@ import org.apache.flume.Transaction; import org.apache.flume.annotations.InterfaceAudience; import org.apache.flume.auth.FlumeAuthenticationUtil; import org.apache.flume.auth.PrivilegedExecutor; +import org.apache.flume.conf.BatchSizeSupported; import org.apache.flume.conf.Configurable; import org.apache.flume.conf.ConfigurationException; import org.apache.flume.instrumentation.SinkCounter; @@ -90,7 +91,7 @@ import java.util.NavigableMap; * multiple increments are returned by the serializer, then HBase failure * will cause them to be re-written, when HBase comes back up. 
*/ -public class HBaseSink extends AbstractSink implements Configurable { +public class HBaseSink extends AbstractSink implements Configurable, BatchSizeSupported { private String tableName; private byte[] columnFamily; private HTable table; @@ -555,6 +556,11 @@ public class HBaseSink extends AbstractSink implements Configurable { return serializer; } + @Override + public long getBatchSize() { + return batchSize; + } + @VisibleForTesting @InterfaceAudience.Private interface DebugIncrementsCallback { http://git-wip-us.apache.org/repos/asf/flume/blob/dad828ac/flume-ng-sinks/flume-ng-hbase2-sink/src/main/java/org/apache/flume/sink/hbase2/HBase2Sink.java ---------------------------------------------------------------------- diff --git a/flume-ng-sinks/flume-ng-hbase2-sink/src/main/java/org/apache/flume/sink/hbase2/HBase2Sink.java b/flume-ng-sinks/flume-ng-hbase2-sink/src/main/java/org/apache/flume/sink/hbase2/HBase2Sink.java index 1c6e285..42db0be 100644 --- a/flume-ng-sinks/flume-ng-hbase2-sink/src/main/java/org/apache/flume/sink/hbase2/HBase2Sink.java +++ b/flume-ng-sinks/flume-ng-hbase2-sink/src/main/java/org/apache/flume/sink/hbase2/HBase2Sink.java @@ -33,6 +33,7 @@ import org.apache.flume.Transaction; import org.apache.flume.annotations.InterfaceAudience; import org.apache.flume.auth.FlumeAuthenticationUtil; import org.apache.flume.auth.PrivilegedExecutor; +import org.apache.flume.conf.BatchSizeSupported; import org.apache.flume.conf.Configurable; import org.apache.flume.conf.ConfigurationException; import org.apache.flume.instrumentation.SinkCounter; @@ -96,7 +97,7 @@ import java.util.NavigableMap; * multiple increments are returned by the serializer, then HBase failure * will cause them to be re-written, when HBase comes back up. 
*/ -public class HBase2Sink extends AbstractSink implements Configurable { +public class HBase2Sink extends AbstractSink implements Configurable, BatchSizeSupported { private String tableName; private byte[] columnFamily; private Connection conn; @@ -541,6 +542,11 @@ public class HBase2Sink extends AbstractSink implements Configurable { return serializer; } + @Override + public long getBatchSize() { + return batchSize; + } + @VisibleForTesting @InterfaceAudience.Private interface DebugIncrementsCallback { http://git-wip-us.apache.org/repos/asf/flume/blob/dad828ac/flume-ng-sinks/flume-ng-kafka-sink/src/main/java/org/apache/flume/sink/kafka/KafkaSink.java ---------------------------------------------------------------------- diff --git a/flume-ng-sinks/flume-ng-kafka-sink/src/main/java/org/apache/flume/sink/kafka/KafkaSink.java b/flume-ng-sinks/flume-ng-kafka-sink/src/main/java/org/apache/flume/sink/kafka/KafkaSink.java index 7f347d8..eaabd6e 100644 --- a/flume-ng-sinks/flume-ng-kafka-sink/src/main/java/org/apache/flume/sink/kafka/KafkaSink.java +++ b/flume-ng-sinks/flume-ng-kafka-sink/src/main/java/org/apache/flume/sink/kafka/KafkaSink.java @@ -29,6 +29,7 @@ import org.apache.flume.Context; import org.apache.flume.Event; import org.apache.flume.EventDeliveryException; import org.apache.flume.Transaction; +import org.apache.flume.conf.BatchSizeSupported; import org.apache.flume.conf.Configurable; import org.apache.flume.conf.ConfigurationException; import org.apache.flume.conf.LogPrivacyUtil; @@ -102,7 +103,7 @@ import static org.apache.flume.sink.kafka.KafkaSinkConstants.MESSAGE_SERIALIZER_ * topic * key */ -public class KafkaSink extends AbstractSink implements Configurable { +public class KafkaSink extends AbstractSink implements Configurable, BatchSizeSupported { private static final Logger logger = LoggerFactory.getLogger(KafkaSink.class); @@ -136,7 +137,7 @@ public class KafkaSink extends AbstractSink implements Configurable { return topic; } - public int 
getBatchSize() { + public long getBatchSize() { return batchSize; } http://git-wip-us.apache.org/repos/asf/flume/blob/dad828ac/flume-ng-sinks/flume-ng-morphline-solr-sink/src/main/java/org/apache/flume/sink/solr/morphline/MorphlineSink.java ---------------------------------------------------------------------- diff --git a/flume-ng-sinks/flume-ng-morphline-solr-sink/src/main/java/org/apache/flume/sink/solr/morphline/MorphlineSink.java b/flume-ng-sinks/flume-ng-morphline-solr-sink/src/main/java/org/apache/flume/sink/solr/morphline/MorphlineSink.java index 7d9f807..6cacda0 100644 --- a/flume-ng-sinks/flume-ng-morphline-solr-sink/src/main/java/org/apache/flume/sink/solr/morphline/MorphlineSink.java +++ b/flume-ng-sinks/flume-ng-morphline-solr-sink/src/main/java/org/apache/flume/sink/solr/morphline/MorphlineSink.java @@ -22,6 +22,7 @@ import org.apache.flume.Context; import org.apache.flume.Event; import org.apache.flume.EventDeliveryException; import org.apache.flume.Transaction; +import org.apache.flume.conf.BatchSizeSupported; import org.apache.flume.conf.Configurable; import org.apache.flume.conf.ConfigurationException; import org.apache.flume.conf.LogPrivacyUtil; @@ -36,7 +37,7 @@ import org.kitesdk.morphline.api.Command; * Flume sink that extracts search documents from Flume events and processes them using a morphline * {@link Command} chain. 
*/ -public class MorphlineSink extends AbstractSink implements Configurable { +public class MorphlineSink extends AbstractSink implements Configurable, BatchSizeSupported { private int maxBatchSize = 1000; private long maxBatchDurationMillis = 1000; @@ -194,7 +195,12 @@ public class MorphlineSink extends AbstractSink implements Configurable { txn.close(); } } - + + @Override + public long getBatchSize() { + return getMaxBatchSize(); + } + @Override public String toString() { int i = getClass().getName().lastIndexOf('.') + 1; http://git-wip-us.apache.org/repos/asf/flume/blob/dad828ac/flume-ng-sources/flume-jms-source/src/main/java/org/apache/flume/source/jms/JMSSource.java ---------------------------------------------------------------------- diff --git a/flume-ng-sources/flume-jms-source/src/main/java/org/apache/flume/source/jms/JMSSource.java b/flume-ng-sources/flume-jms-source/src/main/java/org/apache/flume/source/jms/JMSSource.java index 5dd82c9..938d5e2 100644 --- a/flume-ng-sources/flume-jms-source/src/main/java/org/apache/flume/source/jms/JMSSource.java +++ b/flume-ng-sources/flume-jms-source/src/main/java/org/apache/flume/source/jms/JMSSource.java @@ -36,6 +36,7 @@ import org.apache.flume.EventDeliveryException; import org.apache.flume.FlumeException; import org.apache.flume.annotations.InterfaceAudience; import org.apache.flume.annotations.InterfaceStability; +import org.apache.flume.conf.BatchSizeSupported; import org.apache.flume.conf.Configurables; import org.apache.flume.instrumentation.SourceCounter; import org.apache.flume.source.AbstractPollableSource; @@ -50,7 +51,7 @@ import com.google.common.io.Files; @InterfaceAudience.Private @InterfaceStability.Unstable -public class JMSSource extends AbstractPollableSource { +public class JMSSource extends AbstractPollableSource implements BatchSizeSupported { private static final Logger logger = LoggerFactory.getLogger(JMSSource.class); // setup by constructor @@ -358,4 +359,9 @@ public class JMSSource 
extends AbstractPollableSource { jmsExceptionCounter = 0; return consumer; } + + @Override + public long getBatchSize() { + return batchSize; + } } \ No newline at end of file http://git-wip-us.apache.org/repos/asf/flume/blob/dad828ac/flume-ng-sources/flume-kafka-source/src/main/java/org/apache/flume/source/kafka/KafkaSource.java ---------------------------------------------------------------------- diff --git a/flume-ng-sources/flume-kafka-source/src/main/java/org/apache/flume/source/kafka/KafkaSource.java b/flume-ng-sources/flume-kafka-source/src/main/java/org/apache/flume/source/kafka/KafkaSource.java index 8053b41..da4ec1a 100644 --- a/flume-ng-sources/flume-kafka-source/src/main/java/org/apache/flume/source/kafka/KafkaSource.java +++ b/flume-ng-sources/flume-kafka-source/src/main/java/org/apache/flume/source/kafka/KafkaSource.java @@ -41,6 +41,7 @@ import org.apache.flume.Context; import org.apache.flume.Event; import org.apache.flume.EventDeliveryException; import org.apache.flume.FlumeException; +import org.apache.flume.conf.BatchSizeSupported; import org.apache.flume.conf.Configurable; import org.apache.flume.conf.ConfigurationException; import org.apache.flume.conf.LogPrivacyUtil; @@ -94,7 +95,7 @@ import static scala.collection.JavaConverters.asJavaListConverter; * <p> */ public class KafkaSource extends AbstractPollableSource - implements Configurable { + implements Configurable, BatchSizeSupported { private static final Logger log = LoggerFactory.getLogger(KafkaSource.class); // Constants used only for offset migration zookeeper connections @@ -130,6 +131,11 @@ public class KafkaSource extends AbstractPollableSource private String topicHeader = null; private boolean setTopicHeader; + @Override + public long getBatchSize() { + return batchUpperLimit; + } + /** * This class is a helper to subscribe for topics by using * different strategies 
http://git-wip-us.apache.org/repos/asf/flume/blob/dad828ac/flume-ng-sources/flume-taildir-source/src/main/java/org/apache/flume/source/taildir/TaildirSource.java ---------------------------------------------------------------------- diff --git a/flume-ng-sources/flume-taildir-source/src/main/java/org/apache/flume/source/taildir/TaildirSource.java b/flume-ng-sources/flume-taildir-source/src/main/java/org/apache/flume/source/taildir/TaildirSource.java index 0c656d6..e121a2b 100644 --- a/flume-ng-sources/flume-taildir-source/src/main/java/org/apache/flume/source/taildir/TaildirSource.java +++ b/flume-ng-sources/flume-taildir-source/src/main/java/org/apache/flume/source/taildir/TaildirSource.java @@ -39,6 +39,7 @@ import org.apache.flume.Context; import org.apache.flume.Event; import org.apache.flume.FlumeException; import org.apache.flume.PollableSource; +import org.apache.flume.conf.BatchSizeSupported; import org.apache.flume.conf.Configurable; import org.apache.flume.instrumentation.SourceCounter; import org.apache.flume.source.AbstractSource; @@ -57,7 +58,7 @@ import com.google.common.util.concurrent.ThreadFactoryBuilder; import com.google.gson.Gson; public class TaildirSource extends AbstractSource implements - PollableSource, Configurable { + PollableSource, Configurable, BatchSizeSupported { private static final Logger logger = LoggerFactory.getLogger(TaildirSource.class); @@ -190,6 +191,11 @@ public class TaildirSource extends AbstractSource implements } } + @Override + public long getBatchSize() { + return batchSize; + } + private Map<String, String> selectByKeys(Map<String, String> map, String[] keys) { Map<String, String> result = Maps.newHashMap(); for (String key : keys) { http://git-wip-us.apache.org/repos/asf/flume/blob/dad828ac/flume-ng-sources/flume-twitter-source/src/main/java/org/apache/flume/source/twitter/TwitterSource.java ---------------------------------------------------------------------- diff --git 
a/flume-ng-sources/flume-twitter-source/src/main/java/org/apache/flume/source/twitter/TwitterSource.java b/flume-ng-sources/flume-twitter-source/src/main/java/org/apache/flume/source/twitter/TwitterSource.java index d812023..948e6c0 100644 --- a/flume-ng-sources/flume-twitter-source/src/main/java/org/apache/flume/source/twitter/TwitterSource.java +++ b/flume-ng-sources/flume-twitter-source/src/main/java/org/apache/flume/source/twitter/TwitterSource.java @@ -39,6 +39,7 @@ import org.apache.flume.Event; import org.apache.flume.EventDrivenSource; import org.apache.flume.annotations.InterfaceAudience; import org.apache.flume.annotations.InterfaceStability; +import org.apache.flume.conf.BatchSizeSupported; import org.apache.flume.conf.Configurable; import org.apache.flume.event.EventBuilder; import org.apache.flume.source.AbstractSource; @@ -68,7 +69,7 @@ import twitter4j.auth.AccessToken; @InterfaceStability.Unstable public class TwitterSource extends AbstractSource - implements EventDrivenSource, Configurable, StatusListener { + implements EventDrivenSource, Configurable, StatusListener, BatchSizeSupported { private TwitterStream twitterStream; private Schema avroSchema; @@ -325,4 +326,9 @@ public class TwitterSource public void onException(Exception e) { LOGGER.error("Exception while streaming tweets", e); } + + @Override + public long getBatchSize() { + return maxBatchSize; + } } http://git-wip-us.apache.org/repos/asf/flume/blob/dad828ac/flume-ng-tests/src/test/java/org/apache/flume/test/agent/TestConfig.java ---------------------------------------------------------------------- diff --git a/flume-ng-tests/src/test/java/org/apache/flume/test/agent/TestConfig.java b/flume-ng-tests/src/test/java/org/apache/flume/test/agent/TestConfig.java new file mode 100644 index 0000000..57e720c --- /dev/null +++ b/flume-ng-tests/src/test/java/org/apache/flume/test/agent/TestConfig.java @@ -0,0 +1,209 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more 
contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.flume.test.agent; + +import org.apache.commons.io.FileUtils; +import org.apache.flume.test.util.StagedInstall; +import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.security.alias.CredentialShell; +import org.apache.hadoop.util.ToolRunner; +import org.junit.After; +import org.junit.Before; +import org.junit.ClassRule; +import org.junit.Test; +import org.junit.contrib.java.lang.system.EnvironmentVariables; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import java.io.File; +import java.io.IOException; +import java.util.HashMap; +import java.util.HashSet; +import java.util.Map; +import java.util.Properties; +import java.util.Scanner; +import java.util.Set; +import java.util.concurrent.TimeUnit; + +import static org.junit.Assert.assertEquals; +import static org.junit.Assert.assertTrue; + +public class TestConfig { + private static final Logger LOGGER = + LoggerFactory.getLogger(TestConfig.class); + + @ClassRule + public static final EnvironmentVariables environmentVariables + = new EnvironmentVariables(); + + private Properties agentProps; + private Map<String, String> agentEnv; + private File sinkOutputDir1; + private File sinkOutputDir2; + private File sinkOutputDir3; + private File 
hadoopCredStore; + + @Before + public void setup() throws Exception { + + File agentDir = StagedInstall.getInstance().getStageDir(); + LOGGER.debug("Using agent stage dir: {}", agentDir); + + File testDir = new File(agentDir, TestConfig.class.getName()); + if (testDir.exists()) { + FileUtils.deleteDirectory(testDir); + } + assertTrue(testDir.mkdirs()); + + agentProps = new Properties(); + agentEnv = new HashMap<>(); + + // Create the rest of the properties file + agentProps.put("agent.sources.seq-01.type", "seq"); + agentProps.put("agent.sources.seq-01.totalEvents", "100"); + agentProps.put("agent.sources.seq-01.channels", "mem-01 mem-02 mem-03"); + agentProps.put("agent.channels.mem-01.type", "MEMORY"); + agentProps.put("agent.channels.mem-01.capacity", String.valueOf(100000)); + agentProps.put("agent.channels.mem-02.type", "MEMORY"); + agentProps.put("agent.channels.mem-02.capacity", String.valueOf(100000)); + agentProps.put("agent.channels.mem-03.type", "MEMORY"); + agentProps.put("agent.channels.mem-03.capacity", String.valueOf(100000)); + + sinkOutputDir1 = new File(testDir, "out1"); + assertTrue("Unable to create sink output dir: " + sinkOutputDir1.getPath(), + sinkOutputDir1.mkdir()); + sinkOutputDir2 = new File(testDir, "out2"); + assertTrue("Unable to create sink output dir: " + sinkOutputDir2.getPath(), + sinkOutputDir2.mkdir()); + sinkOutputDir3 = new File(testDir, "out3"); + assertTrue("Unable to create sink output dir: " + sinkOutputDir3.getPath(), + sinkOutputDir3.mkdir()); + + environmentVariables.set("HADOOP_CREDSTORE_PASSWORD", "envSecret"); + + agentEnv.put("dirname_env", sinkOutputDir1.getAbsolutePath()); + agentEnv.put("HADOOP_CREDSTORE_PASSWORD", "envSecret"); + + hadoopCredStore = new File(testDir, "credstore.jceks"); + String providerPath = "jceks://file/" + hadoopCredStore.getAbsolutePath(); + + ToolRunner.run( + new Configuration(), new CredentialShell(), + ("create dirname_hadoop -value " + sinkOutputDir3.getAbsolutePath() + + " -provider 
" + providerPath).split(" ")); + + + agentProps.put("agent.sinks.roll-01.channel", "mem-01"); + agentProps.put("agent.sinks.roll-01.type", "FILE_ROLL"); + agentProps.put("agent.sinks.roll-01.sink.directory", "${filter-01[\"dirname_env\"]}"); + agentProps.put("agent.sinks.roll-01.sink.rollInterval", "0"); + agentProps.put("agent.sinks.roll-02.channel", "mem-02"); + agentProps.put("agent.sinks.roll-02.type", "FILE_ROLL"); + agentProps.put("agent.sinks.roll-02.sink.directory", + sinkOutputDir2.getParentFile().getAbsolutePath() + "/${filter-02['out2']}"); + agentProps.put("agent.sinks.roll-02.sink.rollInterval", "0"); + agentProps.put("agent.sinks.roll-03.channel", "mem-03"); + agentProps.put("agent.sinks.roll-03.type", "FILE_ROLL"); + agentProps.put("agent.sinks.roll-03.sink.directory", "${filter-03[dirname_hadoop]}"); + agentProps.put("agent.sinks.roll-03.sink.rollInterval", "0"); + + agentProps.put("agent.configfilters.filter-01.type", "env"); + agentProps.put("agent.configfilters.filter-02.type", "external"); + agentProps.put("agent.configfilters.filter-02.command", "echo"); + agentProps.put("agent.configfilters.filter-03.type", "hadoop"); + agentProps.put("agent.configfilters.filter-03.credential.provider.path", providerPath); + + agentProps.put("agent.sources", "seq-01"); + agentProps.put("agent.channels", "mem-01 mem-02 mem-03"); + agentProps.put("agent.sinks", "roll-01 roll-02 roll-03"); + agentProps.put("agent.configfilters", "filter-01 filter-02 filter-03"); + } + + @After + public void teardown() throws Exception { + StagedInstall.getInstance().stopAgent(); + } + + private void validateSeenEvents(File outDir, int outFiles, int events) + throws IOException { + File[] sinkOutputDirChildren = outDir.listFiles(); + assertEquals("Unexpected number of files in output dir", + outFiles, sinkOutputDirChildren.length); + Set<String> seenEvents = new HashSet<>(); + for (File outFile : sinkOutputDirChildren) { + Scanner scanner = new Scanner(outFile); + while 
(scanner.hasNext()) { + seenEvents.add(scanner.nextLine()); + } + } + for (int event = 0; event < events; event++) { + assertTrue( + "Missing event: {" + event + "}", + seenEvents.contains(String.valueOf(event)) + ); + } + } + + @Test + public void testConfigReplacement() throws Exception { + LOGGER.debug("testConfigReplacement() started."); + + StagedInstall.getInstance().startAgent("agent", agentProps, agentEnv); + + TimeUnit.SECONDS.sleep(10); // Wait for sources and sink to process files + + // Ensure we received all events. + validateSeenEvents(sinkOutputDir1, 1, 100); + validateSeenEvents(sinkOutputDir2, 1, 100); + validateSeenEvents(sinkOutputDir3, 1, 100); + LOGGER.debug("Processed all the events!"); + + LOGGER.debug("testConfigReplacement() ended."); + } + + @Test + public void testConfigReload() throws Exception { + LOGGER.debug("testConfigReload() started."); + + agentProps.put("agent.channels.mem-01.transactionCapacity", "10"); + agentProps.put("agent.sinks.roll-01.sink.batchSize", "20"); + StagedInstall.getInstance().startAgent("agent", agentProps, agentEnv); + + TimeUnit.SECONDS.sleep(10); // Wait for sources and sink to process files + + // This directory is empty due to misconfiguration + validateSeenEvents(sinkOutputDir1, 0, 0); + + // These are well configured + validateSeenEvents(sinkOutputDir2, 1, 100); + validateSeenEvents(sinkOutputDir3, 1, 100); + LOGGER.debug("Processed all the events!"); + + //repair the config + agentProps.put("agent.channels.mem-01.transactionCapacity", "20"); + StagedInstall.getInstance().reconfigure(agentProps); + + TimeUnit.SECONDS.sleep(40); // Wait for sources and sink to process files + // Ensure we received all events. 
+ validateSeenEvents(sinkOutputDir1, 1, 100); + + LOGGER.debug("testConfigReload() ended."); + } + +} http://git-wip-us.apache.org/repos/asf/flume/blob/dad828ac/flume-ng-tests/src/test/java/org/apache/flume/test/agent/TestConfigFilters.java ---------------------------------------------------------------------- diff --git a/flume-ng-tests/src/test/java/org/apache/flume/test/agent/TestConfigFilters.java b/flume-ng-tests/src/test/java/org/apache/flume/test/agent/TestConfigFilters.java deleted file mode 100644 index b82c4b6..0000000 --- a/flume-ng-tests/src/test/java/org/apache/flume/test/agent/TestConfigFilters.java +++ /dev/null @@ -1,176 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, - * software distributed under the License is distributed on an - * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY - * KIND, either express or implied. See the License for the - * specific language governing permissions and limitations - * under the License. 
- */ -package org.apache.flume.test.agent; - -import org.apache.flume.test.util.StagedInstall; -import org.apache.hadoop.conf.Configuration; -import org.apache.hadoop.security.alias.CredentialShell; -import org.apache.hadoop.util.ToolRunner; -import org.junit.After; -import org.junit.Before; -import org.junit.ClassRule; -import org.junit.Test; -import org.junit.contrib.java.lang.system.EnvironmentVariables; -import org.slf4j.Logger; -import org.slf4j.LoggerFactory; - -import java.io.File; -import java.io.IOException; -import java.util.HashMap; -import java.util.HashSet; -import java.util.Map; -import java.util.Properties; -import java.util.Scanner; -import java.util.Set; -import java.util.concurrent.TimeUnit; - -import static org.junit.Assert.assertEquals; -import static org.junit.Assert.assertTrue; - -public class TestConfigFilters { - private static final Logger LOGGER = - LoggerFactory.getLogger(TestConfigFilters.class); - - @ClassRule - public static final EnvironmentVariables environmentVariables - = new EnvironmentVariables(); - - private Properties agentProps; - private Map<String, String> agentEnv; - private File sinkOutputDir1; - private File sinkOutputDir2; - private File sinkOutputDir3; - private File hadoopCredStore; - - @Before - public void setup() throws Exception { - - File agentDir = StagedInstall.getInstance().getStageDir(); - LOGGER.debug("Using agent stage dir: {}", agentDir); - - File testDir = new File(agentDir, TestConfigFilters.class.getName()); - assertTrue(testDir.mkdirs()); - - agentProps = new Properties(); - agentEnv = new HashMap<>(); - - // Create the rest of the properties file - agentProps.put("agent.sources.seq-01.type", "seq"); - agentProps.put("agent.sources.seq-01.totalEvents", "100"); - agentProps.put("agent.sources.seq-01.channels", "mem-01 mem-02 mem-03"); - agentProps.put("agent.channels.mem-01.type", "MEMORY"); - agentProps.put("agent.channels.mem-01.capacity", String.valueOf(100000)); - 
agentProps.put("agent.channels.mem-02.type", "MEMORY"); - agentProps.put("agent.channels.mem-02.capacity", String.valueOf(100000)); - agentProps.put("agent.channels.mem-03.type", "MEMORY"); - agentProps.put("agent.channels.mem-04.capacity", String.valueOf(100000)); - - sinkOutputDir1 = new File(testDir, "out1"); - assertTrue("Unable to create sink output dir: " + sinkOutputDir1.getPath(), - sinkOutputDir1.mkdir()); - sinkOutputDir2 = new File(testDir, "out2"); - assertTrue("Unable to create sink output dir: " + sinkOutputDir2.getPath(), - sinkOutputDir2.mkdir()); - sinkOutputDir3 = new File(testDir, "out3"); - assertTrue("Unable to create sink output dir: " + sinkOutputDir3.getPath(), - sinkOutputDir3.mkdir()); - - environmentVariables.set("HADOOP_CREDSTORE_PASSWORD", "envSecret"); - - agentEnv.put("dirname_env", sinkOutputDir1.getAbsolutePath()); - agentEnv.put("HADOOP_CREDSTORE_PASSWORD", "envSecret"); - - hadoopCredStore = new File(testDir, "credstore.jceks"); - String providerPath = "jceks://file/" + hadoopCredStore.getAbsolutePath(); - - ToolRunner.run( - new Configuration(), new CredentialShell(), - ("create dirname_hadoop -value " + sinkOutputDir3.getAbsolutePath() - + " -provider " + providerPath).split(" ")); - - - agentProps.put("agent.sinks.roll-01.channel", "mem-01"); - agentProps.put("agent.sinks.roll-01.type", "FILE_ROLL"); - agentProps.put("agent.sinks.roll-01.sink.directory", "${filter-01[\"dirname_env\"]}"); - agentProps.put("agent.sinks.roll-01.sink.rollInterval", "0"); - agentProps.put("agent.sinks.roll-02.channel", "mem-02"); - agentProps.put("agent.sinks.roll-02.type", "FILE_ROLL"); - agentProps.put("agent.sinks.roll-02.sink.directory", - sinkOutputDir2.getParentFile().getAbsolutePath() + "/${filter-02['out2']}"); - agentProps.put("agent.sinks.roll-02.sink.rollInterval", "0"); - agentProps.put("agent.sinks.roll-03.channel", "mem-03"); - agentProps.put("agent.sinks.roll-03.type", "FILE_ROLL"); - 
agentProps.put("agent.sinks.roll-03.sink.directory", "${filter-03[dirname_hadoop]}"); - agentProps.put("agent.sinks.roll-03.sink.rollInterval", "0"); - - agentProps.put("agent.configfilters.filter-01.type", "env"); - agentProps.put("agent.configfilters.filter-02.type", "external"); - agentProps.put("agent.configfilters.filter-02.command", "echo"); - agentProps.put("agent.configfilters.filter-03.type", "hadoop"); - agentProps.put("agent.configfilters.filter-03.credential.provider.path", providerPath); - - agentProps.put("agent.sources", "seq-01"); - agentProps.put("agent.channels", "mem-01 mem-02 mem-03"); - agentProps.put("agent.sinks", "roll-01 roll-02 roll-03"); - agentProps.put("agent.configfilters", "filter-01 filter-02 filter-03"); - } - - @After - public void teardown() throws Exception { - StagedInstall.getInstance().stopAgent(); - } - - private void validateSeenEvents(File outDir, int outFiles, int events) - throws IOException { - File[] sinkOutputDirChildren = outDir.listFiles(); - assertEquals("Unexpected number of files in output dir", - outFiles, sinkOutputDirChildren.length); - Set<String> seenEvents = new HashSet<>(); - for (File outFile : sinkOutputDirChildren) { - Scanner scanner = new Scanner(outFile); - while (scanner.hasNext()) { - seenEvents.add(scanner.nextLine()); - } - } - for (int event = 0; event < events; event++) { - assertTrue( - "Missing event: {" + event + "}", - seenEvents.contains(String.valueOf(event)) - ); - } - } - - @Test - public void testConfigReplacement() throws Exception { - LOGGER.debug("testConfigReplacement() started."); - - StagedInstall.getInstance().startAgent("agent", agentProps, agentEnv); - - TimeUnit.SECONDS.sleep(10); // Wait for sources and sink to process files - - // Ensure we received all events. 
- validateSeenEvents(sinkOutputDir1, 1, 100); - validateSeenEvents(sinkOutputDir2, 1, 100); - validateSeenEvents(sinkOutputDir3, 1, 100); - LOGGER.debug("Processed all the events!"); - - LOGGER.debug("testConfigReplacement() ended."); - } - -}
