Add and test ApplicationTests & complete README

Project: http://git-wip-us.apache.org/repos/asf/apex-malhar/repo
Commit: http://git-wip-us.apache.org/repos/asf/apex-malhar/commit/c48ec8c5
Tree: http://git-wip-us.apache.org/repos/asf/apex-malhar/tree/c48ec8c5
Diff: http://git-wip-us.apache.org/repos/asf/apex-malhar/diff/c48ec8c5

Branch: refs/heads/master
Commit: c48ec8c51f91c7cd3e24d1fed62cd7072af57865
Parents: d200737
Author: Oliver Winke <[email protected]>
Authored: Tue Mar 14 12:12:07 2017 -0700
Committer: Pramod Immaneni <[email protected]>
Committed: Mon May 22 16:47:34 2017 -0700

----------------------------------------------------------------------
 flume/README.md                                 |  73 +++++++++
 flume/pom.xml                                   |  18 +++
 .../ColumnFilteringFormattingInterceptor.java   |   2 +-
 .../operator/AbstractFlumeInputOperator.java    |   8 +-
 .../apex/malhar/flume/sink/DTFlumeSink.java     |  10 +-
 .../apache/apex/malhar/flume/sink/Server.java   |   3 +-
 .../apex/malhar/flume/storage/HDFSStorage.java  |   3 +-
 .../discovery/ZKAssistedDiscoveryTest.java      |   4 +-
 .../integration/ApplicationDiscoveryTest.java   | 151 +++++++++++++++++++
 .../flume/integration/ApplicationTest.java      |  26 +++-
 .../apex/malhar/flume/sink/DTFlumeSinkTest.java |   3 +-
 .../resources/flume/conf/flume-conf.properties  |   4 +-
 .../test/resources/flume/conf/flume_simple.conf |  52 +++++++
 .../resources/flume/conf/flume_zkdiscovery.conf |  91 +++++++++++
 14 files changed, 425 insertions(+), 23 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/apex-malhar/blob/c48ec8c5/flume/README.md
----------------------------------------------------------------------
diff --git a/flume/README.md b/flume/README.md
index 1d0b2d9..ec8fae9 100644
--- a/flume/README.md
+++ b/flume/README.md
@@ -4,3 +4,76 @@ Flume
 The folder contains support for using Flume with Apex. It consists mainly of two components. The first is an agent that sits on the Flume side, receives data from Flume, and makes it available via a socket server; in effect, it converts a push model into a pull model. The second component is the input operator that reads from the agent.
 
 The project was started from the latest code at the time of the sub-module creation. For older history, look at the flume sub-module in the older project called Megh ([email protected]:DataTorrent/Megh).
+
+
+## Setting up the Flume agent
+
+To set up the Flume agent for the Apex input operator, Flume's plugin-based
+architecture is used.
+
+Set up Flume and make sure JAVA_HOME is set.
+
+Build malhar-flume with `mvn clean package -DskipTests`.
+The plugin `malhar-flume-ver.jar` and all necessary dependencies (in `target/deps`)
+can then be found in the target directory.
+To add the plugin to your Flume service, create a plugins.d directory in
+FLUME_HOME.
+
+Put the malhar-flume-ver.jar in `plugins.d/custom-plugin-name/lib/`
+and all the needed dependencies into `plugins.d/custom-plugin-name/libext/`.
+
+(As an alternative to Flume's automatic plugins.d detection, jars can be added
+to the FLUME_CLASSPATH using a `flume-env.sh` script; see
+`resources/flume-conf/flume-env.sample.sh`. For this, a Maven repository must be
+available under $HOME/.m2 and the environment variable DT_FLUME_JAR must point
+to the plugin JAR.)
+
+***Flume configuration***  
+A basic Flume configuration can be found in
+`src/test/resources/flume/conf/flume_simple.conf`.  
+A Flume configuration using the discovery service can be found in
+`src/test/resources/flume/conf/flume_zkdiscovery.conf`.  
+Configuration files should be placed in Flume's `conf` directory and are
+selected explicitly when running flume-ng.
+
+In the configuration file, set `org.apache.apex.malhar.flume.sink.DTFlumeSink`
+for the **type**  
+and `org.apache.apex.malhar.flume.storage.HDFSStorage` for the **storage**,  
+as well as an **HDFS directory** for `baseDir`. The HDFS base directory needs
+to be created on HDFS.
+
+For discovery, set `org.apache.apex.malhar.flume.discovery.ZKAssistedDiscovery`
+for each sink, and configure the sinks to use the ZooKeeper service by adding
+the ZooKeeper address in `connectionString` as well as a `basePath`.
+These values also need to be set on the `ZKStatsListner` in the Apex
+application.
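+
+For example, matching the ZooKeeper settings used by the tests in this commit
+(`flume` is the operator instance added to the DAG in `populateDAG`):
+```java
+flume.zkListener.setConnectionString("127.0.0.1:2181");
+flume.zkListener.setBasePath("/flume/basepath");
+```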
+
+### Operator Usage
+
+An implementation of AbstractFlumeInputOperator can either connect directly
+to a single Flume sink, or use discovery via ZooKeeper to detect Flume sinks
+automatically and partition the operator accordingly at startup.
+
+Implement the abstract method to convert a Flume event to a tuple:
+```java
+public abstract T convert(Event event);
+```
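+
+A pass-through implementation can simply return the event unchanged, as the
+tests in this commit do. A sketch that instead emits each event body as a
+String tuple (the class name here is illustrative):
+```java
+public class StringFlumeInputOperator extends AbstractFlumeInputOperator<String>
+{
+  @Override
+  public String convert(Event event)
+  {
+    // the event body is the raw byte payload delivered by Flume
+    return new String(event.getBody());
+  }
+}
+```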
+
+Additionally, a StreamCodec for Flume events must be set. A codec implementation
+can be found in `storage/EventCodec.java`:
+```java
+setCodec(new EventCodec());
+```
+
+See `ApplicationDiscoveryTest.FlumeInputOperator` for an example operator implementation.
+
+##### Simple connection setup to one flume sink:
+For a simple connection to only one Flume sink, set the connection address in
+the form `sinkid:host:port`:
+```java
+public void setConnectAddresses(String[] specs)
+```
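+
+For instance, `ApplicationTest` in this commit connects to the single sink
+defined in `src/test/resources/flume/conf/flume_simple.conf` (sink id `sink1`
+listening on port 9098):
+```java
+FlumeInputOperator flume = dag.addOperator("FlumeOperator", new FlumeInputOperator());
+flume.setConnectAddresses(new String[]{"sink1:127.0.0.1:9098"});
+flume.setCodec(new EventCodec());
+```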
+
+
+##### Setup using discovery/zookeeper:
+For a Flume input operator to discover Flume sinks and partition accordingly,
+a ZooKeeper service needs to be set up.
+
+An implementation of AbstractFlumeInputOperator needs to initialize a
+`ZKStatsListner`. It additionally needs to override **definePartitions** to set
+up the `ZKStatsListner`, discover addresses using discover(), and set them in
+discoveredFlumeSinks before calling the parent's definePartitions method.
+
+
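+A minimal sketch of such an operator, following
+`ApplicationDiscoveryTest.FlumeInputOperator` from this commit:
+```java
+public static class FlumeInputOperator extends AbstractFlumeInputOperator<Event>
+{
+  public ZKStatsListner zkListener = new AbstractFlumeInputOperator.ZKStatsListner();
+  private boolean first = true;
+
+  @Override
+  public Event convert(Event event)
+  {
+    return event;
+  }
+
+  @Override
+  public Collection<Partition<AbstractFlumeInputOperator<Event>>> definePartitions(
+      Collection<Partition<AbstractFlumeInputOperator<Event>>> partitions, PartitioningContext context)
+  {
+    if (first) {
+      // initialize the ZooKeeper-assisted stats listener exactly once
+      first = false;
+      zkListener.setup(null);
+    }
+    // discover the currently registered flume sinks and hand them to the parent partitioner
+    discoveredFlumeSinks.set(zkListener.discover());
+    return super.definePartitions(partitions, context);
+  }
+}
+```
+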
+See `src/test/java/org/apache/apex/malhar/flume/integration/ApplicationDiscoveryTest.java`
+and `src/test/java/org/apache/apex/malhar/flume/integration/ApplicationTest.java`
+for test implementations.
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/apex-malhar/blob/c48ec8c5/flume/pom.xml
----------------------------------------------------------------------
diff --git a/flume/pom.xml b/flume/pom.xml
index 735a13b..851697e 100644
--- a/flume/pom.xml
+++ b/flume/pom.xml
@@ -175,6 +175,24 @@
           <argLine>-Xmx5000M</argLine>
         </configuration>
       </plugin>
+      <plugin>
+        <artifactId>maven-dependency-plugin</artifactId>
+        <version>2.8</version>
+        <executions>
+          <execution>
+            <id>copy-dependencies</id>
+            <phase>prepare-package</phase>
+            <goals>
+              <goal>copy-dependencies</goal>
+            </goals>
+            <configuration>
+              <outputDirectory>target/deps</outputDirectory>
+              <includeScope>runtime</includeScope>
+              <excludeGroupIds>org.apache.hadoop</excludeGroupIds>
+            </configuration>
+          </execution>
+        </executions>
+      </plugin>
     </plugins>
   </build>
   <dependencies>

http://git-wip-us.apache.org/repos/asf/apex-malhar/blob/c48ec8c5/flume/src/main/java/org/apache/apex/malhar/flume/interceptor/ColumnFilteringFormattingInterceptor.java
----------------------------------------------------------------------
diff --git a/flume/src/main/java/org/apache/apex/malhar/flume/interceptor/ColumnFilteringFormattingInterceptor.java b/flume/src/main/java/org/apache/apex/malhar/flume/interceptor/ColumnFilteringFormattingInterceptor.java
index bd7e5e0..11ec3ef 100644
--- a/flume/src/main/java/org/apache/apex/malhar/flume/interceptor/ColumnFilteringFormattingInterceptor.java
+++ b/flume/src/main/java/org/apache/apex/malhar/flume/interceptor/ColumnFilteringFormattingInterceptor.java
@@ -212,7 +212,7 @@ public class ColumnFilteringFormattingInterceptor implements Interceptor
           dstSeparators[i] = emptyStringBytes;
         }
       }
-      srcSeparator = context.getInteger(ColumnFilteringInterceptor.Constants.SRC_SEPARATOR, (int) ColumnFilteringInterceptor.Constants.SRC_SEPARATOR_DFLT).byteValue();
+      srcSeparator = context.getInteger(ColumnFilteringInterceptor.Constants.SRC_SEPARATOR, (int)ColumnFilteringInterceptor.Constants.SRC_SEPARATOR_DFLT).byteValue();
       this.prefix = lPrefix.getBytes();
     }
   }

http://git-wip-us.apache.org/repos/asf/apex-malhar/blob/c48ec8c5/flume/src/main/java/org/apache/apex/malhar/flume/operator/AbstractFlumeInputOperator.java
----------------------------------------------------------------------
diff --git a/flume/src/main/java/org/apache/apex/malhar/flume/operator/AbstractFlumeInputOperator.java b/flume/src/main/java/org/apache/apex/malhar/flume/operator/AbstractFlumeInputOperator.java
index da1a8aa..f9beb71 100644
--- a/flume/src/main/java/org/apache/apex/malhar/flume/operator/AbstractFlumeInputOperator.java
+++ b/flume/src/main/java/org/apache/apex/malhar/flume/operator/AbstractFlumeInputOperator.java
@@ -33,11 +33,12 @@ import java.util.concurrent.ArrayBlockingQueue;
 
 import javax.validation.constraints.Min;
 import javax.validation.constraints.NotNull;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
 
 import org.apache.apex.malhar.flume.discovery.Discovery;
+import org.apache.apex.malhar.flume.discovery.ZKAssistedDiscovery;
 import org.apache.apex.malhar.flume.sink.Server;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
 
 import org.apache.flume.Event;
 
@@ -50,7 +51,6 @@ import com.datatorrent.api.Operator;
 import com.datatorrent.api.Partitioner;
 import com.datatorrent.api.Stats.OperatorStats;
 import com.datatorrent.api.StreamCodec;
-import org.apache.apex.malhar.flume.discovery.ZKAssistedDiscovery;
 import com.datatorrent.netlet.AbstractLengthPrependerClient;
 import com.datatorrent.netlet.DefaultEventLoop;
 import com.datatorrent.netlet.util.Slice;
@@ -715,7 +715,7 @@ public abstract class AbstractFlumeInputOperator<T>
     }
 
   };
-  private static final transient ThreadLocal<Collection<Discovery.Service<byte[]>>> discoveredFlumeSinks =
+  protected static final transient ThreadLocal<Collection<Discovery.Service<byte[]>>> discoveredFlumeSinks =
       new ThreadLocal<Collection<Discovery.Service<byte[]>>>();
 
   @Override

http://git-wip-us.apache.org/repos/asf/apex-malhar/blob/c48ec8c5/flume/src/main/java/org/apache/apex/malhar/flume/sink/DTFlumeSink.java
----------------------------------------------------------------------
diff --git a/flume/src/main/java/org/apache/apex/malhar/flume/sink/DTFlumeSink.java b/flume/src/main/java/org/apache/apex/malhar/flume/sink/DTFlumeSink.java
index 306ce13..4f28850 100644
--- a/flume/src/main/java/org/apache/apex/malhar/flume/sink/DTFlumeSink.java
+++ b/flume/src/main/java/org/apache/apex/malhar/flume/sink/DTFlumeSink.java
@@ -25,11 +25,14 @@ import java.util.Collection;
 import java.util.Collections;
 import java.util.ServiceConfigurationError;
 
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
 import org.apache.apex.malhar.flume.discovery.Discovery;
+import org.apache.apex.malhar.flume.sink.Server.Client;
+import org.apache.apex.malhar.flume.sink.Server.Request;
 import org.apache.apex.malhar.flume.storage.EventCodec;
 import org.apache.apex.malhar.flume.storage.Storage;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
 
 import org.apache.flume.Context;
 import org.apache.flume.Event;
@@ -40,8 +43,7 @@ import org.apache.flume.sink.AbstractSink;
 
 import com.datatorrent.api.Component;
 import com.datatorrent.api.StreamCodec;
-import org.apache.apex.malhar.flume.sink.Server.Client;
-import org.apache.apex.malhar.flume.sink.Server.Request;
+
 import com.datatorrent.netlet.DefaultEventLoop;
 import com.datatorrent.netlet.NetletThrowable;
 import com.datatorrent.netlet.NetletThrowable.NetletRuntimeException;

http://git-wip-us.apache.org/repos/asf/apex-malhar/blob/c48ec8c5/flume/src/main/java/org/apache/apex/malhar/flume/sink/Server.java
----------------------------------------------------------------------
diff --git a/flume/src/main/java/org/apache/apex/malhar/flume/sink/Server.java b/flume/src/main/java/org/apache/apex/malhar/flume/sink/Server.java
index a771cb3..c8a8440 100644
--- a/flume/src/main/java/org/apache/apex/malhar/flume/sink/Server.java
+++ b/flume/src/main/java/org/apache/apex/malhar/flume/sink/Server.java
@@ -25,10 +25,11 @@ import java.nio.channels.SocketChannel;
 import java.util.ArrayList;
 import java.util.Arrays;
 
-import org.apache.apex.malhar.flume.discovery.Discovery;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
+import org.apache.apex.malhar.flume.discovery.Discovery;
+
 import com.datatorrent.netlet.AbstractLengthPrependerClient;
 import com.datatorrent.netlet.AbstractServer;
 import com.datatorrent.netlet.EventLoop;

http://git-wip-us.apache.org/repos/asf/apex-malhar/blob/c48ec8c5/flume/src/main/java/org/apache/apex/malhar/flume/storage/HDFSStorage.java
----------------------------------------------------------------------
diff --git a/flume/src/main/java/org/apache/apex/malhar/flume/storage/HDFSStorage.java b/flume/src/main/java/org/apache/apex/malhar/flume/storage/HDFSStorage.java
index 77aeb68..54716b7 100644
--- a/flume/src/main/java/org/apache/apex/malhar/flume/storage/HDFSStorage.java
+++ b/flume/src/main/java/org/apache/apex/malhar/flume/storage/HDFSStorage.java
@@ -32,6 +32,8 @@ import javax.validation.constraints.NotNull;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
+import org.apache.apex.malhar.flume.sink.Server;
+
 import org.apache.flume.Context;
 import org.apache.flume.conf.Configurable;
 import org.apache.hadoop.conf.Configuration;
@@ -45,7 +47,6 @@ import com.google.common.primitives.Longs;
 
 import com.datatorrent.api.Component;
 import com.datatorrent.common.util.NameableThreadFactory;
-import org.apache.apex.malhar.flume.sink.Server;
 import com.datatorrent.netlet.util.Slice;
 
 /**

http://git-wip-us.apache.org/repos/asf/apex-malhar/blob/c48ec8c5/flume/src/test/java/org/apache/apex/malhar/flume/discovery/ZKAssistedDiscoveryTest.java
----------------------------------------------------------------------
diff --git a/flume/src/test/java/org/apache/apex/malhar/flume/discovery/ZKAssistedDiscoveryTest.java b/flume/src/test/java/org/apache/apex/malhar/flume/discovery/ZKAssistedDiscoveryTest.java
index 6503357..9db5d32 100644
--- a/flume/src/test/java/org/apache/apex/malhar/flume/discovery/ZKAssistedDiscoveryTest.java
+++ b/flume/src/test/java/org/apache/apex/malhar/flume/discovery/ZKAssistedDiscoveryTest.java
@@ -24,11 +24,11 @@ import org.junit.Test;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
+import org.apache.apex.malhar.flume.discovery.Discovery.Service;
+
 import org.apache.curator.x.discovery.ServiceInstance;
 import org.apache.curator.x.discovery.details.InstanceSerializer;
 
-import com.datatorrent.flume.discovery.Discovery.Service;
-
 import static org.junit.Assert.assertArrayEquals;
 import static org.junit.Assert.assertNotNull;
 

http://git-wip-us.apache.org/repos/asf/apex-malhar/blob/c48ec8c5/flume/src/test/java/org/apache/apex/malhar/flume/integration/ApplicationDiscoveryTest.java
----------------------------------------------------------------------
diff --git a/flume/src/test/java/org/apache/apex/malhar/flume/integration/ApplicationDiscoveryTest.java b/flume/src/test/java/org/apache/apex/malhar/flume/integration/ApplicationDiscoveryTest.java
new file mode 100644
index 0000000..5486469
--- /dev/null
+++ b/flume/src/test/java/org/apache/apex/malhar/flume/integration/ApplicationDiscoveryTest.java
@@ -0,0 +1,151 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.apex.malhar.flume.integration;
+
+import java.util.Collection;
+
+import org.junit.Assert;
+import org.junit.Ignore;
+import org.junit.Test;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import org.apache.apex.malhar.flume.discovery.Discovery;
+import org.apache.apex.malhar.flume.operator.AbstractFlumeInputOperator;
+import org.apache.apex.malhar.flume.storage.EventCodec;
+import org.apache.flume.Event;
+import org.apache.hadoop.conf.Configuration;
+
+import com.datatorrent.api.Context.OperatorContext;
+import com.datatorrent.api.DAG;
+import com.datatorrent.api.DAG.Locality;
+import com.datatorrent.api.DefaultInputPort;
+import com.datatorrent.api.LocalMode;
+import com.datatorrent.api.Operator;
+import com.datatorrent.api.StreamingApplication;
+
+/**
+ * baseDir needs to be created in HDFS.
+ * A local ZooKeeper service needs to be running on the default 127.0.0.1:2181.
+ * A local Flume service needs to be running using the src/test/resources/flume/conf/flume_zkdiscovery.conf configuration.
+ */
+@Ignore
+public class ApplicationDiscoveryTest implements StreamingApplication
+{
+  static int globalCount;
+
+  public static class FlumeInputOperator extends AbstractFlumeInputOperator<Event>
+  {
+    public ZKStatsListner zkListener = new AbstractFlumeInputOperator.ZKStatsListner();
+    private boolean first = true;
+
+
+    @Override
+    public Event convert(Event event)
+    {
+      return event;
+    }
+
+
+    @Override
+    public Collection<Partition<AbstractFlumeInputOperator<Event>>> definePartitions(Collection<Partition<AbstractFlumeInputOperator<Event>>> partitions, PartitioningContext context)
+    {
+      if (first) {
+        first = false;
+        zkListener.setup(null);
+      }
+      Collection<Discovery.Service<byte[]>> addresses;
+      addresses = zkListener.discover();
+      discoveredFlumeSinks.set(addresses);
+
+      return super.definePartitions(partitions, context);
+    }
+  }
+
+  public static class Counter implements Operator
+  {
+    private int count;
+    private transient Event event;
+    public final transient DefaultInputPort<Event> input = new DefaultInputPort<Event>()
+    {
+      @Override
+      public void process(Event tuple)
+      {
+        count++;
+        event = tuple;
+      }
+
+    };
+
+    @Override
+    public void beginWindow(long windowId)
+    {
+    }
+
+    @Override
+    public void endWindow()
+    {
+      if (event != null) {
+        logger.info("total count = {}, tuple = {}", count, new String(event.getBody()));
+      } else {
+        logger.info("total count = {}, tuple = {}", count, event);
+      }
+      globalCount = count;
+    }
+
+    @Override
+    public void setup(OperatorContext context)
+    {
+    }
+
+    @Override
+    public void teardown()
+    {
+    }
+
+    private static final Logger logger = LoggerFactory.getLogger(Counter.class);
+  }
+
+  @Override
+  public void populateDAG(DAG dag, Configuration conf)
+  {
+    dag.setAttribute(com.datatorrent.api.Context.DAGContext.STREAMING_WINDOW_SIZE_MILLIS, 1000);
+    FlumeInputOperator flume = dag.addOperator("FlumeOperator", new FlumeInputOperator());
+    flume.setCodec(new EventCodec());
+    flume.zkListener.setConnectionString("127.0.0.1:2181");
+    flume.zkListener.setBasePath("/flume/basepath");
+    Counter counter = dag.addOperator("Counter", new Counter());
+
+    dag.addStream("Slices", flume.output, counter.input).setLocality(Locality.CONTAINER_LOCAL);
+  }
+
+  @Test
+  public void test()
+  {
+    try {
+      LocalMode.runApp(this, 10000);
+    } catch (Exception ex) {
+      logger.warn("The dag seems to be not testable yet, if it's - remove this exception handling", ex);
+    }
+    // the flume source sequence generator is set to 10 events in the flume configuration, going to two sinks -> 20
+    Assert.assertEquals(20, globalCount);
+  }
+
+  private static final Logger logger = LoggerFactory.getLogger(ApplicationDiscoveryTest.class);
+}

http://git-wip-us.apache.org/repos/asf/apex-malhar/blob/c48ec8c5/flume/src/test/java/org/apache/apex/malhar/flume/integration/ApplicationTest.java
----------------------------------------------------------------------
diff --git a/flume/src/test/java/org/apache/apex/malhar/flume/integration/ApplicationTest.java b/flume/src/test/java/org/apache/apex/malhar/flume/integration/ApplicationTest.java
index 10153bc..67c911c 100644
--- a/flume/src/test/java/org/apache/apex/malhar/flume/integration/ApplicationTest.java
+++ b/flume/src/test/java/org/apache/apex/malhar/flume/integration/ApplicationTest.java
@@ -18,11 +18,15 @@
  */
 package org.apache.apex.malhar.flume.integration;
 
+import org.junit.Assert;
 import org.junit.Ignore;
 import org.junit.Test;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
+import org.apache.apex.malhar.flume.operator.AbstractFlumeInputOperator;
+import org.apache.apex.malhar.flume.storage.EventCodec;
+
 import org.apache.flume.Event;
 import org.apache.hadoop.conf.Configuration;
 
@@ -33,15 +37,17 @@ import com.datatorrent.api.DefaultInputPort;
 import com.datatorrent.api.LocalMode;
 import com.datatorrent.api.Operator;
 import com.datatorrent.api.StreamingApplication;
-import com.datatorrent.flume.operator.AbstractFlumeInputOperator;
-import com.datatorrent.flume.storage.EventCodec;
+
 
 /**
- *
+ * baseDir needs to be created in HDFS.
+ * A local ZooKeeper service needs to be running on the default 127.0.0.1:2181.
+ * A local Flume service needs to be running using the src/test/resources/flume/conf/flume_simple.conf configuration.
  */
 @Ignore
 public class ApplicationTest implements StreamingApplication
 {
+  static int globalCount;
   public static class FlumeInputOperator extends AbstractFlumeInputOperator<Event>
   {
     @Override
@@ -74,7 +80,12 @@ public class ApplicationTest implements StreamingApplication
     @Override
     public void endWindow()
     {
-      logger.debug("total count = {}, tuple = {}", count, event);
+      if (event != null) {
+        logger.info("total count = {}, tuple = {}", count, new String(event.getBody()));
+      } else {
+        logger.info("total count = {}, tuple = {}", count, event);
+      }
+      globalCount = count;
     }
 
     @Override
@@ -95,7 +106,7 @@ public class ApplicationTest implements StreamingApplication
   {
     dag.setAttribute(com.datatorrent.api.Context.DAGContext.STREAMING_WINDOW_SIZE_MILLIS, 1000);
     FlumeInputOperator flume = dag.addOperator("FlumeOperator", new FlumeInputOperator());
-    flume.setConnectAddresses(new String[]{"test:127.0.0.1:8080"});
+    flume.setConnectAddresses(new String[]{"sink1:127.0.0.1:9098"});
     flume.setCodec(new EventCodec());
     Counter counter = dag.addOperator("Counter", new Counter());
 
@@ -106,11 +117,12 @@ public class ApplicationTest implements StreamingApplication
   public void test()
   {
     try {
-      LocalMode.runApp(this, Integer.MAX_VALUE);
+      LocalMode.runApp(this, 10000);
     } catch (Exception ex) {
       logger.warn("The dag seems to be not testable yet, if it's - remove this exception handling", ex);
     }
-
+    // the flume source sequence generator is set to 10 events in the flume configuration
+    Assert.assertEquals(10, globalCount);
   }
 
   private static final Logger logger = LoggerFactory.getLogger(ApplicationTest.class);

http://git-wip-us.apache.org/repos/asf/apex-malhar/blob/c48ec8c5/flume/src/test/java/org/apache/apex/malhar/flume/sink/DTFlumeSinkTest.java
----------------------------------------------------------------------
diff --git a/flume/src/test/java/org/apache/apex/malhar/flume/sink/DTFlumeSinkTest.java b/flume/src/test/java/org/apache/apex/malhar/flume/sink/DTFlumeSinkTest.java
index 9bc69e8..f97d9c0 100644
--- a/flume/src/test/java/org/apache/apex/malhar/flume/sink/DTFlumeSinkTest.java
+++ b/flume/src/test/java/org/apache/apex/malhar/flume/sink/DTFlumeSinkTest.java
@@ -29,9 +29,10 @@ import org.junit.Test;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
+import org.apache.apex.malhar.flume.discovery.Discovery;
+
 import org.apache.flume.channel.MemoryChannel;
 
-import com.datatorrent.flume.discovery.Discovery;
 import com.datatorrent.netlet.AbstractLengthPrependerClient;
 import com.datatorrent.netlet.DefaultEventLoop;
 import com.datatorrent.netlet.util.Slice;

http://git-wip-us.apache.org/repos/asf/apex-malhar/blob/c48ec8c5/flume/src/test/resources/flume/conf/flume-conf.properties
----------------------------------------------------------------------
diff --git a/flume/src/test/resources/flume/conf/flume-conf.properties b/flume/src/test/resources/flume/conf/flume-conf.properties
index b796e6d..73dc79a 100644
--- a/flume/src/test/resources/flume/conf/flume-conf.properties
+++ b/flume/src/test/resources/flume/conf/flume-conf.properties
@@ -33,12 +33,12 @@ agent1.sources.netcatSource.channels = ch1
 agent1.sources.netcatSource.command = src/test/bash/subcat_periodically src/test/resources/test_data/dt_spend 10000 1
 # Pick and Reorder the columns we need from a larger record for efficiency
   agent1.sources.netcatSource.interceptors = columnchooser
-  agent1.sources.netcatSource.interceptors.columnchooser.type = com.datatorrent.flume.interceptor.ColumnFilteringInterceptor$Builder
+  agent1.sources.netcatSource.interceptors.columnchooser.type = org.apache.apex.malhar.flume.interceptor.ColumnFilteringInterceptor$Builder
   agent1.sources.netcatSource.interceptors.columnchooser.srcSeparator = 2
   agent1.sources.netcatSource.interceptors.columnchooser.dstSeparator = 1
   agent1.sources.netcatSource.interceptors.columnchooser.columns = 0 43 62 69 68 139 190 70 71 52 75 37 39 42 191 138
 
 - agent2.sources.netcatSource.interceptors.columnchooser.type = com.datatorrent.flume.interceptor.ColumnFilteringFormattingInterceptor$Builder
 + agent2.sources.netcatSource.interceptors.columnchooser.type = org.apache.apex.malhar.flume.interceptor.ColumnFilteringFormattingInterceptor$Builder
  agent2.sources.netcatSource.interceptors.columnchooser.srcSeparator = 2
 agent2.sources.netcatSource.interceptors.columnchooser.columnsFormatter = {0}\u0001{43}\u0001{62}\u0001{69}\u0001{68}\u0001{139}\u0001{190}\u0001{70}\u0001{71}\u0001{52}\u0001{75}\u0001{37}\u0001{39}\u0001{42}\u0001{191}\u0001{138}\u0001
 

http://git-wip-us.apache.org/repos/asf/apex-malhar/blob/c48ec8c5/flume/src/test/resources/flume/conf/flume_simple.conf
----------------------------------------------------------------------
diff --git a/flume/src/test/resources/flume/conf/flume_simple.conf b/flume/src/test/resources/flume/conf/flume_simple.conf
new file mode 100644
index 0000000..b902881
--- /dev/null
+++ b/flume/src/test/resources/flume/conf/flume_simple.conf
@@ -0,0 +1,52 @@
+#
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements.  See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership.  The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License.  You may obtain a copy of the License at
+#
+#   http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied.  See the License for the
+# specific language governing permissions and limitations
+# under the License.
+#
+
+# apex_example.conf: A single-node Flume configuration
+
+# Name the components on this agent
+a1.sources = r1
+a1.sinks = dt 
+a1.channels = c1
+
+# sequence generator source that generates numbers from 0 to 9
+a1.sources.r1.type = seq
+a1.sources.r1.totalEvents = 10
+
+# sink - dt
+ a1.sinks.dt.type = org.apache.apex.malhar.flume.sink.DTFlumeSink
+ a1.sinks.dt.id = sink1
+ a1.sinks.dt.hostname = 127.0.0.1
+ a1.sinks.dt.port = 9098
+ a1.sinks.dt.sleepMillis = 7
+ a1.sinks.dt.throughputAdjustmentFactor = 2
+ a1.sinks.dt.maximumEventsPerTransaction = 5000
+ a1.sinks.dt.minimumEventsPerTransaction = 1
+ a1.sinks.dt.storage = org.apache.apex.malhar.flume.storage.HDFSStorage
+ a1.sinks.dt.storage.restore = false
+ a1.sinks.dt.storage.baseDir = /tmp/flume101
+ a1.sinks.dt.channel = c1
+
+# Use a channel which buffers events in memory
+a1.channels.c1.type = memory
+a1.channels.c1.capacity = 1000
+a1.channels.c1.transactionCapacity = 100
+
+# Bind the source and sink to the channel
+a1.sources.r1.channels = c1
+

http://git-wip-us.apache.org/repos/asf/apex-malhar/blob/c48ec8c5/flume/src/test/resources/flume/conf/flume_zkdiscovery.conf
----------------------------------------------------------------------
diff --git a/flume/src/test/resources/flume/conf/flume_zkdiscovery.conf b/flume/src/test/resources/flume/conf/flume_zkdiscovery.conf
new file mode 100644
index 0000000..6f8932c
--- /dev/null
+++ b/flume/src/test/resources/flume/conf/flume_zkdiscovery.conf
@@ -0,0 +1,91 @@
+#
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements.  See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership.  The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License.  You may obtain a copy of the License at
+#
+#   http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied.  See the License for the
+# specific language governing permissions and limitations
+# under the License.
+#
+
+# example.conf: A single-node Flume configuration
+
+# Name the components on this agent
+a1.sources = r1
+a1.sinks = dt dt2
+a1.channels = c1 c2
+
+# Alternative source for custom inputs
+#a1.sources.r1.type = netcat
+#a1.sources.r1.bind = 127.0.0.1
+#a1.sources.r1.port = 9097
+
+# sequence generator source that generates numbers from 0 to 9
+a1.sources.r1.type = seq
+a1.sources.r1.totalEvents = 10
+
+# first sink - dt
+ a1.sinks.dt.type = org.apache.apex.malhar.flume.sink.DTFlumeSink
+ a1.sinks.dt.id = sink1
+ a1.sinks.dt.hostname = 127.0.0.1
+ a1.sinks.dt.port = 9098
+ a1.sinks.dt.sleepMillis = 7
+ a1.sinks.dt.throughputAdjustmentFactor = 2
+ a1.sinks.dt.maximumEventsPerTransaction = 5000
+ a1.sinks.dt.minimumEventsPerTransaction = 1
+ a1.sinks.dt.storage = org.apache.apex.malhar.flume.storage.HDFSStorage
+ a1.sinks.dt.storage.restore = false
+ a1.sinks.dt.storage.baseDir = /tmp/flume101
+ a1.sinks.dt.channel = c1
+
+# second sink - dt2
+ a1.sinks.dt2.type = org.apache.apex.malhar.flume.sink.DTFlumeSink
+ a1.sinks.dt2.id = sink2
+ a1.sinks.dt2.hostname = 127.0.0.1
+ a1.sinks.dt2.port = 9099
+ a1.sinks.dt2.sleepMillis = 7
+ a1.sinks.dt2.throughputAdjustmentFactor = 2
+ a1.sinks.dt2.maximumEventsPerTransaction = 5000
+ a1.sinks.dt2.minimumEventsPerTransaction = 1
+ a1.sinks.dt2.storage = org.apache.apex.malhar.flume.storage.HDFSStorage
+ a1.sinks.dt2.storage.restore = false
+ a1.sinks.dt2.storage.baseDir = /tmp/flume101
+ a1.sinks.dt2.channel = c2
+
+# Use a channel which buffers events in memory
+ a1.channels.c1.type = memory
+ a1.channels.c1.capacity = 1000
+ a1.channels.c1.transactionCapacity = 100
+
+# Ensure that we are able to detect flume sinks (and failures) automatically.
 a1.sinks.dt.discovery = org.apache.apex.malhar.flume.discovery.ZKAssistedDiscovery
+ a1.sinks.dt.discovery.connectionString = 127.0.0.1:2181
+ a1.sinks.dt.discovery.basePath = /flume/basepath
+ a1.sinks.dt.discovery.connectionTimeoutMillis = 1000
+ a1.sinks.dt.discovery.connectionRetryCount = 10
+ a1.sinks.dt.discovery.connectionRetrySleepMillis = 500
+
+# Ensure that we are able to detect flume sinks (and failures) automatically.
 a1.sinks.dt2.discovery = org.apache.apex.malhar.flume.discovery.ZKAssistedDiscovery
+ a1.sinks.dt2.discovery.connectionString = 127.0.0.1:2181
+ a1.sinks.dt2.discovery.basePath = /flume/basepath
+ a1.sinks.dt2.discovery.connectionTimeoutMillis = 1000
+ a1.sinks.dt2.discovery.connectionRetryCount = 10
+ a1.sinks.dt2.discovery.connectionRetrySleepMillis = 500
+
+# Use a channel which buffers events in memory
+ a1.channels.c2.type = memory
+ a1.channels.c2.capacity = 1000
+ a1.channels.c2.transactionCapacity = 100
+
+# Bind the source and sink to the channel
+ a1.sources.r1.channels = c1 c2
