Repository: apex-malhar
Updated Branches:
  refs/heads/master c84a2c867 -> 2f70751e7


Changed DT references to Apex


Project: http://git-wip-us.apache.org/repos/asf/apex-malhar/repo
Commit: http://git-wip-us.apache.org/repos/asf/apex-malhar/commit/2f70751e
Tree: http://git-wip-us.apache.org/repos/asf/apex-malhar/tree/2f70751e
Diff: http://git-wip-us.apache.org/repos/asf/apex-malhar/diff/2f70751e

Branch: refs/heads/master
Commit: 2f70751e7852cc8e2be94189e1bbf8be85a19559
Parents: c48ec8c
Author: Pramod Immaneni <[email protected]>
Authored: Mon May 22 14:27:02 2017 -0700
Committer: Pramod Immaneni <[email protected]>
Committed: Mon May 22 16:47:34 2017 -0700

----------------------------------------------------------------------
 flume/README.md                                 |   4 +-
 .../apex/malhar/flume/discovery/Discovery.java  |   6 +-
 .../flume/discovery/ZKAssistedDiscovery.java    |   4 +-
 .../operator/AbstractFlumeInputOperator.java    |   2 +-
 .../apex/malhar/flume/sink/DTFlumeSink.java     | 574 -------------------
 .../apex/malhar/flume/sink/FlumeSink.java       | 574 +++++++++++++++++++
 .../flume-conf/flume-conf.sample.properties     |   2 +-
 .../resources/flume-conf/flume-env.sample.sh    |   6 +-
 .../discovery/ZKAssistedDiscoveryTest.java      |  12 +-
 .../apex/malhar/flume/sink/DTFlumeSinkTest.java | 146 -----
 .../apex/malhar/flume/sink/FlumeSinkTest.java   | 146 +++++
 .../resources/flume/conf/flume-conf.properties  |   4 +-
 .../src/test/resources/flume/conf/flume-env.sh  |   6 +-
 .../test/resources/flume/conf/flume_simple.conf |   2 +-
 .../resources/flume/conf/flume_zkdiscovery.conf |   4 +-
 15 files changed, 746 insertions(+), 746 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/apex-malhar/blob/2f70751e/flume/README.md
----------------------------------------------------------------------
diff --git a/flume/README.md b/flume/README.md
index ec8fae9..5f9c320 100644
--- a/flume/README.md
+++ b/flume/README.md
@@ -23,7 +23,7 @@ and all the needed dependencies into `plugins.d/custom-plugin-name/libext/`
 (Alternatively to flume's automatic plugins.d detection, jars can be added to the
 FLUME_CLASSPATH using a `flume-env.sh` script. (See 'resources/flume-conf/flume-env.sample.sh')
 Therefore a maven repository must be available under $HOME/.m2 and the environment variable
-DT_FLUME_JAR must point to the plugin JAR.)
+APEX_FLUME_JAR must point to the plugin JAR.)
 
 ***Flume configuration***  
 A basic flume configuration can be found in `src/test/resources/flume/conf/flume_simple.conf`.  
@@ -31,7 +31,7 @@ A flume configuration using discovery service can be found in `src/test/resources/flume/conf/flume_zkdiscovery.conf`.
  Configuration files should be placed in flumes 'conf' directory and will be explicitly selected
  when running flume-ng
 
-In the configuration file set `org.apache.apex.malhar.flume.sink.DTFlumeSink` for the **type**  
+In the configuration file set `org.apache.apex.malhar.flume.sink.FlumeSink` for the **type**  
 and `org.apache.apex.malhar.flume.storage.HDFSStorage` for the **storage**,  
 as well as a **HDFS directory** for `baseDir`. The HDFS base directory needs
 to be created on HDFS.

http://git-wip-us.apache.org/repos/asf/apex-malhar/blob/2f70751e/flume/src/main/java/org/apache/apex/malhar/flume/discovery/Discovery.java
----------------------------------------------------------------------
diff --git a/flume/src/main/java/org/apache/apex/malhar/flume/discovery/Discovery.java b/flume/src/main/java/org/apache/apex/malhar/flume/discovery/Discovery.java
index 619a625..c32c15b 100644
--- a/flume/src/main/java/org/apache/apex/malhar/flume/discovery/Discovery.java
+++ b/flume/src/main/java/org/apache/apex/malhar/flume/discovery/Discovery.java
@@ -21,12 +21,12 @@ package org.apache.apex.malhar.flume.discovery;
 import java.util.Collection;
 
 /**
- * When DTFlumeSink server instance binds to the network interface, it can publish
+ * When FlumeSink server instance binds to the network interface, it can publish
 * its whereabouts by invoking advertise method on the Discovery object. Similarly
 * when it ceases accepting any more connections, it can publish its intent to do
 * so by invoking unadvertise.<p />
 * Interesting parties can call discover method to get the list of addresses where
- * they can find an available DTFlumeSink server instance.
+ * they can find an available FlumeSink server instance.
  *
  * @param <T> - Type of the objects which can be discovered
  * @since 0.9.3
@@ -41,7 +41,7 @@ public interface Discovery<T>
   void unadvertise(Service<T> service);
 
   /**
-   * Advertise the host/port address where DTFlumeSink is accepting a client connection.
+   * Advertise the host/port address where FlumeSink is accepting a client connection.
    *
    * @param service
    */
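
For illustration, a minimal standalone implementation of this interface could look like the sketch below; the class name and log wording are invented here, and it is modeled on the no-op fallback Discovery that the sink installs when no discovery agent is configured (see FlumeSink.configure further down):

import java.util.Collection;
import java.util.Collections;

import org.apache.apex.malhar.flume.discovery.Discovery;

public class LoggingDiscovery implements Discovery<byte[]>
{
  @Override
  public void advertise(Service<byte[]> service)
  {
    // invoked when the sink's server starts accepting client connections
    System.out.println("sink " + service.getId() + " listening on " + service.getHost() + ':' + service.getPort());
  }

  @Override
  public void unadvertise(Service<byte[]> service)
  {
    // invoked when the sink ceases accepting connections
    System.out.println("sink " + service.getId() + " stopped listening");
  }

  @Override
  public Collection<Service<byte[]>> discover()
  {
    // a real implementation, e.g. ZKAssistedDiscovery below, would query a registry
    return Collections.emptySet();
  }
}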

http://git-wip-us.apache.org/repos/asf/apex-malhar/blob/2f70751e/flume/src/main/java/org/apache/apex/malhar/flume/discovery/ZKAssistedDiscovery.java
----------------------------------------------------------------------
diff --git a/flume/src/main/java/org/apache/apex/malhar/flume/discovery/ZKAssistedDiscovery.java b/flume/src/main/java/org/apache/apex/malhar/flume/discovery/ZKAssistedDiscovery.java
index 9a7dd3c..1988d62 100644
--- a/flume/src/main/java/org/apache/apex/malhar/flume/discovery/ZKAssistedDiscovery.java
+++ b/flume/src/main/java/org/apache/apex/malhar/flume/discovery/ZKAssistedDiscovery.java
@@ -70,7 +70,7 @@ public class ZKAssistedDiscovery implements Discovery<byte[]>,
 
   public ZKAssistedDiscovery()
   {
-    this.serviceName = "DTFlume";
+    this.serviceName = "ApexFlume";
     this.conntectionRetrySleepMillis = 500;
     this.connectionRetryCount = 10;
     this.connectionTimeoutMillis = 1000;
@@ -333,7 +333,7 @@ public class ZKAssistedDiscovery implements Discovery<byte[]>,
   @Override
   public void configure(org.apache.flume.Context context)
   {
-    serviceName = context.getString("serviceName", "DTFlume");
+    serviceName = context.getString("serviceName", "ApexFlume");
     connectionString = context.getString("connectionString");
     basePath = context.getString("basePath");
 

http://git-wip-us.apache.org/repos/asf/apex-malhar/blob/2f70751e/flume/src/main/java/org/apache/apex/malhar/flume/operator/AbstractFlumeInputOperator.java
----------------------------------------------------------------------
diff --git a/flume/src/main/java/org/apache/apex/malhar/flume/operator/AbstractFlumeInputOperator.java b/flume/src/main/java/org/apache/apex/malhar/flume/operator/AbstractFlumeInputOperator.java
index f9beb71..93b01af 100644
--- a/flume/src/main/java/org/apache/apex/malhar/flume/operator/AbstractFlumeInputOperator.java
+++ b/flume/src/main/java/org/apache/apex/malhar/flume/operator/AbstractFlumeInputOperator.java
@@ -119,7 +119,7 @@ public abstract class AbstractFlumeInputOperator<T>
   public void activate(OperatorContext ctx)
   {
     if (connectionSpecs.length == 0) {
-      logger.info("Discovered zero DTFlumeSink");
+      logger.info("Discovered zero FlumeSink");
     } else if (connectionSpecs.length == 1) {
       for (String connectAddresse: connectionSpecs) {
         logger.debug("Connection spec is {}", connectAddresse);

http://git-wip-us.apache.org/repos/asf/apex-malhar/blob/2f70751e/flume/src/main/java/org/apache/apex/malhar/flume/sink/DTFlumeSink.java
----------------------------------------------------------------------
diff --git a/flume/src/main/java/org/apache/apex/malhar/flume/sink/DTFlumeSink.java b/flume/src/main/java/org/apache/apex/malhar/flume/sink/DTFlumeSink.java
deleted file mode 100644
index 4f28850..0000000
--- a/flume/src/main/java/org/apache/apex/malhar/flume/sink/DTFlumeSink.java
+++ /dev/null
@@ -1,574 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *   http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied.  See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-package org.apache.apex.malhar.flume.sink;
-
-import java.io.IOError;
-import java.io.IOException;
-import java.util.Arrays;
-import java.util.Collection;
-import java.util.Collections;
-import java.util.ServiceConfigurationError;
-
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
-import org.apache.apex.malhar.flume.discovery.Discovery;
-import org.apache.apex.malhar.flume.sink.Server.Client;
-import org.apache.apex.malhar.flume.sink.Server.Request;
-import org.apache.apex.malhar.flume.storage.EventCodec;
-import org.apache.apex.malhar.flume.storage.Storage;
-
-import org.apache.flume.Context;
-import org.apache.flume.Event;
-import org.apache.flume.EventDeliveryException;
-import org.apache.flume.Transaction;
-import org.apache.flume.conf.Configurable;
-import org.apache.flume.sink.AbstractSink;
-
-import com.datatorrent.api.Component;
-import com.datatorrent.api.StreamCodec;
-
-import com.datatorrent.netlet.DefaultEventLoop;
-import com.datatorrent.netlet.NetletThrowable;
-import com.datatorrent.netlet.NetletThrowable.NetletRuntimeException;
-import com.datatorrent.netlet.util.Slice;
-
-/**
- * DTFlumeSink is a flume sink developed to ingest the data into DataTorrent DAG
- * from flume. It's essentially a flume sink which acts as a server capable of
- * talking to one client at a time. The client for this server is AbstractFlumeInputOperator.
- * <p />
- * &lt;experimental&gt;DTFlumeSink auto adjusts the rate at which it consumes the data from channel to
- * match the throughput of the DAG.&lt;/experimental&gt;
- * <p />
- * The properties you can set on the DTFlumeSink are: <br />
- * id - string unique value identifying this sink <br />
- * hostname - string value indicating the fqdn or ip address of the interface on which the server should listen <br />
- * port - integer value indicating the numeric port to which the server should bind <br />
- * sleepMillis - integer value indicating the number of milliseconds the process should sleep when there are no events
- * before checking for next event again <br />
- * throughputAdjustmentPercent - integer value indicating by what percentage the flume transaction size should be
- * adjusted upward or downward at a time <br />
- * minimumEventsPerTransaction - integer value indicating the minimum number of events per transaction <br />
- * maximumEventsPerTransaction - integer value indicating the maximum number of events per transaction. This value can
- * not be more than channel's transaction capacity.<br />
- *
- * @since 0.9.2
- */
-public class DTFlumeSink extends AbstractSink implements Configurable
-{
-  private static final String HOSTNAME_STRING = "hostname";
-  private static final String HOSTNAME_DEFAULT = "locahost";
-  private static final long ACCEPTED_TOLERANCE = 20000;
-  private DefaultEventLoop eventloop;
-  private Server server;
-  private int outstandingEventsCount;
-  private int lastConsumedEventsCount;
-  private int idleCount;
-  private byte[] playback;
-  private Client client;
-  private String hostname;
-  private int port;
-  private String id;
-  private long acceptedTolerance;
-  private long sleepMillis;
-  private double throughputAdjustmentFactor;
-  private int minimumEventsPerTransaction;
-  private int maximumEventsPerTransaction;
-  private long commitEventTimeoutMillis;
-  private transient long lastCommitEventTimeMillis;
-  private Storage storage;
-  Discovery<byte[]> discovery;
-  StreamCodec<Event> codec;
-  /* Begin implementing Flume Sink interface */
-
-  @Override
-  @SuppressWarnings({"BroadCatchBlock", "TooBroadCatch", "UseSpecificCatch", "SleepWhileInLoop"})
-  public Status process() throws EventDeliveryException
-  {
-    Slice slice;
-    synchronized (server.requests) {
-      for (Request r : server.requests) {
-        logger.debug("found {}", r);
-        switch (r.type) {
-          case SEEK:
-            lastCommitEventTimeMillis = System.currentTimeMillis();
-            slice = r.getAddress();
-            playback = storage.retrieve(Arrays.copyOfRange(slice.buffer, slice.offset, slice.offset + slice.length));
-            client = r.client;
-            break;
-
-          case COMMITTED:
-            lastCommitEventTimeMillis = System.currentTimeMillis();
-            slice = r.getAddress();
-            storage.clean(Arrays.copyOfRange(slice.buffer, slice.offset, slice.offset + slice.length));
-            break;
-
-          case CONNECTED:
-            logger.debug("Connected received, ignoring it!");
-            break;
-
-          case DISCONNECTED:
-            if (r.client == client) {
-              client = null;
-              outstandingEventsCount = 0;
-            }
-            break;
-
-          case WINDOWED:
-            lastConsumedEventsCount = r.getEventCount();
-            idleCount = r.getIdleCount();
-            outstandingEventsCount -= lastConsumedEventsCount;
-            break;
-
-          case SERVER_ERROR:
-            throw new IOError(null);
-
-          default:
-            logger.debug("Cannot understand the request {}", r);
-            break;
-        }
-      }
-
-      server.requests.clear();
-    }
-
-    if (client == null) {
-      logger.info("No client expressed interest yet to consume the events.");
-      return Status.BACKOFF;
-    } else if (System.currentTimeMillis() - lastCommitEventTimeMillis > commitEventTimeoutMillis) {
-      logger.info("Client has not processed the workload given for the last {} milliseconds, so backing off.",
-          System.currentTimeMillis() - lastCommitEventTimeMillis);
-      return Status.BACKOFF;
-    }
-
-    int maxTuples;
-    // the following logic needs to be fixed... this is a quick put together.
-    if (outstandingEventsCount < 0) {
-      if (idleCount > 1) {
-        maxTuples = (int)((1 + throughputAdjustmentFactor * idleCount) * lastConsumedEventsCount);
-      } else {
-        maxTuples = (int)((1 + throughputAdjustmentFactor) * lastConsumedEventsCount);
-      }
-    } else if (outstandingEventsCount > lastConsumedEventsCount) {
-      maxTuples = (int)((1 - throughputAdjustmentFactor) * lastConsumedEventsCount);
-    } else {
-      if (idleCount > 0) {
-        maxTuples = (int)((1 + throughputAdjustmentFactor * idleCount) * lastConsumedEventsCount);
-        if (maxTuples <= 0) {
-          maxTuples = minimumEventsPerTransaction;
-        }
-      } else {
-        maxTuples = lastConsumedEventsCount;
-      }
-    }
-
-    if (maxTuples >= maximumEventsPerTransaction) {
-      maxTuples = maximumEventsPerTransaction;
-    } else if (maxTuples <= 0) {
-      maxTuples = minimumEventsPerTransaction;
-    }
-
-    if (maxTuples > 0) {
-      if (playback != null) {
-        try {
-          int i = 0;
-          do {
-            if (!client.write(playback)) {
-              retryWrite(playback, null);
-            }
-            outstandingEventsCount++;
-            playback = storage.retrieveNext();
-          }
-          while (++i < maxTuples && playback != null);
-        } catch (Exception ex) {
-          logger.warn("Playback Failed", ex);
-          if (ex instanceof NetletThrowable) {
-            try {
-              eventloop.disconnect(client);
-            } finally {
-              client = null;
-              outstandingEventsCount = 0;
-            }
-          }
-          return Status.BACKOFF;
-        }
-      } else {
-        int storedTuples = 0;
-
-        Transaction t = getChannel().getTransaction();
-        try {
-          t.begin();
-
-          Event e;
-          while (storedTuples < maxTuples && (e = getChannel().take()) != null) {
-            Slice event = codec.toByteArray(e);
-            byte[] address = storage.store(event);
-            if (address != null) {
-              if (!client.write(address, event)) {
-                retryWrite(address, event);
-              }
-              outstandingEventsCount++;
-            } else {
-              logger.debug("Detected the condition of recovery from flume crash!");
-            }
-            storedTuples++;
-          }
-
-          if (storedTuples > 0) {
-            storage.flush();
-          }
-
-          t.commit();
-
-          if (storedTuples > 0) { /* log less frequently */
-            logger.debug("Transaction details maxTuples = {}, storedTuples = {}, outstanding = {}",
-                maxTuples, storedTuples, outstandingEventsCount);
-          }
-        } catch (Error er) {
-          t.rollback();
-          throw er;
-        } catch (Exception ex) {
-          logger.error("Transaction Failed", ex);
-          if (ex instanceof NetletRuntimeException && client != null) {
-            try {
-              eventloop.disconnect(client);
-            } finally {
-              client = null;
-              outstandingEventsCount = 0;
-            }
-          }
-          t.rollback();
-          return Status.BACKOFF;
-        } finally {
-          t.close();
-        }
-
-        if (storedTuples == 0) {
-          sleep();
-        }
-      }
-    }
-
-    return Status.READY;
-  }
-
-  private void sleep()
-  {
-    try {
-      Thread.sleep(sleepMillis);
-    } catch (InterruptedException ex) {
-      Thread.currentThread().interrupt();
-    }
-  }
-
-  @Override
-  public void start()
-  {
-    try {
-      if (storage instanceof Component) {
-        @SuppressWarnings("unchecked")
-        Component<com.datatorrent.api.Context> component = (Component<com.datatorrent.api.Context>)storage;
-        component.setup(null);
-      }
-      if (discovery instanceof Component) {
-        @SuppressWarnings("unchecked")
-        Component<com.datatorrent.api.Context> component = (Component<com.datatorrent.api.Context>)discovery;
-        component.setup(null);
-      }
-      if (codec instanceof Component) {
-        @SuppressWarnings("unchecked")
-        Component<com.datatorrent.api.Context> component = (Component<com.datatorrent.api.Context>)codec;
-        component.setup(null);
-      }
-      eventloop = new DefaultEventLoop("EventLoop-" + id);
-      server = new Server(id, discovery,acceptedTolerance);
-    } catch (Error error) {
-      throw error;
-    } catch (RuntimeException re) {
-      throw re;
-    } catch (IOException ex) {
-      throw new RuntimeException(ex);
-    }
-
-    eventloop.start();
-    eventloop.start(hostname, port, server);
-    super.start();
-  }
-
-  @Override
-  public void stop()
-  {
-    try {
-      super.stop();
-    } finally {
-      try {
-        if (client != null) {
-          eventloop.disconnect(client);
-          client = null;
-        }
-
-        eventloop.stop(server);
-        eventloop.stop();
-
-        if (codec instanceof Component) {
-          @SuppressWarnings("unchecked")
-          Component<com.datatorrent.api.Context> component = (Component<com.datatorrent.api.Context>)codec;
-          component.teardown();
-        }
-        if (discovery instanceof Component) {
-          @SuppressWarnings("unchecked")
-          Component<com.datatorrent.api.Context> component = (Component<com.datatorrent.api.Context>)discovery;
-          component.teardown();
-        }
-        if (storage instanceof Component) {
-          @SuppressWarnings("unchecked")
-          Component<com.datatorrent.api.Context> component = (Component<com.datatorrent.api.Context>)storage;
-          component.teardown();
-        }
-      } catch (Throwable cause) {
-        throw new ServiceConfigurationError("Failed Stop", cause);
-      }
-    }
-  }
-
-  /* End implementing Flume Sink interface */
-
-  /* Begin Configurable Interface */
-  @Override
-  public void configure(Context context)
-  {
-    hostname = context.getString(HOSTNAME_STRING, HOSTNAME_DEFAULT);
-    port = context.getInteger("port", 0);
-    id = context.getString("id");
-    if (id == null) {
-      id = getName();
-    }
-    acceptedTolerance = context.getLong("acceptedTolerance", ACCEPTED_TOLERANCE);
-    sleepMillis = context.getLong("sleepMillis", 5L);
-    throughputAdjustmentFactor = context.getInteger("throughputAdjustmentPercent", 5) / 100.0;
-    maximumEventsPerTransaction = context.getInteger("maximumEventsPerTransaction", 10000);
-    minimumEventsPerTransaction = context.getInteger("minimumEventsPerTransaction", 100);
-    commitEventTimeoutMillis = context.getLong("commitEventTimeoutMillis", Long.MAX_VALUE);
-
-    @SuppressWarnings("unchecked")
-    Discovery<byte[]> ldiscovery = configure("discovery", Discovery.class, context);
-    if (ldiscovery == null) {
-      logger.warn("Discovery agent not configured for the sink!");
-      discovery = new Discovery<byte[]>()
-      {
-        @Override
-        public void unadvertise(Service<byte[]> service)
-        {
-          logger.debug("Sink {} stopped listening on {}:{}", service.getId(), 
service.getHost(), service.getPort());
-        }
-
-        @Override
-        public void advertise(Service<byte[]> service)
-        {
-          logger.debug("Sink {} started listening on {}:{}", service.getId(), 
service.getHost(), service.getPort());
-        }
-
-        @Override
-        @SuppressWarnings("unchecked")
-        public Collection<Service<byte[]>> discover()
-        {
-          return Collections.EMPTY_SET;
-        }
-
-      };
-    } else {
-      discovery = ldiscovery;
-    }
-
-    storage = configure("storage", Storage.class, context);
-    if (storage == null) {
-      logger.warn("storage key missing... DTFlumeSink may lose data!");
-      storage = new Storage()
-      {
-        @Override
-        public byte[] store(Slice slice)
-        {
-          return null;
-        }
-
-        @Override
-        public byte[] retrieve(byte[] identifier)
-        {
-          return null;
-        }
-
-        @Override
-        public byte[] retrieveNext()
-        {
-          return null;
-        }
-
-        @Override
-        public void clean(byte[] identifier)
-        {
-        }
-
-        @Override
-        public void flush()
-        {
-        }
-
-      };
-    }
-
-    @SuppressWarnings("unchecked")
-    StreamCodec<Event> lCodec = configure("codec", StreamCodec.class, context);
-    if (lCodec == null) {
-      codec = new EventCodec();
-    } else {
-      codec = lCodec;
-    }
-
-  }
-
-  /* End Configurable Interface */
-
-  @SuppressWarnings({"UseSpecificCatch", "BroadCatchBlock", "TooBroadCatch"})
-  private static <T> T configure(String key, Class<T> clazz, Context context)
-  {
-    String classname = context.getString(key);
-    if (classname == null) {
-      return null;
-    }
-
-    try {
-      Class<?> loadClass = Thread.currentThread().getContextClassLoader().loadClass(classname);
-      if (clazz.isAssignableFrom(loadClass)) {
-        @SuppressWarnings("unchecked")
-        T object = (T)loadClass.newInstance();
-        if (object instanceof Configurable) {
-          Context context1 = new Context(context.getSubProperties(key + '.'));
-          String id = context1.getString(Storage.ID);
-          if (id == null) {
-            id = context.getString(Storage.ID);
-            logger.debug("{} inherited id={} from sink", key, id);
-            context1.put(Storage.ID, id);
-          }
-          ((Configurable)object).configure(context1);
-        }
-
-        return object;
-      } else {
-        logger.error("key class {} does not implement {} interface", classname, Storage.class.getCanonicalName());
-        throw new Error("Invalid storage " + classname);
-      }
-    } catch (Error error) {
-      throw error;
-    } catch (RuntimeException re) {
-      throw re;
-    } catch (Throwable t) {
-      throw new RuntimeException(t);
-    }
-  }
-
-  /**
-   * @return the hostname
-   */
-  String getHostname()
-  {
-    return hostname;
-  }
-
-  /**
-   * @param hostname the hostname to set
-   */
-  void setHostname(String hostname)
-  {
-    this.hostname = hostname;
-  }
-
-  /**
-   * @return the port
-   */
-  int getPort()
-  {
-    return port;
-  }
-
-  public long getAcceptedTolerance()
-  {
-    return acceptedTolerance;
-  }
-
-  public void setAcceptedTolerance(long acceptedTolerance)
-  {
-    this.acceptedTolerance = acceptedTolerance;
-  }
-
-  /**
-   * @param port the port to set
-   */
-  void setPort(int port)
-  {
-    this.port = port;
-  }
-
-  /**
-   * @return the discovery
-   */
-  Discovery<byte[]> getDiscovery()
-  {
-    return discovery;
-  }
-
-  /**
-   * @param discovery the discovery to set
-   */
-  void setDiscovery(Discovery<byte[]> discovery)
-  {
-    this.discovery = discovery;
-  }
-
-  /**
-   * Attempt the sequence of writing after sleeping twice and upon failure assume
-   * that the client connection has problems and hence close it.
-   *
-   * @param address
-   * @param e
-   * @throws IOException
-   */
-  private void retryWrite(byte[] address, Slice event) throws IOException
-  {
-    if (event == null) {  /* this happens for playback where address and event are sent as single object */
-      while (client.isConnected()) {
-        sleep();
-        if (client.write(address)) {
-          return;
-        }
-      }
-    } else {  /* this happens when the events are taken from the flume channel and writing first time failed */
-      while (client.isConnected()) {
-        sleep();
-        if (client.write(address, event)) {
-          return;
-        }
-      }
-    }
-
-    throw new IOException("Client disconnected!");
-  }
-
-  private static final Logger logger = LoggerFactory.getLogger(DTFlumeSink.class);
-}

http://git-wip-us.apache.org/repos/asf/apex-malhar/blob/2f70751e/flume/src/main/java/org/apache/apex/malhar/flume/sink/FlumeSink.java
----------------------------------------------------------------------
diff --git a/flume/src/main/java/org/apache/apex/malhar/flume/sink/FlumeSink.java b/flume/src/main/java/org/apache/apex/malhar/flume/sink/FlumeSink.java
new file mode 100644
index 0000000..99cc1d5
--- /dev/null
+++ b/flume/src/main/java/org/apache/apex/malhar/flume/sink/FlumeSink.java
@@ -0,0 +1,574 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.apex.malhar.flume.sink;
+
+import java.io.IOError;
+import java.io.IOException;
+import java.util.Arrays;
+import java.util.Collection;
+import java.util.Collections;
+import java.util.ServiceConfigurationError;
+
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import org.apache.apex.malhar.flume.discovery.Discovery;
+import org.apache.apex.malhar.flume.sink.Server.Client;
+import org.apache.apex.malhar.flume.sink.Server.Request;
+import org.apache.apex.malhar.flume.storage.EventCodec;
+import org.apache.apex.malhar.flume.storage.Storage;
+
+import org.apache.flume.Context;
+import org.apache.flume.Event;
+import org.apache.flume.EventDeliveryException;
+import org.apache.flume.Transaction;
+import org.apache.flume.conf.Configurable;
+import org.apache.flume.sink.AbstractSink;
+
+import com.datatorrent.api.Component;
+import com.datatorrent.api.StreamCodec;
+
+import com.datatorrent.netlet.DefaultEventLoop;
+import com.datatorrent.netlet.NetletThrowable;
+import com.datatorrent.netlet.NetletThrowable.NetletRuntimeException;
+import com.datatorrent.netlet.util.Slice;
+
+/**
+ * FlumeSink is a flume sink developed to ingest the data into DataTorrent DAG
+ * from flume. It's essentially a flume sink which acts as a server capable of
+ * talking to one client at a time. The client for this server is AbstractFlumeInputOperator.
+ * <p />
+ * &lt;experimental&gt;FlumeSink auto adjusts the rate at which it consumes the data from channel to
+ * match the throughput of the DAG.&lt;/experimental&gt;
+ * <p />
+ * The properties you can set on the FlumeSink are: <br />
+ * id - string unique value identifying this sink <br />
+ * hostname - string value indicating the fqdn or ip address of the interface on which the server should listen <br />
+ * port - integer value indicating the numeric port to which the server should bind <br />
+ * sleepMillis - integer value indicating the number of milliseconds the process should sleep when there are no events
+ * before checking for next event again <br />
+ * throughputAdjustmentPercent - integer value indicating by what percentage the flume transaction size should be
+ * adjusted upward or downward at a time <br />
+ * minimumEventsPerTransaction - integer value indicating the minimum number of events per transaction <br />
+ * maximumEventsPerTransaction - integer value indicating the maximum number of events per transaction. This value can
+ * not be more than channel's transaction capacity.<br />
+ *
+ * @since 0.9.2
+ */
+public class FlumeSink extends AbstractSink implements Configurable
+{
+  private static final String HOSTNAME_STRING = "hostname";
+  private static final String HOSTNAME_DEFAULT = "localhost";
+  private static final long ACCEPTED_TOLERANCE = 20000;
+  private DefaultEventLoop eventloop;
+  private Server server;
+  private int outstandingEventsCount;
+  private int lastConsumedEventsCount;
+  private int idleCount;
+  private byte[] playback;
+  private Client client;
+  private String hostname;
+  private int port;
+  private String id;
+  private long acceptedTolerance;
+  private long sleepMillis;
+  private double throughputAdjustmentFactor;
+  private int minimumEventsPerTransaction;
+  private int maximumEventsPerTransaction;
+  private long commitEventTimeoutMillis;
+  private transient long lastCommitEventTimeMillis;
+  private Storage storage;
+  Discovery<byte[]> discovery;
+  StreamCodec<Event> codec;
+  /* Begin implementing Flume Sink interface */
+
+  @Override
+  @SuppressWarnings({"BroadCatchBlock", "TooBroadCatch", "UseSpecificCatch", "SleepWhileInLoop"})
+  public Status process() throws EventDeliveryException
+  {
+    Slice slice;
+    synchronized (server.requests) {
+      for (Request r : server.requests) {
+        logger.debug("found {}", r);
+        switch (r.type) {
+          case SEEK:
+            lastCommitEventTimeMillis = System.currentTimeMillis();
+            slice = r.getAddress();
+            playback = storage.retrieve(Arrays.copyOfRange(slice.buffer, slice.offset, slice.offset + slice.length));
+            client = r.client;
+            break;
+
+          case COMMITTED:
+            lastCommitEventTimeMillis = System.currentTimeMillis();
+            slice = r.getAddress();
+            storage.clean(Arrays.copyOfRange(slice.buffer, slice.offset, slice.offset + slice.length));
+            break;
+
+          case CONNECTED:
+            logger.debug("Connected received, ignoring it!");
+            break;
+
+          case DISCONNECTED:
+            if (r.client == client) {
+              client = null;
+              outstandingEventsCount = 0;
+            }
+            break;
+
+          case WINDOWED:
+            lastConsumedEventsCount = r.getEventCount();
+            idleCount = r.getIdleCount();
+            outstandingEventsCount -= lastConsumedEventsCount;
+            break;
+
+          case SERVER_ERROR:
+            throw new IOError(null);
+
+          default:
+            logger.debug("Cannot understand the request {}", r);
+            break;
+        }
+      }
+
+      server.requests.clear();
+    }
+
+    if (client == null) {
+      logger.info("No client expressed interest yet to consume the events.");
+      return Status.BACKOFF;
+    } else if (System.currentTimeMillis() - lastCommitEventTimeMillis > commitEventTimeoutMillis) {
+      logger.info("Client has not processed the workload given for the last {} milliseconds, so backing off.",
+          System.currentTimeMillis() - lastCommitEventTimeMillis);
+      return Status.BACKOFF;
+    }
+
+    int maxTuples;
+    // the following logic needs to be fixed... this is a quick put together.
+    if (outstandingEventsCount < 0) {
+      if (idleCount > 1) {
+        maxTuples = (int)((1 + throughputAdjustmentFactor * idleCount) * lastConsumedEventsCount);
+      } else {
+        maxTuples = (int)((1 + throughputAdjustmentFactor) * lastConsumedEventsCount);
+      }
+    } else if (outstandingEventsCount > lastConsumedEventsCount) {
+      maxTuples = (int)((1 - throughputAdjustmentFactor) * lastConsumedEventsCount);
+    } else {
+      if (idleCount > 0) {
+        maxTuples = (int)((1 + throughputAdjustmentFactor * idleCount) * lastConsumedEventsCount);
+        if (maxTuples <= 0) {
+          maxTuples = minimumEventsPerTransaction;
+        }
+      } else {
+        maxTuples = lastConsumedEventsCount;
+      }
+    }
+
+    if (maxTuples >= maximumEventsPerTransaction) {
+      maxTuples = maximumEventsPerTransaction;
+    } else if (maxTuples <= 0) {
+      maxTuples = minimumEventsPerTransaction;
+    }
+
+    if (maxTuples > 0) {
+      if (playback != null) {
+        try {
+          int i = 0;
+          do {
+            if (!client.write(playback)) {
+              retryWrite(playback, null);
+            }
+            outstandingEventsCount++;
+            playback = storage.retrieveNext();
+          }
+          while (++i < maxTuples && playback != null);
+        } catch (Exception ex) {
+          logger.warn("Playback Failed", ex);
+          if (ex instanceof NetletThrowable) {
+            try {
+              eventloop.disconnect(client);
+            } finally {
+              client = null;
+              outstandingEventsCount = 0;
+            }
+          }
+          return Status.BACKOFF;
+        }
+      } else {
+        int storedTuples = 0;
+
+        Transaction t = getChannel().getTransaction();
+        try {
+          t.begin();
+
+          Event e;
+          while (storedTuples < maxTuples && (e = getChannel().take()) != null) {
+            Slice event = codec.toByteArray(e);
+            byte[] address = storage.store(event);
+            if (address != null) {
+              if (!client.write(address, event)) {
+                retryWrite(address, event);
+              }
+              outstandingEventsCount++;
+            } else {
+              logger.debug("Detected the condition of recovery from flume crash!");
+            }
+            storedTuples++;
+          }
+
+          if (storedTuples > 0) {
+            storage.flush();
+          }
+
+          t.commit();
+
+          if (storedTuples > 0) { /* log less frequently */
+            logger.debug("Transaction details maxTuples = {}, storedTuples = {}, outstanding = {}",
+                maxTuples, storedTuples, outstandingEventsCount);
+          }
+        } catch (Error er) {
+          t.rollback();
+          throw er;
+        } catch (Exception ex) {
+          logger.error("Transaction Failed", ex);
+          if (ex instanceof NetletRuntimeException && client != null) {
+            try {
+              eventloop.disconnect(client);
+            } finally {
+              client = null;
+              outstandingEventsCount = 0;
+            }
+          }
+          t.rollback();
+          return Status.BACKOFF;
+        } finally {
+          t.close();
+        }
+
+        if (storedTuples == 0) {
+          sleep();
+        }
+      }
+    }
+
+    return Status.READY;
+  }
+
+  private void sleep()
+  {
+    try {
+      Thread.sleep(sleepMillis);
+    } catch (InterruptedException ex) {
+      Thread.currentThread().interrupt();
+    }
+  }
+
+  @Override
+  public void start()
+  {
+    try {
+      if (storage instanceof Component) {
+        @SuppressWarnings("unchecked")
+        Component<com.datatorrent.api.Context> component = (Component<com.datatorrent.api.Context>)storage;
+        component.setup(null);
+      }
+      if (discovery instanceof Component) {
+        @SuppressWarnings("unchecked")
+        Component<com.datatorrent.api.Context> component = (Component<com.datatorrent.api.Context>)discovery;
+        component.setup(null);
+      }
+      if (codec instanceof Component) {
+        @SuppressWarnings("unchecked")
+        Component<com.datatorrent.api.Context> component = (Component<com.datatorrent.api.Context>)codec;
+        component.setup(null);
+      }
+      eventloop = new DefaultEventLoop("EventLoop-" + id);
+      server = new Server(id, discovery, acceptedTolerance);
+    } catch (Error error) {
+      throw error;
+    } catch (RuntimeException re) {
+      throw re;
+    } catch (IOException ex) {
+      throw new RuntimeException(ex);
+    }
+
+    eventloop.start();
+    eventloop.start(hostname, port, server);
+    super.start();
+  }
+
+  @Override
+  public void stop()
+  {
+    try {
+      super.stop();
+    } finally {
+      try {
+        if (client != null) {
+          eventloop.disconnect(client);
+          client = null;
+        }
+
+        eventloop.stop(server);
+        eventloop.stop();
+
+        if (codec instanceof Component) {
+          @SuppressWarnings("unchecked")
+          Component<com.datatorrent.api.Context> component = (Component<com.datatorrent.api.Context>)codec;
+          component.teardown();
+        }
+        if (discovery instanceof Component) {
+          @SuppressWarnings("unchecked")
+          Component<com.datatorrent.api.Context> component = (Component<com.datatorrent.api.Context>)discovery;
+          component.teardown();
+        }
+        if (storage instanceof Component) {
+          @SuppressWarnings("unchecked")
+          Component<com.datatorrent.api.Context> component = (Component<com.datatorrent.api.Context>)storage;
+          component.teardown();
+        }
+      } catch (Throwable cause) {
+        throw new ServiceConfigurationError("Failed Stop", cause);
+      }
+    }
+  }
+
+  /* End implementing Flume Sink interface */
+
+  /* Begin Configurable Interface */
+  @Override
+  public void configure(Context context)
+  {
+    hostname = context.getString(HOSTNAME_STRING, HOSTNAME_DEFAULT);
+    port = context.getInteger("port", 0);
+    id = context.getString("id");
+    if (id == null) {
+      id = getName();
+    }
+    acceptedTolerance = context.getLong("acceptedTolerance", ACCEPTED_TOLERANCE);
+    sleepMillis = context.getLong("sleepMillis", 5L);
+    throughputAdjustmentFactor = context.getInteger("throughputAdjustmentPercent", 5) / 100.0;
+    maximumEventsPerTransaction = context.getInteger("maximumEventsPerTransaction", 10000);
+    minimumEventsPerTransaction = context.getInteger("minimumEventsPerTransaction", 100);
+    commitEventTimeoutMillis = context.getLong("commitEventTimeoutMillis", Long.MAX_VALUE);
+
+    @SuppressWarnings("unchecked")
+    Discovery<byte[]> ldiscovery = configure("discovery", Discovery.class, context);
+    if (ldiscovery == null) {
+      logger.warn("Discovery agent not configured for the sink!");
+      discovery = new Discovery<byte[]>()
+      {
+        @Override
+        public void unadvertise(Service<byte[]> service)
+        {
+          logger.debug("Sink {} stopped listening on {}:{}", service.getId(), 
service.getHost(), service.getPort());
+        }
+
+        @Override
+        public void advertise(Service<byte[]> service)
+        {
+          logger.debug("Sink {} started listening on {}:{}", service.getId(), 
service.getHost(), service.getPort());
+        }
+
+        @Override
+        @SuppressWarnings("unchecked")
+        public Collection<Service<byte[]>> discover()
+        {
+          return Collections.EMPTY_SET;
+        }
+
+      };
+    } else {
+      discovery = ldiscovery;
+    }
+
+    storage = configure("storage", Storage.class, context);
+    if (storage == null) {
+      logger.warn("storage key missing... FlumeSink may lose data!");
+      storage = new Storage()
+      {
+        @Override
+        public byte[] store(Slice slice)
+        {
+          return null;
+        }
+
+        @Override
+        public byte[] retrieve(byte[] identifier)
+        {
+          return null;
+        }
+
+        @Override
+        public byte[] retrieveNext()
+        {
+          return null;
+        }
+
+        @Override
+        public void clean(byte[] identifier)
+        {
+        }
+
+        @Override
+        public void flush()
+        {
+        }
+
+      };
+    }
+
+    @SuppressWarnings("unchecked")
+    StreamCodec<Event> lCodec = configure("codec", StreamCodec.class, context);
+    if (lCodec == null) {
+      codec = new EventCodec();
+    } else {
+      codec = lCodec;
+    }
+
+  }
+
+  /* End Configurable Interface */
+
+  @SuppressWarnings({"UseSpecificCatch", "BroadCatchBlock", "TooBroadCatch"})
+  private static <T> T configure(String key, Class<T> clazz, Context context)
+  {
+    String classname = context.getString(key);
+    if (classname == null) {
+      return null;
+    }
+
+    try {
+      Class<?> loadClass = Thread.currentThread().getContextClassLoader().loadClass(classname);
+      if (clazz.isAssignableFrom(loadClass)) {
+        @SuppressWarnings("unchecked")
+        T object = (T)loadClass.newInstance();
+        if (object instanceof Configurable) {
+          Context context1 = new Context(context.getSubProperties(key + '.'));
+          String id = context1.getString(Storage.ID);
+          if (id == null) {
+            id = context.getString(Storage.ID);
+            logger.debug("{} inherited id={} from sink", key, id);
+            context1.put(Storage.ID, id);
+          }
+          ((Configurable)object).configure(context1);
+        }
+
+        return object;
+      } else {
+        logger.error("key class {} does not implement {} interface", classname, Storage.class.getCanonicalName());
+        throw new Error("Invalid storage " + classname);
+      }
+    } catch (Error error) {
+      throw error;
+    } catch (RuntimeException re) {
+      throw re;
+    } catch (Throwable t) {
+      throw new RuntimeException(t);
+    }
+  }
+
+  /**
+   * @return the hostname
+   */
+  String getHostname()
+  {
+    return hostname;
+  }
+
+  /**
+   * @param hostname the hostname to set
+   */
+  void setHostname(String hostname)
+  {
+    this.hostname = hostname;
+  }
+
+  /**
+   * @return the port
+   */
+  int getPort()
+  {
+    return port;
+  }
+
+  public long getAcceptedTolerance()
+  {
+    return acceptedTolerance;
+  }
+
+  public void setAcceptedTolerance(long acceptedTolerance)
+  {
+    this.acceptedTolerance = acceptedTolerance;
+  }
+
+  /**
+   * @param port the port to set
+   */
+  void setPort(int port)
+  {
+    this.port = port;
+  }
+
+  /**
+   * @return the discovery
+   */
+  Discovery<byte[]> getDiscovery()
+  {
+    return discovery;
+  }
+
+  /**
+   * @param discovery the discovery to set
+   */
+  void setDiscovery(Discovery<byte[]> discovery)
+  {
+    this.discovery = discovery;
+  }
+
+  /**
+   * Attempt the sequence of writing after sleeping twice and upon failure assume
+   * that the client connection has problems and hence close it.
+   *
+   * @param address
+   * @param e
+   * @throws IOException
+   */
+  private void retryWrite(byte[] address, Slice event) throws IOException
+  {
+    if (event == null) {  /* this happens for playback where address and event are sent as single object */
+      while (client.isConnected()) {
+        sleep();
+        if (client.write(address)) {
+          return;
+        }
+      }
+    } else {  /* this happens when the events are taken from the flume channel and writing first time failed */
+      while (client.isConnected()) {
+        sleep();
+        if (client.write(address, event)) {
+          return;
+        }
+      }
+    }
+
+    throw new IOException("Client disconnected!");
+  }
+
+  private static final Logger logger = LoggerFactory.getLogger(FlumeSink.class);
+}
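
Since only the class name changes in this rename, a minimal sketch of driving the sink programmatically may help orient readers. The FlumeSinkSketch class name and the property values below are illustrative only; the keys come from FlumeSink.configure() above, and the channel wiring mirrors FlumeSinkTest further down:

import java.util.HashMap;
import java.util.Map;

import org.apache.apex.malhar.flume.sink.FlumeSink;
import org.apache.flume.Context;
import org.apache.flume.channel.MemoryChannel;

public class FlumeSinkSketch
{
  public static void main(String[] args)
  {
    // keys correspond to FlumeSink.configure() above; unset keys take the defaults shown there
    Map<String, String> props = new HashMap<String, String>();
    props.put("id", "sink1");
    props.put("hostname", "localhost");
    props.put("port", "8080");
    props.put("sleepMillis", "5");
    props.put("throughputAdjustmentPercent", "5");
    props.put("minimumEventsPerTransaction", "100");
    props.put("maximumEventsPerTransaction", "10000");

    FlumeSink sink = new FlumeSink();
    sink.setName("sink1");
    sink.configure(new Context(props)); // no "storage" key: the sink warns and falls back to the no-op Storage
    sink.setChannel(new MemoryChannel());
    sink.start();  // binds the server; an AbstractFlumeInputOperator can now connect
    // Flume would invoke process() in a loop here
    sink.stop();
  }
}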

http://git-wip-us.apache.org/repos/asf/apex-malhar/blob/2f70751e/flume/src/main/resources/flume-conf/flume-conf.sample.properties
----------------------------------------------------------------------
diff --git a/flume/src/main/resources/flume-conf/flume-conf.sample.properties b/flume/src/main/resources/flume-conf/flume-conf.sample.properties
index af59e52..9504441 100644
--- a/flume/src/main/resources/flume-conf/flume-conf.sample.properties
+++ b/flume/src/main/resources/flume-conf/flume-conf.sample.properties
@@ -23,7 +23,7 @@
  agent1.sinks = dt
 
 # first sink - dt
- agent1.sinks.dt.type = org.apache.apex.malhar.flume.sink.DTFlumeSink
+ agent1.sinks.dt.type = org.apache.apex.malhar.flume.sink.FlumeSink
  agent1.sinks.dt.id = sink1
  agent1.sinks.dt.hostname = localhost
  agent1.sinks.dt.port = 8080

http://git-wip-us.apache.org/repos/asf/apex-malhar/blob/2f70751e/flume/src/main/resources/flume-conf/flume-env.sample.sh
----------------------------------------------------------------------
diff --git a/flume/src/main/resources/flume-conf/flume-env.sample.sh b/flume/src/main/resources/flume-conf/flume-env.sample.sh
index 570411b..41d093f 100644
--- a/flume/src/main/resources/flume-conf/flume-env.sample.sh
+++ b/flume/src/main/resources/flume-conf/flume-env.sample.sh
@@ -22,9 +22,9 @@
 # This script runs on the machine which have maven repository populated under
 # $HOME/.m2 If that's not the case, please adjust the JARPATH variable below
 # to point to colon separated list of directories where jar files can be found
-if test -z "$DT_FLUME_JAR"
+if test -z "$APEX_FLUME_JAR"
 then
-  echo [ERROR]: Environment variable DT_FLUME_JAR should point to a valid jar file which contains DTFlumeSink class >&2
+  echo [ERROR]: Environment variable APEX_FLUME_JAR should point to a valid jar file which contains FlumeSink class >&2
   exit 2
 fi
 
@@ -35,4 +35,4 @@ then
 else
   JAVA=${JAVA_HOME}/bin/java
 fi
-FLUME_CLASSPATH=`JARPATH=$JARPATH $JAVA -cp $DT_FLUME_JAR com.datatorrent.jarpath.JarPath -N $DT_FLUME_JAR -Xdt-jarpath -Xdt-netlet`
+FLUME_CLASSPATH=`JARPATH=$JARPATH $JAVA -cp $APEX_FLUME_JAR com.datatorrent.jarpath.JarPath -N $APEX_FLUME_JAR -Xdt-jarpath -Xdt-netlet`
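
As a usage sketch, the variable would be exported before starting flume-ng, for example `export APEX_FLUME_JAR=/path/to/malhar-flume.jar` (the path here is hypothetical; it should point at the jar built by this flume module).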

http://git-wip-us.apache.org/repos/asf/apex-malhar/blob/2f70751e/flume/src/test/java/org/apache/apex/malhar/flume/discovery/ZKAssistedDiscoveryTest.java
----------------------------------------------------------------------
diff --git a/flume/src/test/java/org/apache/apex/malhar/flume/discovery/ZKAssistedDiscoveryTest.java b/flume/src/test/java/org/apache/apex/malhar/flume/discovery/ZKAssistedDiscoveryTest.java
index 9db5d32..71381d7 100644
--- a/flume/src/test/java/org/apache/apex/malhar/flume/discovery/ZKAssistedDiscoveryTest.java
+++ b/flume/src/test/java/org/apache/apex/malhar/flume/discovery/ZKAssistedDiscoveryTest.java
@@ -46,9 +46,9 @@ public class ZKAssistedDiscoveryTest
   public void testSerialization() throws Exception
   {
     ZKAssistedDiscovery discovery = new ZKAssistedDiscovery();
-    discovery.setServiceName("DTFlumeTest");
+    discovery.setServiceName("ApexFlumeTest");
     discovery.setConnectionString("localhost:2181");
-    discovery.setBasePath("/HelloDT");
+    discovery.setBasePath("/HelloApex");
     discovery.setup(null);
     ServiceInstance<byte[]> instance = discovery.getInstance(new Service<byte[]>()
     {
@@ -91,9 +91,9 @@ public class ZKAssistedDiscoveryTest
   public void testDiscover()
   {
     ZKAssistedDiscovery discovery = new ZKAssistedDiscovery();
-    discovery.setServiceName("DTFlumeTest");
+    discovery.setServiceName("ApexFlumeTest");
     discovery.setConnectionString("localhost:2181");
-    discovery.setBasePath("/HelloDT");
+    discovery.setBasePath("/HelloApex");
     discovery.setup(null);
     assertNotNull("Discovered Sinks", discovery.discover());
     discovery.teardown();
@@ -103,9 +103,9 @@ public class ZKAssistedDiscoveryTest
   public void testAdvertize()
   {
     ZKAssistedDiscovery discovery = new ZKAssistedDiscovery();
-    discovery.setServiceName("DTFlumeTest");
+    discovery.setServiceName("ApexFlumeTest");
     discovery.setConnectionString("localhost:2181");
-    discovery.setBasePath("/HelloDT");
+    discovery.setBasePath("/HelloApex");
     discovery.setup(null);
 
     Service<byte[]> service = new Service<byte[]>()

http://git-wip-us.apache.org/repos/asf/apex-malhar/blob/2f70751e/flume/src/test/java/org/apache/apex/malhar/flume/sink/DTFlumeSinkTest.java
----------------------------------------------------------------------
diff --git a/flume/src/test/java/org/apache/apex/malhar/flume/sink/DTFlumeSinkTest.java b/flume/src/test/java/org/apache/apex/malhar/flume/sink/DTFlumeSinkTest.java
deleted file mode 100644
index f97d9c0..0000000
--- a/flume/src/test/java/org/apache/apex/malhar/flume/sink/DTFlumeSinkTest.java
+++ /dev/null
@@ -1,146 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *   http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied.  See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-package org.apache.apex.malhar.flume.sink;
-
-import java.io.IOException;
-import java.net.InetSocketAddress;
-import java.util.Arrays;
-import java.util.Collection;
-import java.util.Collections;
-
-import org.junit.Assert;
-import org.junit.Test;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
-import org.apache.apex.malhar.flume.discovery.Discovery;
-
-import org.apache.flume.channel.MemoryChannel;
-
-import com.datatorrent.netlet.AbstractLengthPrependerClient;
-import com.datatorrent.netlet.DefaultEventLoop;
-import com.datatorrent.netlet.util.Slice;
-
-/**
- *
- */
-public class DTFlumeSinkTest
-{
-  static final String hostname = "localhost";
-  int port = 0;
-
-  @Test
-  @SuppressWarnings("SleepWhileInLoop")
-  public void testServer() throws InterruptedException, IOException
-  {
-    Discovery<byte[]> discovery = new Discovery<byte[]>()
-    {
-      @Override
-      public synchronized void unadvertise(Service<byte[]> service)
-      {
-        notify();
-      }
-
-      @Override
-      public synchronized void advertise(Service<byte[]> service)
-      {
-        port = service.getPort();
-        logger.debug("listening at {}", service);
-        notify();
-      }
-
-      @Override
-      @SuppressWarnings("unchecked")
-      public synchronized Collection<Service<byte[]>> discover()
-      {
-        try {
-          wait();
-        } catch (InterruptedException ie) {
-          throw new RuntimeException(ie);
-        }
-        return Collections.EMPTY_LIST;
-      }
-
-    };
-    DTFlumeSink sink = new DTFlumeSink();
-    sink.setName("TeskSink");
-    sink.setHostname(hostname);
-    sink.setPort(0);
-    sink.setAcceptedTolerance(2000);
-    sink.setChannel(new MemoryChannel());
-    sink.setDiscovery(discovery);
-    sink.start();
-    AbstractLengthPrependerClient client = new AbstractLengthPrependerClient()
-    {
-      private byte[] array;
-      private int offset = 2;
-
-      @Override
-      public void onMessage(byte[] buffer, int offset, int size)
-      {
-        Slice received = new Slice(buffer, offset, size);
-        logger.debug("Client Received = {}", received);
-        Assert.assertEquals(received,
-            new Slice(Arrays.copyOfRange(array, this.offset, array.length), 0, Server.Request.FIXED_SIZE));
-        synchronized (DTFlumeSinkTest.this) {
-          DTFlumeSinkTest.this.notify();
-        }
-      }
-
-      @Override
-      public void connected()
-      {
-        super.connected();
-        array = new byte[Server.Request.FIXED_SIZE + offset];
-        array[offset] = Server.Command.ECHO.getOrdinal();
-        array[offset + 1] = 1;
-        array[offset + 2] = 2;
-        array[offset + 3] = 3;
-        array[offset + 4] = 4;
-        array[offset + 5] = 5;
-        array[offset + 6] = 6;
-        array[offset + 7] = 7;
-        array[offset + 8] = 8;
-        Server.writeLong(array, offset + Server.Request.TIME_OFFSET, System.currentTimeMillis());
-        write(array, offset, Server.Request.FIXED_SIZE);
-      }
-
-    };
-
-    DefaultEventLoop eventloop = new DefaultEventLoop("Eventloop-TestClient");
-    eventloop.start();
-    discovery.discover();
-    try {
-      eventloop.connect(new InetSocketAddress(hostname, port), client);
-      try {
-        synchronized (this) {
-          this.wait();
-        }
-      } finally {
-        eventloop.disconnect(client);
-      }
-    } finally {
-      eventloop.stop();
-    }
-
-    sink.stop();
-  }
-
-  private static final Logger logger = LoggerFactory.getLogger(DTFlumeSinkTest.class);
-}

http://git-wip-us.apache.org/repos/asf/apex-malhar/blob/2f70751e/flume/src/test/java/org/apache/apex/malhar/flume/sink/FlumeSinkTest.java
----------------------------------------------------------------------
diff --git a/flume/src/test/java/org/apache/apex/malhar/flume/sink/FlumeSinkTest.java b/flume/src/test/java/org/apache/apex/malhar/flume/sink/FlumeSinkTest.java
new file mode 100644
index 0000000..e1bc7b8
--- /dev/null
+++ b/flume/src/test/java/org/apache/apex/malhar/flume/sink/FlumeSinkTest.java
@@ -0,0 +1,146 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.apex.malhar.flume.sink;
+
+import java.io.IOException;
+import java.net.InetSocketAddress;
+import java.util.Arrays;
+import java.util.Collection;
+import java.util.Collections;
+
+import org.junit.Assert;
+import org.junit.Test;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import org.apache.apex.malhar.flume.discovery.Discovery;
+
+import org.apache.flume.channel.MemoryChannel;
+
+import com.datatorrent.netlet.AbstractLengthPrependerClient;
+import com.datatorrent.netlet.DefaultEventLoop;
+import com.datatorrent.netlet.util.Slice;
+
+/**
+ *
+ */
+public class FlumeSinkTest
+{
+  static final String hostname = "localhost";
+  int port = 0;
+
+  @Test
+  @SuppressWarnings("SleepWhileInLoop")
+  public void testServer() throws InterruptedException, IOException
+  {
+    Discovery<byte[]> discovery = new Discovery<byte[]>()
+    {
+      @Override
+      public synchronized void unadvertise(Service<byte[]> service)
+      {
+        notify();
+      }
+
+      @Override
+      public synchronized void advertise(Service<byte[]> service)
+      {
+        port = service.getPort();
+        logger.debug("listening at {}", service);
+        notify();
+      }
+
+      @Override
+      @SuppressWarnings("unchecked")
+      public synchronized Collection<Service<byte[]>> discover()
+      {
+        try {
+          wait();
+        } catch (InterruptedException ie) {
+          throw new RuntimeException(ie);
+        }
+        return Collections.EMPTY_LIST;
+      }
+
+    };
+    FlumeSink sink = new FlumeSink();
+    sink.setName("TeskSink");
+    sink.setHostname(hostname);
+    sink.setPort(0);
+    sink.setAcceptedTolerance(2000);
+    sink.setChannel(new MemoryChannel());
+    sink.setDiscovery(discovery);
+    sink.start();
+    AbstractLengthPrependerClient client = new AbstractLengthPrependerClient()
+    {
+      private byte[] array;
+      private int offset = 2;
+
+      @Override
+      public void onMessage(byte[] buffer, int offset, int size)
+      {
+        Slice received = new Slice(buffer, offset, size);
+        logger.debug("Client Received = {}", received);
+        Assert.assertEquals(received,
+            new Slice(Arrays.copyOfRange(array, this.offset, array.length), 0, Server.Request.FIXED_SIZE));
+        synchronized (FlumeSinkTest.this) {
+          FlumeSinkTest.this.notify();
+        }
+      }
+
+      @Override
+      public void connected()
+      {
+        super.connected();
+        array = new byte[Server.Request.FIXED_SIZE + offset];
+        array[offset] = Server.Command.ECHO.getOrdinal();
+        array[offset + 1] = 1;
+        array[offset + 2] = 2;
+        array[offset + 3] = 3;
+        array[offset + 4] = 4;
+        array[offset + 5] = 5;
+        array[offset + 6] = 6;
+        array[offset + 7] = 7;
+        array[offset + 8] = 8;
+        Server.writeLong(array, offset + Server.Request.TIME_OFFSET, System.currentTimeMillis());
+        write(array, offset, Server.Request.FIXED_SIZE);
+      }
+
+    };
+
+    DefaultEventLoop eventloop = new DefaultEventLoop("Eventloop-TestClient");
+    eventloop.start();
+    discovery.discover();
+    try {
+      eventloop.connect(new InetSocketAddress(hostname, port), client);
+      try {
+        synchronized (this) {
+          this.wait();
+        }
+      } finally {
+        eventloop.disconnect(client);
+      }
+    } finally {
+      eventloop.stop();
+    }
+
+    sink.stop();
+  }
+
+  private static final Logger logger = LoggerFactory.getLogger(FlumeSinkTest.class);
+}
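The renamed test above drives FlumeSink end to end: it starts the sink, waits for the stub Discovery to advertise the server port, connects a netlet client, and round-trips an ECHO request. The following is a minimal standalone sketch of the same wiring, reusing only the setters visible in testServer() above; it is not part of this commit, and it assumes the sink can run without a Discovery instance (the test always provides one), so treat it as illustration rather than a supported entry point.

    // Sketch only (not part of this commit): mirrors the setters used in
    // FlumeSinkTest.testServer() above. Assumes a Discovery instance is
    // optional; the test always sets one, so production use should too.
    import org.apache.apex.malhar.flume.sink.FlumeSink;
    import org.apache.flume.channel.MemoryChannel;

    public class FlumeSinkBootstrap
    {
      public static void main(String[] args) throws InterruptedException
      {
        FlumeSink sink = new FlumeSink();
        sink.setName("sink1");            // sink id, as in the conf files below
        sink.setHostname("localhost");
        sink.setPort(0);                  // 0 lets the server pick a free port
        sink.setAcceptedTolerance(2000);  // same tolerance the test configures
        sink.setChannel(new MemoryChannel());
        sink.start();                     // begins listening for netlet clients
        Thread.sleep(10000);              // stands in for a real agent lifecycle
        sink.stop();                      // releases the server socket
      }
    }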

http://git-wip-us.apache.org/repos/asf/apex-malhar/blob/2f70751e/flume/src/test/resources/flume/conf/flume-conf.properties
----------------------------------------------------------------------
diff --git a/flume/src/test/resources/flume/conf/flume-conf.properties b/flume/src/test/resources/flume/conf/flume-conf.properties
index 73dc79a..3498d67 100644
--- a/flume/src/test/resources/flume/conf/flume-conf.properties
+++ b/flume/src/test/resources/flume/conf/flume-conf.properties
@@ -63,7 +63,7 @@ agent1.sources.netcatSource.command = src/test/bash/subcat_periodically src/test
 
 # first sink - dt
 agent1.sinks.dt.id = CEVL00P
-agent1.sinks.dt.type = org.apache.apex.malhar.flume.sink.DTFlumeSink
+agent1.sinks.dt.type = org.apache.apex.malhar.flume.sink.FlumeSink
 agent1.sinks.dt.hostname = localhost
 agent1.sinks.dt.port = 8080
 agent1.sinks.dt.sleepMillis = 7
@@ -80,7 +80,7 @@ agent1.sinks.dt.minimumEventsPerTransaction = 1
 # Ensure that we are able to detect flume sinks (and failures) automatically.
   agent1.sinks.dt.discovery = org.apache.apex.malhar.flume.discovery.ZKAssistedDiscovery
    agent1.sinks.dt.discovery.connectionString = 127.0.0.1:2181
-   agent1.sinks.dt.discovery.basePath = /HelloDT
+   agent1.sinks.dt.discovery.basePath = /HelloApex
    agent1.sinks.dt.discovery.connectionTimeoutMillis = 1000
    agent1.sinks.dt.discovery.connectionRetryCount = 10
    agent1.sinks.dt.discovery.connectionRetrySleepMillis = 500
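Taken together, the two hunks above rename the sink type and move the ZooKeeper base path off the DT prefix. A minimal sink block combining the renamed keys looks like the following sketch; every value is copied from this commit's test configs, and the agent name a1, ports, and addresses are placeholders rather than additional defaults.

    # Sketch assembled from the test configs in this commit; a1 and the
    # ports/addresses are placeholders, not newly supported defaults.
    a1.sinks.dt.type = org.apache.apex.malhar.flume.sink.FlumeSink
    a1.sinks.dt.id = sink1
    a1.sinks.dt.hostname = 127.0.0.1
    a1.sinks.dt.port = 9098
    a1.sinks.dt.discovery = org.apache.apex.malhar.flume.discovery.ZKAssistedDiscovery
    a1.sinks.dt.discovery.connectionString = 127.0.0.1:2181
    a1.sinks.dt.discovery.basePath = /HelloApex
    a1.sinks.dt.channel = c1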

http://git-wip-us.apache.org/repos/asf/apex-malhar/blob/2f70751e/flume/src/test/resources/flume/conf/flume-env.sh
----------------------------------------------------------------------
diff --git a/flume/src/test/resources/flume/conf/flume-env.sh b/flume/src/test/resources/flume/conf/flume-env.sh
index 436e670..c03f98d 100644
--- a/flume/src/test/resources/flume/conf/flume-env.sh
+++ b/flume/src/test/resources/flume/conf/flume-env.sh
@@ -22,9 +22,9 @@
 # This script runs on the machine which have maven repository populated under
 # $HOME/.m2 If that's not the case, please adjust the JARPATH variable below
 # to point to colon separated list of directories where jar files can be found
-if test -z "$DT_FLUME_JAR"
+if test -z "$APEX_FLUME_JAR"
 then
-  echo [ERROR]: Environment variable DT_FLUME_JAR should point to a valid jar file which contains DTFlumeSink class >&2
+  echo [ERROR]: Environment variable APEX_FLUME_JAR should point to a valid jar file which contains FlumeSink class >&2
   exit 2
 fi
 
@@ -35,4 +35,4 @@ then
 else
   JAVA=${JAVA_HOME}/bin/java
 fi
-FLUME_CLASSPATH=`JARPATH=$JARPATH $JAVA -cp $DT_FLUME_JAR com.datatorrent.jarpath.JarPath -N $DT_FLUME_JAR -Xdt-jarpath -Xdt-netlet`
\ No newline at end of file
+FLUME_CLASSPATH=`JARPATH=$JARPATH $JAVA -cp $APEX_FLUME_JAR com.datatorrent.jarpath.JarPath -N $APEX_FLUME_JAR -Xdt-jarpath -Xdt-netlet`
\ No newline at end of file
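With the variable renamed, the script is exercised by exporting APEX_FLUME_JAR before flume-ng sources flume-env.sh from its conf directory. A hypothetical check of the resulting classpath, with a placeholder jar path, could look like this:

    # Hypothetical usage; the jar path is a placeholder, not from this commit.
    export APEX_FLUME_JAR=/path/to/malhar-flume.jar
    . flume-env.sh                 # flume-ng sources this from its conf dir
    echo "$FLUME_CLASSPATH"        # jarpath-expanded classpath; the script
                                   # exits with status 2 if the variable is unset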

http://git-wip-us.apache.org/repos/asf/apex-malhar/blob/2f70751e/flume/src/test/resources/flume/conf/flume_simple.conf
----------------------------------------------------------------------
diff --git a/flume/src/test/resources/flume/conf/flume_simple.conf b/flume/src/test/resources/flume/conf/flume_simple.conf
index b902881..2ed2614 100644
--- a/flume/src/test/resources/flume/conf/flume_simple.conf
+++ b/flume/src/test/resources/flume/conf/flume_simple.conf
@@ -29,7 +29,7 @@ a1.sources.r1.type = seq
 a1.sources.r1.totalEvents = 10
 
 # sink - dt
- a1.sinks.dt.type = org.apache.apex.malhar.flume.sink.DTFlumeSink
+ a1.sinks.dt.type = org.apache.apex.malhar.flume.sink.FlumeSink
  a1.sinks.dt.id = sink1
  a1.sinks.dt.hostname = 127.0.0.1
  a1.sinks.dt.port = 9098

http://git-wip-us.apache.org/repos/asf/apex-malhar/blob/2f70751e/flume/src/test/resources/flume/conf/flume_zkdiscovery.conf
----------------------------------------------------------------------
diff --git a/flume/src/test/resources/flume/conf/flume_zkdiscovery.conf b/flume/src/test/resources/flume/conf/flume_zkdiscovery.conf
index 6f8932c..723805d 100644
--- a/flume/src/test/resources/flume/conf/flume_zkdiscovery.conf
+++ b/flume/src/test/resources/flume/conf/flume_zkdiscovery.conf
@@ -34,7 +34,7 @@ a1.sources.r1.type = seq
 a1.sources.r1.totalEvents = 10
 
 # first sink - dt
- a1.sinks.dt.type = org.apache.apex.malhar.flume.sink.DTFlumeSink
+ a1.sinks.dt.type = org.apache.apex.malhar.flume.sink.FlumeSink
  a1.sinks.dt.id = sink1
  a1.sinks.dt.hostname = 127.0.0.1
  a1.sinks.dt.port = 9098
@@ -48,7 +48,7 @@ a1.sources.r1.totalEvents = 10
  a1.sinks.dt.channel = c1
 
 # second sink - dt2
- a1.sinks.dt2.type = org.apache.apex.malhar.flume.sink.DTFlumeSink
+ a1.sinks.dt2.type = org.apache.apex.malhar.flume.sink.FlumeSink
  a1.sinks.dt2.id = sink2
  a1.sinks.dt2.hostname = 127.0.0.1
  a1.sinks.dt2.port = 9099
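Both sinks now point at the renamed class, so either configuration can be smoke-tested with the standard flume-ng launcher. The agent name a1 comes from the configs above; the conf directory layout is a placeholder.

    # Hypothetical invocation using standard flume-ng flags (-n agent name,
    # -c conf directory, -f config file); paths are placeholders.
    bin/flume-ng agent -n a1 -c conf -f conf/flume_zkdiscovery.conf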
