Add and test ApplicationTests & complete README
Project: http://git-wip-us.apache.org/repos/asf/apex-malhar/repo
Commit: http://git-wip-us.apache.org/repos/asf/apex-malhar/commit/c48ec8c5
Tree: http://git-wip-us.apache.org/repos/asf/apex-malhar/tree/c48ec8c5
Diff: http://git-wip-us.apache.org/repos/asf/apex-malhar/diff/c48ec8c5

Branch: refs/heads/master
Commit: c48ec8c51f91c7cd3e24d1fed62cd7072af57865
Parents: d200737
Author: Oliver Winke <[email protected]>
Authored: Tue Mar 14 12:12:07 2017 -0700
Committer: Pramod Immaneni <[email protected]>
Committed: Mon May 22 16:47:34 2017 -0700

----------------------------------------------------------------------
 flume/README.md                                 |  73 +++++++++
 flume/pom.xml                                   |  18 +++
 .../ColumnFilteringFormattingInterceptor.java   |   2 +-
 .../operator/AbstractFlumeInputOperator.java    |   8 +-
 .../apex/malhar/flume/sink/DTFlumeSink.java     |  10 +-
 .../apache/apex/malhar/flume/sink/Server.java   |   3 +-
 .../apex/malhar/flume/storage/HDFSStorage.java  |   3 +-
 .../discovery/ZKAssistedDiscoveryTest.java      |   4 +-
 .../integration/ApplicationDiscoveryTest.java   | 151 +++++++++++++++++++
 .../flume/integration/ApplicationTest.java      |  26 +++-
 .../apex/malhar/flume/sink/DTFlumeSinkTest.java |   3 +-
 .../resources/flume/conf/flume-conf.properties  |   4 +-
 .../test/resources/flume/conf/flume_simple.conf |  52 +++++++
 .../resources/flume/conf/flume_zkdiscovery.conf |  91 +++++++++++
 14 files changed, 425 insertions(+), 23 deletions(-)
----------------------------------------------------------------------

http://git-wip-us.apache.org/repos/asf/apex-malhar/blob/c48ec8c5/flume/README.md
----------------------------------------------------------------------
diff --git a/flume/README.md b/flume/README.md
index 1d0b2d9..ec8fae9 100644
--- a/flume/README.md
+++ b/flume/README.md
@@ -4,3 +4,76 @@ Flume
 The folder contains support for flume to be used with Apex. It consists mainly of two components. The first is an agent that sits on the flume side, receives data from flume, and makes it available via a socket server.
 In effect it converts a push to a pull model. The second component is the input operator that reads from the agent. The project was started with the latest code at the time of the sub-module creation. For older history, look at the flume sub-module in the older project called Megh ([email protected]:DataTorrent/Megh).
+
+
+## Setup flume agent:
+
+To set up the flume agent for the Apex input operator, Flume's plugin-based
+architecture is used.
+
+Set up flume and make sure JAVA_HOME is set.
+
+Build malhar-flume with `mvn clean package -DskipTests`.
+The plugin `malhar-flume-ver.jar` and all necessary dependencies (`target/deps`) can then be found in the target directory.
+To add the plugin to your flume service, create a plugins.d directory in FLUME_HOME.
+
+Put the malhar-flume-ver.jar in `plugins.d/custom-plugin-name/lib/`
+and all the needed dependencies into `plugins.d/custom-plugin-name/libext/`.
+
+(As an alternative to flume's automatic plugins.d detection, jars can be added to the
+FLUME_CLASSPATH using a `flume-env.sh` script (see 'resources/flume-conf/flume-env.sample.sh').
+In that case a maven repository must be available under $HOME/.m2 and the environment variable
+DT_FLUME_JAR must point to the plugin JAR.)
+
+***Flume configuration***
+A basic flume configuration can be found in `src/test/resources/flume/conf/flume_simple.conf`.
+A flume configuration using the discovery service can be found in `src/test/resources/flume/conf/flume_zkdiscovery.conf`.
+Configuration files should be placed in Flume's 'conf' directory and are explicitly selected
+when running flume-ng.
+
+In the configuration file set `org.apache.apex.malhar.flume.sink.DTFlumeSink` for the **type**
+and `org.apache.apex.malhar.flume.storage.HDFSStorage` for the **storage**,
+as well as an **HDFS directory** for `baseDir`. The HDFS base directory needs
+to be created on HDFS.
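The plugins.d layout described above can be sketched with a few shell commands; FLUME_HOME, the plugin directory name `malhar-flume`, and the jar paths are illustrative assumptions, not fixed names:

```shell
# Sketch of the plugins.d layout described above.
# FLUME_HOME, the plugin name "malhar-flume" and the jar paths are
# illustrative assumptions; substitute your own.
FLUME_HOME="${FLUME_HOME:-/tmp/flume-home}"
PLUGIN="$FLUME_HOME/plugins.d/malhar-flume"

# lib/ holds the plugin jar itself, libext/ its dependencies.
mkdir -p "$PLUGIN/lib" "$PLUGIN/libext"

# After `mvn clean package -DskipTests`, copy the artifacts in:
# cp target/malhar-flume-*.jar "$PLUGIN/lib/"
# cp target/deps/*.jar         "$PLUGIN/libext/"

# Flume is then started with an explicit configuration file, e.g.:
# flume-ng agent -n a1 -c "$FLUME_HOME/conf" -f "$FLUME_HOME/conf/flume_simple.conf"

ls "$PLUGIN"
```

With the directories in place, Flume picks the plugin up automatically on startup without any FLUME_CLASSPATH changes.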
+
+For discovery, set `org.apache.apex.malhar.flume.discovery.ZKAssistedDiscovery` for each sink
+and configure them to use the ZooKeeper service by adding the ZooKeeper address in `connectionString` as well as a `basePath`.
+These values also need to be set for the **ZKListener** in the Apex application.
+
+### Operator Usage
+
+An implementation of AbstractFlumeInputOperator can either simply connect
+to one flume sink, or use discovery/ZooKeeper to detect flume sinks automatically
+and partition the operator accordingly at startup.
+
+Implement the abstract method to convert the Flume event to a tuple:
+```java
+public abstract T convert(Event event);
+```
+
+Additionally, a StreamCodec for Flume events must be set. A codec implementation
+can be found in storage/EventCodec.java:
+```java
+setCodec(new EventCodec());
+```
+
+See `ApplicationDiscoveryTest.FlumeInputOperator` for an example operator implementation.
+
+##### Simple connection setup to one flume sink:
+For a simple connection to only one flume sink, set the connection address in the form `sinkid:host:port`:
+```java
+public void setConnectAddresses(String[] specs)
+```
+
+
+##### Setup using discovery/zookeeper:
+For a flume input operator to discover flume sinks and partition accordingly,
+a ZooKeeper service needs to be set up.
+
+An implementation of AbstractFlumeInputOperator needs to initialize a ZKStatsListener.
+It additionally needs to override **definePartitions** to set up the ZKStatsListener, discover addresses using discover(),
+and set them in discoveredFlumeSinks before calling the parent's definePartitions method.
+
+
+See `src/test/java/org/apache/apex/malhar/flume/integration/ApplicationDiscoveryTest.java`
+and `src/test/java/org/apache/apex/malhar/flume/integration/ApplicationTest.java`
+for test implementations.
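As a sketch of the simple single-sink usage described above: a minimal concrete operator implements `convert` and sets the codec and connection address. The class name and the sink address `sink1:127.0.0.1:9098` are illustrative assumptions (the address matches the test configuration in this commit); the Malhar and Flume types are the ones named in this README:

```java
import org.apache.apex.malhar.flume.operator.AbstractFlumeInputOperator;
import org.apache.apex.malhar.flume.storage.EventCodec;

import org.apache.flume.Event;

/**
 * Minimal sketch of a concrete flume input operator. The class name and
 * the sink address "sink1:127.0.0.1:9098" are illustrative assumptions.
 */
public class StringFlumeInputOperator extends AbstractFlumeInputOperator<String>
{
  public StringFlumeInputOperator()
  {
    // Codec used to serialize Flume events between the sink and the operator.
    setCodec(new EventCodec());
    // Simple setup: connect to a single sink, in the form sinkid:host:port.
    setConnectAddresses(new String[]{"sink1:127.0.0.1:9098"});
  }

  @Override
  public String convert(Event event)
  {
    // Turn the raw Flume event body into a String tuple.
    return new String(event.getBody());
  }
}
```

For the discovery-based variant, the same operator would additionally hold a ZKStatsListener and override definePartitions as shown in ApplicationDiscoveryTest.FlumeInputOperator.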
\ No newline at end of file http://git-wip-us.apache.org/repos/asf/apex-malhar/blob/c48ec8c5/flume/pom.xml ---------------------------------------------------------------------- diff --git a/flume/pom.xml b/flume/pom.xml index 735a13b..851697e 100644 --- a/flume/pom.xml +++ b/flume/pom.xml @@ -175,6 +175,24 @@ <argLine>-Xmx5000M</argLine> </configuration> </plugin> + <plugin> + <artifactId>maven-dependency-plugin</artifactId> + <version>2.8</version> + <executions> + <execution> + <id>copy-dependencies</id> + <phase>prepare-package</phase> + <goals> + <goal>copy-dependencies</goal> + </goals> + <configuration> + <outputDirectory>target/deps</outputDirectory> + <includeScope>runtime</includeScope> + <excludeGroupIds>org.apache.hadoop</excludeGroupIds> + </configuration> + </execution> + </executions> + </plugin> </plugins> </build> <dependencies> http://git-wip-us.apache.org/repos/asf/apex-malhar/blob/c48ec8c5/flume/src/main/java/org/apache/apex/malhar/flume/interceptor/ColumnFilteringFormattingInterceptor.java ---------------------------------------------------------------------- diff --git a/flume/src/main/java/org/apache/apex/malhar/flume/interceptor/ColumnFilteringFormattingInterceptor.java b/flume/src/main/java/org/apache/apex/malhar/flume/interceptor/ColumnFilteringFormattingInterceptor.java index bd7e5e0..11ec3ef 100644 --- a/flume/src/main/java/org/apache/apex/malhar/flume/interceptor/ColumnFilteringFormattingInterceptor.java +++ b/flume/src/main/java/org/apache/apex/malhar/flume/interceptor/ColumnFilteringFormattingInterceptor.java @@ -212,7 +212,7 @@ public class ColumnFilteringFormattingInterceptor implements Interceptor dstSeparators[i] = emptyStringBytes; } } - srcSeparator = context.getInteger(ColumnFilteringInterceptor.Constants.SRC_SEPARATOR, (int) ColumnFilteringInterceptor.Constants.SRC_SEPARATOR_DFLT).byteValue(); + srcSeparator = context.getInteger(ColumnFilteringInterceptor.Constants.SRC_SEPARATOR, 
(int)ColumnFilteringInterceptor.Constants.SRC_SEPARATOR_DFLT).byteValue(); this.prefix = lPrefix.getBytes(); } } http://git-wip-us.apache.org/repos/asf/apex-malhar/blob/c48ec8c5/flume/src/main/java/org/apache/apex/malhar/flume/operator/AbstractFlumeInputOperator.java ---------------------------------------------------------------------- diff --git a/flume/src/main/java/org/apache/apex/malhar/flume/operator/AbstractFlumeInputOperator.java b/flume/src/main/java/org/apache/apex/malhar/flume/operator/AbstractFlumeInputOperator.java index da1a8aa..f9beb71 100644 --- a/flume/src/main/java/org/apache/apex/malhar/flume/operator/AbstractFlumeInputOperator.java +++ b/flume/src/main/java/org/apache/apex/malhar/flume/operator/AbstractFlumeInputOperator.java @@ -33,11 +33,12 @@ import java.util.concurrent.ArrayBlockingQueue; import javax.validation.constraints.Min; import javax.validation.constraints.NotNull; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; import org.apache.apex.malhar.flume.discovery.Discovery; +import org.apache.apex.malhar.flume.discovery.ZKAssistedDiscovery; import org.apache.apex.malhar.flume.sink.Server; -import org.slf4j.Logger; -import org.slf4j.LoggerFactory; import org.apache.flume.Event; @@ -50,7 +51,6 @@ import com.datatorrent.api.Operator; import com.datatorrent.api.Partitioner; import com.datatorrent.api.Stats.OperatorStats; import com.datatorrent.api.StreamCodec; -import org.apache.apex.malhar.flume.discovery.ZKAssistedDiscovery; import com.datatorrent.netlet.AbstractLengthPrependerClient; import com.datatorrent.netlet.DefaultEventLoop; import com.datatorrent.netlet.util.Slice; @@ -715,7 +715,7 @@ public abstract class AbstractFlumeInputOperator<T> } }; - private static final transient ThreadLocal<Collection<Discovery.Service<byte[]>>> discoveredFlumeSinks = + protected static final transient ThreadLocal<Collection<Discovery.Service<byte[]>>> discoveredFlumeSinks = new ThreadLocal<Collection<Discovery.Service<byte[]>>>(); @Override 
http://git-wip-us.apache.org/repos/asf/apex-malhar/blob/c48ec8c5/flume/src/main/java/org/apache/apex/malhar/flume/sink/DTFlumeSink.java ---------------------------------------------------------------------- diff --git a/flume/src/main/java/org/apache/apex/malhar/flume/sink/DTFlumeSink.java b/flume/src/main/java/org/apache/apex/malhar/flume/sink/DTFlumeSink.java index 306ce13..4f28850 100644 --- a/flume/src/main/java/org/apache/apex/malhar/flume/sink/DTFlumeSink.java +++ b/flume/src/main/java/org/apache/apex/malhar/flume/sink/DTFlumeSink.java @@ -25,11 +25,14 @@ import java.util.Collection; import java.util.Collections; import java.util.ServiceConfigurationError; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + import org.apache.apex.malhar.flume.discovery.Discovery; +import org.apache.apex.malhar.flume.sink.Server.Client; +import org.apache.apex.malhar.flume.sink.Server.Request; import org.apache.apex.malhar.flume.storage.EventCodec; import org.apache.apex.malhar.flume.storage.Storage; -import org.slf4j.Logger; -import org.slf4j.LoggerFactory; import org.apache.flume.Context; import org.apache.flume.Event; @@ -40,8 +43,7 @@ import org.apache.flume.sink.AbstractSink; import com.datatorrent.api.Component; import com.datatorrent.api.StreamCodec; -import org.apache.apex.malhar.flume.sink.Server.Client; -import org.apache.apex.malhar.flume.sink.Server.Request; + import com.datatorrent.netlet.DefaultEventLoop; import com.datatorrent.netlet.NetletThrowable; import com.datatorrent.netlet.NetletThrowable.NetletRuntimeException; http://git-wip-us.apache.org/repos/asf/apex-malhar/blob/c48ec8c5/flume/src/main/java/org/apache/apex/malhar/flume/sink/Server.java ---------------------------------------------------------------------- diff --git a/flume/src/main/java/org/apache/apex/malhar/flume/sink/Server.java b/flume/src/main/java/org/apache/apex/malhar/flume/sink/Server.java index a771cb3..c8a8440 100644 --- 
a/flume/src/main/java/org/apache/apex/malhar/flume/sink/Server.java +++ b/flume/src/main/java/org/apache/apex/malhar/flume/sink/Server.java @@ -25,10 +25,11 @@ import java.nio.channels.SocketChannel; import java.util.ArrayList; import java.util.Arrays; -import org.apache.apex.malhar.flume.discovery.Discovery; import org.slf4j.Logger; import org.slf4j.LoggerFactory; +import org.apache.apex.malhar.flume.discovery.Discovery; + import com.datatorrent.netlet.AbstractLengthPrependerClient; import com.datatorrent.netlet.AbstractServer; import com.datatorrent.netlet.EventLoop; http://git-wip-us.apache.org/repos/asf/apex-malhar/blob/c48ec8c5/flume/src/main/java/org/apache/apex/malhar/flume/storage/HDFSStorage.java ---------------------------------------------------------------------- diff --git a/flume/src/main/java/org/apache/apex/malhar/flume/storage/HDFSStorage.java b/flume/src/main/java/org/apache/apex/malhar/flume/storage/HDFSStorage.java index 77aeb68..54716b7 100644 --- a/flume/src/main/java/org/apache/apex/malhar/flume/storage/HDFSStorage.java +++ b/flume/src/main/java/org/apache/apex/malhar/flume/storage/HDFSStorage.java @@ -32,6 +32,8 @@ import javax.validation.constraints.NotNull; import org.slf4j.Logger; import org.slf4j.LoggerFactory; +import org.apache.apex.malhar.flume.sink.Server; + import org.apache.flume.Context; import org.apache.flume.conf.Configurable; import org.apache.hadoop.conf.Configuration; @@ -45,7 +47,6 @@ import com.google.common.primitives.Longs; import com.datatorrent.api.Component; import com.datatorrent.common.util.NameableThreadFactory; -import org.apache.apex.malhar.flume.sink.Server; import com.datatorrent.netlet.util.Slice; /** http://git-wip-us.apache.org/repos/asf/apex-malhar/blob/c48ec8c5/flume/src/test/java/org/apache/apex/malhar/flume/discovery/ZKAssistedDiscoveryTest.java ---------------------------------------------------------------------- diff --git 
a/flume/src/test/java/org/apache/apex/malhar/flume/discovery/ZKAssistedDiscoveryTest.java b/flume/src/test/java/org/apache/apex/malhar/flume/discovery/ZKAssistedDiscoveryTest.java index 6503357..9db5d32 100644 --- a/flume/src/test/java/org/apache/apex/malhar/flume/discovery/ZKAssistedDiscoveryTest.java +++ b/flume/src/test/java/org/apache/apex/malhar/flume/discovery/ZKAssistedDiscoveryTest.java @@ -24,11 +24,11 @@ import org.junit.Test; import org.slf4j.Logger; import org.slf4j.LoggerFactory; +import org.apache.apex.malhar.flume.discovery.Discovery.Service; + import org.apache.curator.x.discovery.ServiceInstance; import org.apache.curator.x.discovery.details.InstanceSerializer; -import com.datatorrent.flume.discovery.Discovery.Service; - import static org.junit.Assert.assertArrayEquals; import static org.junit.Assert.assertNotNull; http://git-wip-us.apache.org/repos/asf/apex-malhar/blob/c48ec8c5/flume/src/test/java/org/apache/apex/malhar/flume/integration/ApplicationDiscoveryTest.java ---------------------------------------------------------------------- diff --git a/flume/src/test/java/org/apache/apex/malhar/flume/integration/ApplicationDiscoveryTest.java b/flume/src/test/java/org/apache/apex/malhar/flume/integration/ApplicationDiscoveryTest.java new file mode 100644 index 0000000..5486469 --- /dev/null +++ b/flume/src/test/java/org/apache/apex/malhar/flume/integration/ApplicationDiscoveryTest.java @@ -0,0 +1,151 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. 
You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.apex.malhar.flume.integration; + +import java.util.Collection; + +import org.junit.Assert; +import org.junit.Ignore; +import org.junit.Test; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import org.apache.apex.malhar.flume.discovery.Discovery; +import org.apache.apex.malhar.flume.operator.AbstractFlumeInputOperator; +import org.apache.apex.malhar.flume.storage.EventCodec; +import org.apache.flume.Event; +import org.apache.hadoop.conf.Configuration; + +import com.datatorrent.api.Context.OperatorContext; +import com.datatorrent.api.DAG; +import com.datatorrent.api.DAG.Locality; +import com.datatorrent.api.DefaultInputPort; +import com.datatorrent.api.LocalMode; +import com.datatorrent.api.Operator; +import com.datatorrent.api.StreamingApplication; + +/** + * baseDir needs to be created in HDFS + * Local zookeeper service needs to be running on default 127.0.0.1:2181 + * Local flume service needs to be running using src/test/resources/flume/conf/flume_zkdiscovery.conf configuration + */ +@Ignore +public class ApplicationDiscoveryTest implements StreamingApplication +{ + static int globalCount; + + public static class FlumeInputOperator extends AbstractFlumeInputOperator<Event> + { + public ZKStatsListner zkListener = new AbstractFlumeInputOperator.ZKStatsListner(); + private boolean first = true; + + + @Override + public Event convert(Event event) + { + return event; + } + + + @Override + public Collection<Partition<AbstractFlumeInputOperator<Event>>> 
definePartitions(Collection<Partition<AbstractFlumeInputOperator<Event>>> partitions, PartitioningContext context) + { + if (first) { + first = false; + zkListener.setup(null); + } + Collection<Discovery.Service<byte[]>> addresses; + addresses = zkListener.discover(); + discoveredFlumeSinks.set(addresses); + + return super.definePartitions(partitions, context); + } + } + + public static class Counter implements Operator + { + private int count; + private transient Event event; + public final transient DefaultInputPort<Event> input = new DefaultInputPort<Event>() + { + @Override + public void process(Event tuple) + { + count++; + event = tuple; + } + + }; + + @Override + public void beginWindow(long windowId) + { + } + + @Override + public void endWindow() + { + if (event != null) { + logger.info("total count = {}, tuple = {}", count, new String(event.getBody())); + } else { + logger.info("total count = {}, tuple = {}", count, event); + } + globalCount = count; + } + + @Override + public void setup(OperatorContext context) + { + } + + @Override + public void teardown() + { + } + + private static final Logger logger = LoggerFactory.getLogger(Counter.class); + } + + @Override + public void populateDAG(DAG dag, Configuration conf) + { + dag.setAttribute(com.datatorrent.api.Context.DAGContext.STREAMING_WINDOW_SIZE_MILLIS, 1000); + FlumeInputOperator flume = dag.addOperator("FlumeOperator", new FlumeInputOperator()); + flume.setCodec(new EventCodec()); + flume.zkListener.setConnectionString("127.0.0.1:2181"); + flume.zkListener.setBasePath("/flume/basepath"); + Counter counter = dag.addOperator("Counter", new Counter()); + + dag.addStream("Slices", flume.output, counter.input).setLocality(Locality.CONTAINER_LOCAL); + } + + @Test + public void test() + { + try { + LocalMode.runApp(this, 10000); + } catch (Exception ex) { + logger.warn("The dag seems to be not testable yet, if it's - remove this exception handling", ex); + } + //flume source sequence generator is set to 10 
in flume configuration going to two source -> 20 + Assert.assertEquals(20, globalCount); + } + + private static final Logger logger = LoggerFactory.getLogger(ApplicationDiscoveryTest.class); +} http://git-wip-us.apache.org/repos/asf/apex-malhar/blob/c48ec8c5/flume/src/test/java/org/apache/apex/malhar/flume/integration/ApplicationTest.java ---------------------------------------------------------------------- diff --git a/flume/src/test/java/org/apache/apex/malhar/flume/integration/ApplicationTest.java b/flume/src/test/java/org/apache/apex/malhar/flume/integration/ApplicationTest.java index 10153bc..67c911c 100644 --- a/flume/src/test/java/org/apache/apex/malhar/flume/integration/ApplicationTest.java +++ b/flume/src/test/java/org/apache/apex/malhar/flume/integration/ApplicationTest.java @@ -18,11 +18,15 @@ */ package org.apache.apex.malhar.flume.integration; +import org.junit.Assert; import org.junit.Ignore; import org.junit.Test; import org.slf4j.Logger; import org.slf4j.LoggerFactory; +import org.apache.apex.malhar.flume.operator.AbstractFlumeInputOperator; +import org.apache.apex.malhar.flume.storage.EventCodec; + import org.apache.flume.Event; import org.apache.hadoop.conf.Configuration; @@ -33,15 +37,17 @@ import com.datatorrent.api.DefaultInputPort; import com.datatorrent.api.LocalMode; import com.datatorrent.api.Operator; import com.datatorrent.api.StreamingApplication; -import com.datatorrent.flume.operator.AbstractFlumeInputOperator; -import com.datatorrent.flume.storage.EventCodec; + /** - * + * baseDir needs to be created in HDFS + * Local zookeeper service needs to be running on default 127.0.0.1:2181 + * Local flume service needs to be running using src/test/resources/flume/conf/flume_simple.conf configuration */ @Ignore public class ApplicationTest implements StreamingApplication { + static int globalCount; public static class FlumeInputOperator extends AbstractFlumeInputOperator<Event> { @Override @@ -74,7 +80,12 @@ public class ApplicationTest 
implements StreamingApplication @Override public void endWindow() { - logger.debug("total count = {}, tuple = {}", count, event); + if (event != null) { + logger.info("total count = {}, tuple = {}", count, new String(event.getBody())); + } else { + logger.info("total count = {}, tuple = {}", count, event); + } + globalCount = count; } @Override @@ -95,7 +106,7 @@ public class ApplicationTest implements StreamingApplication { dag.setAttribute(com.datatorrent.api.Context.DAGContext.STREAMING_WINDOW_SIZE_MILLIS, 1000); FlumeInputOperator flume = dag.addOperator("FlumeOperator", new FlumeInputOperator()); - flume.setConnectAddresses(new String[]{"test:127.0.0.1:8080"}); + flume.setConnectAddresses(new String[]{"sink1:127.0.0.1:9098"}); flume.setCodec(new EventCodec()); Counter counter = dag.addOperator("Counter", new Counter()); @@ -106,11 +117,12 @@ public class ApplicationTest implements StreamingApplication public void test() { try { - LocalMode.runApp(this, Integer.MAX_VALUE); + LocalMode.runApp(this, 10000); } catch (Exception ex) { logger.warn("The dag seems to be not testable yet, if it's - remove this exception handling", ex); } - + //flume source sequence generator is set to 10 in flume configuration + Assert.assertEquals(10, globalCount); } private static final Logger logger = LoggerFactory.getLogger(ApplicationTest.class); http://git-wip-us.apache.org/repos/asf/apex-malhar/blob/c48ec8c5/flume/src/test/java/org/apache/apex/malhar/flume/sink/DTFlumeSinkTest.java ---------------------------------------------------------------------- diff --git a/flume/src/test/java/org/apache/apex/malhar/flume/sink/DTFlumeSinkTest.java b/flume/src/test/java/org/apache/apex/malhar/flume/sink/DTFlumeSinkTest.java index 9bc69e8..f97d9c0 100644 --- a/flume/src/test/java/org/apache/apex/malhar/flume/sink/DTFlumeSinkTest.java +++ b/flume/src/test/java/org/apache/apex/malhar/flume/sink/DTFlumeSinkTest.java @@ -29,9 +29,10 @@ import org.junit.Test; import org.slf4j.Logger; import 
org.slf4j.LoggerFactory; +import org.apache.apex.malhar.flume.discovery.Discovery; + import org.apache.flume.channel.MemoryChannel; -import com.datatorrent.flume.discovery.Discovery; import com.datatorrent.netlet.AbstractLengthPrependerClient; import com.datatorrent.netlet.DefaultEventLoop; import com.datatorrent.netlet.util.Slice; http://git-wip-us.apache.org/repos/asf/apex-malhar/blob/c48ec8c5/flume/src/test/resources/flume/conf/flume-conf.properties ---------------------------------------------------------------------- diff --git a/flume/src/test/resources/flume/conf/flume-conf.properties b/flume/src/test/resources/flume/conf/flume-conf.properties index b796e6d..73dc79a 100644 --- a/flume/src/test/resources/flume/conf/flume-conf.properties +++ b/flume/src/test/resources/flume/conf/flume-conf.properties @@ -33,12 +33,12 @@ agent1.sources.netcatSource.channels = ch1 agent1.sources.netcatSource.command = src/test/bash/subcat_periodically src/test/resources/test_data/dt_spend 10000 1 # Pick and Reorder the columns we need from a larger record for efficiency agent1.sources.netcatSource.interceptors = columnchooser - agent1.sources.netcatSource.interceptors.columnchooser.type = com.datatorrent.flume.interceptor.ColumnFilteringInterceptor$Builder + agent1.sources.netcatSource.interceptors.columnchooser.type = org.apache.apex.malhar.flume.interceptor.ColumnFilteringInterceptor$Builder agent1.sources.netcatSource.interceptors.columnchooser.srcSeparator = 2 agent1.sources.netcatSource.interceptors.columnchooser.dstSeparator = 1 agent1.sources.netcatSource.interceptors.columnchooser.columns = 0 43 62 69 68 139 190 70 71 52 75 37 39 42 191 138 - agent2.sources.netcatSource.interceptors.columnchooser.type = com.datatorrent.flume.interceptor.ColumnFilteringFormattingInterceptor$Builder + agent2.sources.netcatSource.interceptors.columnchooser.type = org.apache.apex.malhar.flume.interceptor.ColumnFilteringFormattingInterceptor$Builder 
agent2.sources.netcatSource.interceptors.columnchooser.srcSeparator = 2 agent2.sources.netcatSource.interceptors.columnchooser.columnsFormatter = {0}\u0001{43}\u0001{62}\u0001{69}\u0001{68}\u0001{139}\u0001{190}\u0001{70}\u0001{71}\u0001{52}\u0001{75}\u0001{37}\u0001{39}\u0001{42}\u0001{191}\u0001{138}\u0001 http://git-wip-us.apache.org/repos/asf/apex-malhar/blob/c48ec8c5/flume/src/test/resources/flume/conf/flume_simple.conf ---------------------------------------------------------------------- diff --git a/flume/src/test/resources/flume/conf/flume_simple.conf b/flume/src/test/resources/flume/conf/flume_simple.conf new file mode 100644 index 0000000..b902881 --- /dev/null +++ b/flume/src/test/resources/flume/conf/flume_simple.conf @@ -0,0 +1,52 @@ +# +# Licensed to the Apache Software Foundation (ASF) under one +# or more contributor license agreements. See the NOTICE file +# distributed with this work for additional information +# regarding copyright ownership. The ASF licenses this file +# to you under the Apache License, Version 2.0 (the +# "License"); you may not use this file except in compliance +# with the License. You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, +# software distributed under the License is distributed on an +# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +# KIND, either express or implied. See the License for the +# specific language governing permissions and limitations +# under the License. 
+# + +# apex_example.conf: A single-node Flume configuration + +# Name the components on this agent +a1.sources = r1 +a1.sinks = dt +a1.channels = c1 + +# sequence generator source that generates numbers from 0 to 9 +a1.sources.r1.type = seq +a1.sources.r1.totalEvents = 10 + +# sink - dt + a1.sinks.dt.type = org.apache.apex.malhar.flume.sink.DTFlumeSink + a1.sinks.dt.id = sink1 + a1.sinks.dt.hostname = 127.0.0.1 + a1.sinks.dt.port = 9098 + a1.sinks.dt.sleepMillis = 7 + a1.sinks.dt.throughputAdjustmentFactor = 2 + a1.sinks.dt.maximumEventsPerTransaction = 5000 + a1.sinks.dt.minimumEventsPerTransaction = 1 + a1.sinks.dt.storage = org.apache.apex.malhar.flume.storage.HDFSStorage + a1.sinks.dt.storage.restore = false + a1.sinks.dt.storage.baseDir = /tmp/flume101 + a1.sinks.dt.channel = c1 + +# Use a channel which buffers events in memory +a1.channels.c1.type = memory +a1.channels.c1.capacity = 1000 +a1.channels.c1.transactionCapacity = 100 + +# Bind the source and sink to the channel +a1.sources.r1.channels = c1 + http://git-wip-us.apache.org/repos/asf/apex-malhar/blob/c48ec8c5/flume/src/test/resources/flume/conf/flume_zkdiscovery.conf ---------------------------------------------------------------------- diff --git a/flume/src/test/resources/flume/conf/flume_zkdiscovery.conf b/flume/src/test/resources/flume/conf/flume_zkdiscovery.conf new file mode 100644 index 0000000..6f8932c --- /dev/null +++ b/flume/src/test/resources/flume/conf/flume_zkdiscovery.conf @@ -0,0 +1,91 @@ +# +# Licensed to the Apache Software Foundation (ASF) under one +# or more contributor license agreements. See the NOTICE file +# distributed with this work for additional information +# regarding copyright ownership. The ASF licenses this file +# to you under the Apache License, Version 2.0 (the +# "License"); you may not use this file except in compliance +# with the License. 
You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, +# software distributed under the License is distributed on an +# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +# KIND, either express or implied. See the License for the +# specific language governing permissions and limitations +# under the License. +# + +# example.conf: A single-node Flume configuration + +# Name the components on this agent +a1.sources = r1 +a1.sinks = dt dt2 +a1.channels = c1 c2 + +# Alternative source for custom inputs +#a1.sources.r1.type = netcat +#a1.sources.r1.bind = 127.0.0.1 +#a1.sources.r1.port = 9097 + +# sequence generator source that generates numbers from 0 to 9 +a1.sources.r1.type = seq +a1.sources.r1.totalEvents = 10 + +# first sink - dt + a1.sinks.dt.type = org.apache.apex.malhar.flume.sink.DTFlumeSink + a1.sinks.dt.id = sink1 + a1.sinks.dt.hostname = 127.0.0.1 + a1.sinks.dt.port = 9098 + a1.sinks.dt.sleepMillis = 7 + a1.sinks.dt.throughputAdjustmentFactor = 2 + a1.sinks.dt.maximumEventsPerTransaction = 5000 + a1.sinks.dt.minimumEventsPerTransaction = 1 + a1.sinks.dt.storage = org.apache.apex.malhar.flume.storage.HDFSStorage + a1.sinks.dt.storage.restore = false + a1.sinks.dt.storage.baseDir = /tmp/flume101 + a1.sinks.dt.channel = c1 + +# second sink - dt2 + a1.sinks.dt2.type = org.apache.apex.malhar.flume.sink.DTFlumeSink + a1.sinks.dt2.id = sink2 + a1.sinks.dt2.hostname = 127.0.0.1 + a1.sinks.dt2.port = 9099 + a1.sinks.dt2.sleepMillis = 7 + a1.sinks.dt2.throughputAdjustmentFactor = 2 + a1.sinks.dt2.maximumEventsPerTransaction = 5000 + a1.sinks.dt2.minimumEventsPerTransaction = 1 + a1.sinks.dt2.storage = org.apache.apex.malhar.flume.storage.HDFSStorage + a1.sinks.dt2.storage.restore = false + a1.sinks.dt2.storage.baseDir = /tmp/flume101 + a1.sinks.dt2.channel = c2 + +# Use a channel which buffers events in memory + a1.channels.c1.type = memory + 
a1.channels.c1.capacity = 1000 + a1.channels.c1.transactionCapacity = 100 + +# Ensure that we are able to detect flume sinks (and failures) automatically. + a1.sinks.dt.discovery = org.apache.apex.malhar.flume.discovery.ZKAssistedDiscovery + a1.sinks.dt.discovery.connectionString = 127.0.0.1:2181 + a1.sinks.dt.discovery.basePath = /flume/basepath + a1.sinks.dt.discovery.connectionTimeoutMillis = 1000 + a1.sinks.dt.discovery.connectionRetryCount = 10 + a1.sinks.dt.discovery.connectionRetrySleepMillis = 500 + +# Ensure that we are able to detect flume sinks (and failures) automatically. + a1.sinks.dt2.discovery = org.apache.apex.malhar.flume.discovery.ZKAssistedDiscovery + a1.sinks.dt2.discovery.connectionString = 127.0.0.1:2181 + a1.sinks.dt2.discovery.basePath = /flume/basepath + a1.sinks.dt2.discovery.connectionTimeoutMillis = 1000 + a1.sinks.dt2.discovery.connectionRetryCount = 10 + a1.sinks.dt2.discovery.connectionRetrySleepMillis = 500 + +# Use a channel which buffers events in memory + a1.channels.c2.type = memory + a1.channels.c2.capacity = 1000 + a1.channels.c2.transactionCapacity = 100 + +# Bind the source and sink to the channel + a1.sources.r1.channels = c1 c2
