Repository: apex-malhar
Updated Branches:
  refs/heads/master c84a2c867 -> 2f70751e7
Changed DT references to Apex

Project: http://git-wip-us.apache.org/repos/asf/apex-malhar/repo
Commit: http://git-wip-us.apache.org/repos/asf/apex-malhar/commit/2f70751e
Tree: http://git-wip-us.apache.org/repos/asf/apex-malhar/tree/2f70751e
Diff: http://git-wip-us.apache.org/repos/asf/apex-malhar/diff/2f70751e

Branch: refs/heads/master
Commit: 2f70751e7852cc8e2be94189e1bbf8be85a19559
Parents: c48ec8c
Author: Pramod Immaneni <[email protected]>
Authored: Mon May 22 14:27:02 2017 -0700
Committer: Pramod Immaneni <[email protected]>
Committed: Mon May 22 16:47:34 2017 -0700

----------------------------------------------------------------------
 flume/README.md                                 |   4 +-
 .../apex/malhar/flume/discovery/Discovery.java  |   6 +-
 .../flume/discovery/ZKAssistedDiscovery.java    |   4 +-
 .../operator/AbstractFlumeInputOperator.java    |   2 +-
 .../apex/malhar/flume/sink/DTFlumeSink.java     | 574 -------------------
 .../apex/malhar/flume/sink/FlumeSink.java       | 574 +++++++++++++++++++
 .../flume-conf/flume-conf.sample.properties     |   2 +-
 .../resources/flume-conf/flume-env.sample.sh    |   6 +-
 .../discovery/ZKAssistedDiscoveryTest.java      |  12 +-
 .../apex/malhar/flume/sink/DTFlumeSinkTest.java | 146 -----
 .../apex/malhar/flume/sink/FlumeSinkTest.java   | 146 +++++
 .../resources/flume/conf/flume-conf.properties  |   4 +-
 .../src/test/resources/flume/conf/flume-env.sh  |   6 +-
 .../test/resources/flume/conf/flume_simple.conf |   2 +-
 .../resources/flume/conf/flume_zkdiscovery.conf |   4 +-
 15 files changed, 746 insertions(+), 746 deletions(-)
----------------------------------------------------------------------

http://git-wip-us.apache.org/repos/asf/apex-malhar/blob/2f70751e/flume/README.md
----------------------------------------------------------------------
diff --git a/flume/README.md b/flume/README.md
index ec8fae9..5f9c320 100644
--- a/flume/README.md
+++ b/flume/README.md
@@ -23,7 +23,7 @@ and all the needed dependencies into `plugins.d/custom-plugin-name/libext/`
 (Alternatively to flume's automatic plugins.d detection, jars can be added to the FLUME_CLASSPATH
 using a `flume-env.sh` script. (See 'resources/flume-conf/flume-env.sample.sh')
 Therefore a maven repository must be available under $HOME/.m2 and the environment variable
-DT_FLUME_JAR must point to the plugin JAR.)
+APEX_FLUME_JAR must point to the plugin JAR.)
 
 ***Flume configuration***
 A basic flume configuration can be found in `src/test/resources/flume/conf/flume_simple.conf`.
@@ -31,7 +31,7 @@ A flume configuration using discovery service can be found in `src/test/resource
 Configuration files should be placed in flumes 'conf' directory and will be explicitly selected
 when running flume-ng
 
-In the configuration file set `org.apache.apex.malhar.flume.sink.DTFlumeSink` for the **type**
+In the configuration file set `org.apache.apex.malhar.flume.sink.FlumeSink` for the **type**
 and `org.apache.apex.malhar.flume.storage.HDFSStorage` for the **storage**, as well as a
 **HDFS directory** for `baseDir`. The HDFS base directory needs to be created on HDFS.
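To make the README's instructions concrete, a minimal sink definition in flume properties form might look like the sketch below. It mirrors the renamed sample in `flume-conf/flume-conf.sample.properties` (shown later in this diff) and the `storage`/`storage.baseDir` keys read by `FlumeSink.configure()`; the agent name `agent1`, sink name `dt`, channel name `c1`, and the `baseDir` value are illustrative assumptions.

  # minimal agent wiring for the renamed FlumeSink (sketch; names and paths are illustrative)
  agent1.sinks = dt
  agent1.sinks.dt.type = org.apache.apex.malhar.flume.sink.FlumeSink
  agent1.sinks.dt.id = sink1
  agent1.sinks.dt.hostname = localhost
  agent1.sinks.dt.port = 8080
  # storage class and HDFS base directory per the README; the base directory must already exist on HDFS
  agent1.sinks.dt.storage = org.apache.apex.malhar.flume.storage.HDFSStorage
  agent1.sinks.dt.storage.baseDir = /apex/flume/storage
  agent1.sinks.dt.channel = c1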
http://git-wip-us.apache.org/repos/asf/apex-malhar/blob/2f70751e/flume/src/main/java/org/apache/apex/malhar/flume/discovery/Discovery.java
----------------------------------------------------------------------
diff --git a/flume/src/main/java/org/apache/apex/malhar/flume/discovery/Discovery.java b/flume/src/main/java/org/apache/apex/malhar/flume/discovery/Discovery.java
index 619a625..c32c15b 100644
--- a/flume/src/main/java/org/apache/apex/malhar/flume/discovery/Discovery.java
+++ b/flume/src/main/java/org/apache/apex/malhar/flume/discovery/Discovery.java
@@ -21,12 +21,12 @@ package org.apache.apex.malhar.flume.discovery;
 import java.util.Collection;
 
 /**
- * When DTFlumeSink server instance binds to the network interface, it can publish
+ * When FlumeSink server instance binds to the network interface, it can publish
  * its whereabouts by invoking advertise method on the Discovery object. Similarly
  * when it ceases accepting any more connections, it can publish its intent to do
  * so by invoking unadvertise.<p />
  * Interesting parties can call discover method to get the list of addresses where
- * they can find an available DTFlumeSink server instance.
+ * they can find an available FlumeSink server instance.
  *
  * @param <T> - Type of the objects which can be discovered
  * @since 0.9.3
@@ -41,7 +41,7 @@ public interface Discovery<T>
   void unadvertise(Service<T> service);
 
   /**
-   * Advertise the host/port address where DTFlumeSink is accepting a client connection.
+   * Advertise the host/port address where FlumeSink is accepting a client connection.
    *
    * @param service
    */

http://git-wip-us.apache.org/repos/asf/apex-malhar/blob/2f70751e/flume/src/main/java/org/apache/apex/malhar/flume/discovery/ZKAssistedDiscovery.java
----------------------------------------------------------------------
diff --git a/flume/src/main/java/org/apache/apex/malhar/flume/discovery/ZKAssistedDiscovery.java b/flume/src/main/java/org/apache/apex/malhar/flume/discovery/ZKAssistedDiscovery.java
index 9a7dd3c..1988d62 100644
--- a/flume/src/main/java/org/apache/apex/malhar/flume/discovery/ZKAssistedDiscovery.java
+++ b/flume/src/main/java/org/apache/apex/malhar/flume/discovery/ZKAssistedDiscovery.java
@@ -70,7 +70,7 @@ public class ZKAssistedDiscovery implements Discovery<byte[]>,
   public ZKAssistedDiscovery()
   {
-    this.serviceName = "DTFlume";
+    this.serviceName = "ApexFlume";
     this.conntectionRetrySleepMillis = 500;
     this.connectionRetryCount = 10;
     this.connectionTimeoutMillis = 1000;
@@ -333,7 +333,7 @@ public class ZKAssistedDiscovery implements Discovery<byte[]>,
   @Override
   public void configure(org.apache.flume.Context context)
   {
-    serviceName = context.getString("serviceName", "DTFlume");
+    serviceName = context.getString("serviceName", "ApexFlume");
     connectionString = context.getString("connectionString");
     basePath = context.getString("basePath");

http://git-wip-us.apache.org/repos/asf/apex-malhar/blob/2f70751e/flume/src/main/java/org/apache/apex/malhar/flume/operator/AbstractFlumeInputOperator.java
----------------------------------------------------------------------
diff --git a/flume/src/main/java/org/apache/apex/malhar/flume/operator/AbstractFlumeInputOperator.java b/flume/src/main/java/org/apache/apex/malhar/flume/operator/AbstractFlumeInputOperator.java
index f9beb71..93b01af 100644
--- a/flume/src/main/java/org/apache/apex/malhar/flume/operator/AbstractFlumeInputOperator.java
+++ b/flume/src/main/java/org/apache/apex/malhar/flume/operator/AbstractFlumeInputOperator.java
@@ -119,7 +119,7 @@ public abstract class
AbstractFlumeInputOperator<T> public void activate(OperatorContext ctx) { if (connectionSpecs.length == 0) { - logger.info("Discovered zero DTFlumeSink"); + logger.info("Discovered zero FlumeSink"); } else if (connectionSpecs.length == 1) { for (String connectAddresse: connectionSpecs) { logger.debug("Connection spec is {}", connectAddresse); http://git-wip-us.apache.org/repos/asf/apex-malhar/blob/2f70751e/flume/src/main/java/org/apache/apex/malhar/flume/sink/DTFlumeSink.java ---------------------------------------------------------------------- diff --git a/flume/src/main/java/org/apache/apex/malhar/flume/sink/DTFlumeSink.java b/flume/src/main/java/org/apache/apex/malhar/flume/sink/DTFlumeSink.java deleted file mode 100644 index 4f28850..0000000 --- a/flume/src/main/java/org/apache/apex/malhar/flume/sink/DTFlumeSink.java +++ /dev/null @@ -1,574 +0,0 @@ -/** - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, - * software distributed under the License is distributed on an - * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY - * KIND, either express or implied. See the License for the - * specific language governing permissions and limitations - * under the License. - */ -package org.apache.apex.malhar.flume.sink; - -import java.io.IOError; -import java.io.IOException; -import java.util.Arrays; -import java.util.Collection; -import java.util.Collections; -import java.util.ServiceConfigurationError; - -import org.slf4j.Logger; -import org.slf4j.LoggerFactory; - -import org.apache.apex.malhar.flume.discovery.Discovery; -import org.apache.apex.malhar.flume.sink.Server.Client; -import org.apache.apex.malhar.flume.sink.Server.Request; -import org.apache.apex.malhar.flume.storage.EventCodec; -import org.apache.apex.malhar.flume.storage.Storage; - -import org.apache.flume.Context; -import org.apache.flume.Event; -import org.apache.flume.EventDeliveryException; -import org.apache.flume.Transaction; -import org.apache.flume.conf.Configurable; -import org.apache.flume.sink.AbstractSink; - -import com.datatorrent.api.Component; -import com.datatorrent.api.StreamCodec; - -import com.datatorrent.netlet.DefaultEventLoop; -import com.datatorrent.netlet.NetletThrowable; -import com.datatorrent.netlet.NetletThrowable.NetletRuntimeException; -import com.datatorrent.netlet.util.Slice; - -/** - * DTFlumeSink is a flume sink developed to ingest the data into DataTorrent DAG - * from flume. It's essentially a flume sink which acts as a server capable of - * talking to one client at a time. The client for this server is AbstractFlumeInputOperator. 
- * <p /> - * <experimental>DTFlumeSink auto adjusts the rate at which it consumes the data from channel to - * match the throughput of the DAG.</experimental> - * <p /> - * The properties you can set on the DTFlumeSink are: <br /> - * id - string unique value identifying this sink <br /> - * hostname - string value indicating the fqdn or ip address of the interface on which the server should listen <br /> - * port - integer value indicating the numeric port to which the server should bind <br /> - * sleepMillis - integer value indicating the number of milliseconds the process should sleep when there are no events - * before checking for next event again <br /> - * throughputAdjustmentPercent - integer value indicating by what percentage the flume transaction size should be - * adjusted upward or downward at a time <br /> - * minimumEventsPerTransaction - integer value indicating the minimum number of events per transaction <br /> - * maximumEventsPerTransaction - integer value indicating the maximum number of events per transaction. This value can - * not be more than channel's transaction capacity.<br /> - * - * @since 0.9.2 - */ -public class DTFlumeSink extends AbstractSink implements Configurable -{ - private static final String HOSTNAME_STRING = "hostname"; - private static final String HOSTNAME_DEFAULT = "locahost"; - private static final long ACCEPTED_TOLERANCE = 20000; - private DefaultEventLoop eventloop; - private Server server; - private int outstandingEventsCount; - private int lastConsumedEventsCount; - private int idleCount; - private byte[] playback; - private Client client; - private String hostname; - private int port; - private String id; - private long acceptedTolerance; - private long sleepMillis; - private double throughputAdjustmentFactor; - private int minimumEventsPerTransaction; - private int maximumEventsPerTransaction; - private long commitEventTimeoutMillis; - private transient long lastCommitEventTimeMillis; - private Storage storage; - Discovery<byte[]> discovery; - StreamCodec<Event> codec; - /* Begin implementing Flume Sink interface */ - - @Override - @SuppressWarnings({"BroadCatchBlock", "TooBroadCatch", "UseSpecificCatch", "SleepWhileInLoop"}) - public Status process() throws EventDeliveryException - { - Slice slice; - synchronized (server.requests) { - for (Request r : server.requests) { - logger.debug("found {}", r); - switch (r.type) { - case SEEK: - lastCommitEventTimeMillis = System.currentTimeMillis(); - slice = r.getAddress(); - playback = storage.retrieve(Arrays.copyOfRange(slice.buffer, slice.offset, slice.offset + slice.length)); - client = r.client; - break; - - case COMMITTED: - lastCommitEventTimeMillis = System.currentTimeMillis(); - slice = r.getAddress(); - storage.clean(Arrays.copyOfRange(slice.buffer, slice.offset, slice.offset + slice.length)); - break; - - case CONNECTED: - logger.debug("Connected received, ignoring it!"); - break; - - case DISCONNECTED: - if (r.client == client) { - client = null; - outstandingEventsCount = 0; - } - break; - - case WINDOWED: - lastConsumedEventsCount = r.getEventCount(); - idleCount = r.getIdleCount(); - outstandingEventsCount -= lastConsumedEventsCount; - break; - - case SERVER_ERROR: - throw new IOError(null); - - default: - logger.debug("Cannot understand the request {}", r); - break; - } - } - - server.requests.clear(); - } - - if (client == null) { - logger.info("No client expressed interest yet to consume the events."); - return Status.BACKOFF; - } else if (System.currentTimeMillis() - 
lastCommitEventTimeMillis > commitEventTimeoutMillis) { - logger.info("Client has not processed the workload given for the last {} milliseconds, so backing off.", - System.currentTimeMillis() - lastCommitEventTimeMillis); - return Status.BACKOFF; - } - - int maxTuples; - // the following logic needs to be fixed... this is a quick put together. - if (outstandingEventsCount < 0) { - if (idleCount > 1) { - maxTuples = (int)((1 + throughputAdjustmentFactor * idleCount) * lastConsumedEventsCount); - } else { - maxTuples = (int)((1 + throughputAdjustmentFactor) * lastConsumedEventsCount); - } - } else if (outstandingEventsCount > lastConsumedEventsCount) { - maxTuples = (int)((1 - throughputAdjustmentFactor) * lastConsumedEventsCount); - } else { - if (idleCount > 0) { - maxTuples = (int)((1 + throughputAdjustmentFactor * idleCount) * lastConsumedEventsCount); - if (maxTuples <= 0) { - maxTuples = minimumEventsPerTransaction; - } - } else { - maxTuples = lastConsumedEventsCount; - } - } - - if (maxTuples >= maximumEventsPerTransaction) { - maxTuples = maximumEventsPerTransaction; - } else if (maxTuples <= 0) { - maxTuples = minimumEventsPerTransaction; - } - - if (maxTuples > 0) { - if (playback != null) { - try { - int i = 0; - do { - if (!client.write(playback)) { - retryWrite(playback, null); - } - outstandingEventsCount++; - playback = storage.retrieveNext(); - } - while (++i < maxTuples && playback != null); - } catch (Exception ex) { - logger.warn("Playback Failed", ex); - if (ex instanceof NetletThrowable) { - try { - eventloop.disconnect(client); - } finally { - client = null; - outstandingEventsCount = 0; - } - } - return Status.BACKOFF; - } - } else { - int storedTuples = 0; - - Transaction t = getChannel().getTransaction(); - try { - t.begin(); - - Event e; - while (storedTuples < maxTuples && (e = getChannel().take()) != null) { - Slice event = codec.toByteArray(e); - byte[] address = storage.store(event); - if (address != null) { - if (!client.write(address, event)) { - retryWrite(address, event); - } - outstandingEventsCount++; - } else { - logger.debug("Detected the condition of recovery from flume crash!"); - } - storedTuples++; - } - - if (storedTuples > 0) { - storage.flush(); - } - - t.commit(); - - if (storedTuples > 0) { /* log less frequently */ - logger.debug("Transaction details maxTuples = {}, storedTuples = {}, outstanding = {}", - maxTuples, storedTuples, outstandingEventsCount); - } - } catch (Error er) { - t.rollback(); - throw er; - } catch (Exception ex) { - logger.error("Transaction Failed", ex); - if (ex instanceof NetletRuntimeException && client != null) { - try { - eventloop.disconnect(client); - } finally { - client = null; - outstandingEventsCount = 0; - } - } - t.rollback(); - return Status.BACKOFF; - } finally { - t.close(); - } - - if (storedTuples == 0) { - sleep(); - } - } - } - - return Status.READY; - } - - private void sleep() - { - try { - Thread.sleep(sleepMillis); - } catch (InterruptedException ex) { - Thread.currentThread().interrupt(); - } - } - - @Override - public void start() - { - try { - if (storage instanceof Component) { - @SuppressWarnings("unchecked") - Component<com.datatorrent.api.Context> component = (Component<com.datatorrent.api.Context>)storage; - component.setup(null); - } - if (discovery instanceof Component) { - @SuppressWarnings("unchecked") - Component<com.datatorrent.api.Context> component = (Component<com.datatorrent.api.Context>)discovery; - component.setup(null); - } - if (codec instanceof Component) { - 
@SuppressWarnings("unchecked") - Component<com.datatorrent.api.Context> component = (Component<com.datatorrent.api.Context>)codec; - component.setup(null); - } - eventloop = new DefaultEventLoop("EventLoop-" + id); - server = new Server(id, discovery,acceptedTolerance); - } catch (Error error) { - throw error; - } catch (RuntimeException re) { - throw re; - } catch (IOException ex) { - throw new RuntimeException(ex); - } - - eventloop.start(); - eventloop.start(hostname, port, server); - super.start(); - } - - @Override - public void stop() - { - try { - super.stop(); - } finally { - try { - if (client != null) { - eventloop.disconnect(client); - client = null; - } - - eventloop.stop(server); - eventloop.stop(); - - if (codec instanceof Component) { - @SuppressWarnings("unchecked") - Component<com.datatorrent.api.Context> component = (Component<com.datatorrent.api.Context>)codec; - component.teardown(); - } - if (discovery instanceof Component) { - @SuppressWarnings("unchecked") - Component<com.datatorrent.api.Context> component = (Component<com.datatorrent.api.Context>)discovery; - component.teardown(); - } - if (storage instanceof Component) { - @SuppressWarnings("unchecked") - Component<com.datatorrent.api.Context> component = (Component<com.datatorrent.api.Context>)storage; - component.teardown(); - } - } catch (Throwable cause) { - throw new ServiceConfigurationError("Failed Stop", cause); - } - } - } - - /* End implementing Flume Sink interface */ - - /* Begin Configurable Interface */ - @Override - public void configure(Context context) - { - hostname = context.getString(HOSTNAME_STRING, HOSTNAME_DEFAULT); - port = context.getInteger("port", 0); - id = context.getString("id"); - if (id == null) { - id = getName(); - } - acceptedTolerance = context.getLong("acceptedTolerance", ACCEPTED_TOLERANCE); - sleepMillis = context.getLong("sleepMillis", 5L); - throughputAdjustmentFactor = context.getInteger("throughputAdjustmentPercent", 5) / 100.0; - maximumEventsPerTransaction = context.getInteger("maximumEventsPerTransaction", 10000); - minimumEventsPerTransaction = context.getInteger("minimumEventsPerTransaction", 100); - commitEventTimeoutMillis = context.getLong("commitEventTimeoutMillis", Long.MAX_VALUE); - - @SuppressWarnings("unchecked") - Discovery<byte[]> ldiscovery = configure("discovery", Discovery.class, context); - if (ldiscovery == null) { - logger.warn("Discovery agent not configured for the sink!"); - discovery = new Discovery<byte[]>() - { - @Override - public void unadvertise(Service<byte[]> service) - { - logger.debug("Sink {} stopped listening on {}:{}", service.getId(), service.getHost(), service.getPort()); - } - - @Override - public void advertise(Service<byte[]> service) - { - logger.debug("Sink {} started listening on {}:{}", service.getId(), service.getHost(), service.getPort()); - } - - @Override - @SuppressWarnings("unchecked") - public Collection<Service<byte[]>> discover() - { - return Collections.EMPTY_SET; - } - - }; - } else { - discovery = ldiscovery; - } - - storage = configure("storage", Storage.class, context); - if (storage == null) { - logger.warn("storage key missing... 
DTFlumeSink may lose data!"); - storage = new Storage() - { - @Override - public byte[] store(Slice slice) - { - return null; - } - - @Override - public byte[] retrieve(byte[] identifier) - { - return null; - } - - @Override - public byte[] retrieveNext() - { - return null; - } - - @Override - public void clean(byte[] identifier) - { - } - - @Override - public void flush() - { - } - - }; - } - - @SuppressWarnings("unchecked") - StreamCodec<Event> lCodec = configure("codec", StreamCodec.class, context); - if (lCodec == null) { - codec = new EventCodec(); - } else { - codec = lCodec; - } - - } - - /* End Configurable Interface */ - - @SuppressWarnings({"UseSpecificCatch", "BroadCatchBlock", "TooBroadCatch"}) - private static <T> T configure(String key, Class<T> clazz, Context context) - { - String classname = context.getString(key); - if (classname == null) { - return null; - } - - try { - Class<?> loadClass = Thread.currentThread().getContextClassLoader().loadClass(classname); - if (clazz.isAssignableFrom(loadClass)) { - @SuppressWarnings("unchecked") - T object = (T)loadClass.newInstance(); - if (object instanceof Configurable) { - Context context1 = new Context(context.getSubProperties(key + '.')); - String id = context1.getString(Storage.ID); - if (id == null) { - id = context.getString(Storage.ID); - logger.debug("{} inherited id={} from sink", key, id); - context1.put(Storage.ID, id); - } - ((Configurable)object).configure(context1); - } - - return object; - } else { - logger.error("key class {} does not implement {} interface", classname, Storage.class.getCanonicalName()); - throw new Error("Invalid storage " + classname); - } - } catch (Error error) { - throw error; - } catch (RuntimeException re) { - throw re; - } catch (Throwable t) { - throw new RuntimeException(t); - } - } - - /** - * @return the hostname - */ - String getHostname() - { - return hostname; - } - - /** - * @param hostname the hostname to set - */ - void setHostname(String hostname) - { - this.hostname = hostname; - } - - /** - * @return the port - */ - int getPort() - { - return port; - } - - public long getAcceptedTolerance() - { - return acceptedTolerance; - } - - public void setAcceptedTolerance(long acceptedTolerance) - { - this.acceptedTolerance = acceptedTolerance; - } - - /** - * @param port the port to set - */ - void setPort(int port) - { - this.port = port; - } - - /** - * @return the discovery - */ - Discovery<byte[]> getDiscovery() - { - return discovery; - } - - /** - * @param discovery the discovery to set - */ - void setDiscovery(Discovery<byte[]> discovery) - { - this.discovery = discovery; - } - - /** - * Attempt the sequence of writing after sleeping twice and upon failure assume - * that the client connection has problems and hence close it. 
- * - * @param address - * @param e - * @throws IOException - */ - private void retryWrite(byte[] address, Slice event) throws IOException - { - if (event == null) { /* this happens for playback where address and event are sent as single object */ - while (client.isConnected()) { - sleep(); - if (client.write(address)) { - return; - } - } - } else { /* this happens when the events are taken from the flume channel and writing first time failed */ - while (client.isConnected()) { - sleep(); - if (client.write(address, event)) { - return; - } - } - } - - throw new IOException("Client disconnected!"); - } - - private static final Logger logger = LoggerFactory.getLogger(DTFlumeSink.class); -} http://git-wip-us.apache.org/repos/asf/apex-malhar/blob/2f70751e/flume/src/main/java/org/apache/apex/malhar/flume/sink/FlumeSink.java ---------------------------------------------------------------------- diff --git a/flume/src/main/java/org/apache/apex/malhar/flume/sink/FlumeSink.java b/flume/src/main/java/org/apache/apex/malhar/flume/sink/FlumeSink.java new file mode 100644 index 0000000..99cc1d5 --- /dev/null +++ b/flume/src/main/java/org/apache/apex/malhar/flume/sink/FlumeSink.java @@ -0,0 +1,574 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.apex.malhar.flume.sink; + +import java.io.IOError; +import java.io.IOException; +import java.util.Arrays; +import java.util.Collection; +import java.util.Collections; +import java.util.ServiceConfigurationError; + +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import org.apache.apex.malhar.flume.discovery.Discovery; +import org.apache.apex.malhar.flume.sink.Server.Client; +import org.apache.apex.malhar.flume.sink.Server.Request; +import org.apache.apex.malhar.flume.storage.EventCodec; +import org.apache.apex.malhar.flume.storage.Storage; + +import org.apache.flume.Context; +import org.apache.flume.Event; +import org.apache.flume.EventDeliveryException; +import org.apache.flume.Transaction; +import org.apache.flume.conf.Configurable; +import org.apache.flume.sink.AbstractSink; + +import com.datatorrent.api.Component; +import com.datatorrent.api.StreamCodec; + +import com.datatorrent.netlet.DefaultEventLoop; +import com.datatorrent.netlet.NetletThrowable; +import com.datatorrent.netlet.NetletThrowable.NetletRuntimeException; +import com.datatorrent.netlet.util.Slice; + +/** + * FlumeSink is a flume sink developed to ingest the data into DataTorrent DAG + * from flume. It's essentially a flume sink which acts as a server capable of + * talking to one client at a time. The client for this server is AbstractFlumeInputOperator. 
+ * <p /> + * <experimental>FlumeSink auto adjusts the rate at which it consumes the data from channel to + * match the throughput of the DAG.</experimental> + * <p /> + * The properties you can set on the FlumeSink are: <br /> + * id - string unique value identifying this sink <br /> + * hostname - string value indicating the fqdn or ip address of the interface on which the server should listen <br /> + * port - integer value indicating the numeric port to which the server should bind <br /> + * sleepMillis - integer value indicating the number of milliseconds the process should sleep when there are no events + * before checking for next event again <br /> + * throughputAdjustmentPercent - integer value indicating by what percentage the flume transaction size should be + * adjusted upward or downward at a time <br /> + * minimumEventsPerTransaction - integer value indicating the minimum number of events per transaction <br /> + * maximumEventsPerTransaction - integer value indicating the maximum number of events per transaction. This value can + * not be more than channel's transaction capacity.<br /> + * + * @since 0.9.2 + */ +public class FlumeSink extends AbstractSink implements Configurable +{ + private static final String HOSTNAME_STRING = "hostname"; + private static final String HOSTNAME_DEFAULT = "locahost"; + private static final long ACCEPTED_TOLERANCE = 20000; + private DefaultEventLoop eventloop; + private Server server; + private int outstandingEventsCount; + private int lastConsumedEventsCount; + private int idleCount; + private byte[] playback; + private Client client; + private String hostname; + private int port; + private String id; + private long acceptedTolerance; + private long sleepMillis; + private double throughputAdjustmentFactor; + private int minimumEventsPerTransaction; + private int maximumEventsPerTransaction; + private long commitEventTimeoutMillis; + private transient long lastCommitEventTimeMillis; + private Storage storage; + Discovery<byte[]> discovery; + StreamCodec<Event> codec; + /* Begin implementing Flume Sink interface */ + + @Override + @SuppressWarnings({"BroadCatchBlock", "TooBroadCatch", "UseSpecificCatch", "SleepWhileInLoop"}) + public Status process() throws EventDeliveryException + { + Slice slice; + synchronized (server.requests) { + for (Request r : server.requests) { + logger.debug("found {}", r); + switch (r.type) { + case SEEK: + lastCommitEventTimeMillis = System.currentTimeMillis(); + slice = r.getAddress(); + playback = storage.retrieve(Arrays.copyOfRange(slice.buffer, slice.offset, slice.offset + slice.length)); + client = r.client; + break; + + case COMMITTED: + lastCommitEventTimeMillis = System.currentTimeMillis(); + slice = r.getAddress(); + storage.clean(Arrays.copyOfRange(slice.buffer, slice.offset, slice.offset + slice.length)); + break; + + case CONNECTED: + logger.debug("Connected received, ignoring it!"); + break; + + case DISCONNECTED: + if (r.client == client) { + client = null; + outstandingEventsCount = 0; + } + break; + + case WINDOWED: + lastConsumedEventsCount = r.getEventCount(); + idleCount = r.getIdleCount(); + outstandingEventsCount -= lastConsumedEventsCount; + break; + + case SERVER_ERROR: + throw new IOError(null); + + default: + logger.debug("Cannot understand the request {}", r); + break; + } + } + + server.requests.clear(); + } + + if (client == null) { + logger.info("No client expressed interest yet to consume the events."); + return Status.BACKOFF; + } else if (System.currentTimeMillis() - 
lastCommitEventTimeMillis > commitEventTimeoutMillis) { + logger.info("Client has not processed the workload given for the last {} milliseconds, so backing off.", + System.currentTimeMillis() - lastCommitEventTimeMillis); + return Status.BACKOFF; + } + + int maxTuples; + // the following logic needs to be fixed... this is a quick put together. + if (outstandingEventsCount < 0) { + if (idleCount > 1) { + maxTuples = (int)((1 + throughputAdjustmentFactor * idleCount) * lastConsumedEventsCount); + } else { + maxTuples = (int)((1 + throughputAdjustmentFactor) * lastConsumedEventsCount); + } + } else if (outstandingEventsCount > lastConsumedEventsCount) { + maxTuples = (int)((1 - throughputAdjustmentFactor) * lastConsumedEventsCount); + } else { + if (idleCount > 0) { + maxTuples = (int)((1 + throughputAdjustmentFactor * idleCount) * lastConsumedEventsCount); + if (maxTuples <= 0) { + maxTuples = minimumEventsPerTransaction; + } + } else { + maxTuples = lastConsumedEventsCount; + } + } + + if (maxTuples >= maximumEventsPerTransaction) { + maxTuples = maximumEventsPerTransaction; + } else if (maxTuples <= 0) { + maxTuples = minimumEventsPerTransaction; + } + + if (maxTuples > 0) { + if (playback != null) { + try { + int i = 0; + do { + if (!client.write(playback)) { + retryWrite(playback, null); + } + outstandingEventsCount++; + playback = storage.retrieveNext(); + } + while (++i < maxTuples && playback != null); + } catch (Exception ex) { + logger.warn("Playback Failed", ex); + if (ex instanceof NetletThrowable) { + try { + eventloop.disconnect(client); + } finally { + client = null; + outstandingEventsCount = 0; + } + } + return Status.BACKOFF; + } + } else { + int storedTuples = 0; + + Transaction t = getChannel().getTransaction(); + try { + t.begin(); + + Event e; + while (storedTuples < maxTuples && (e = getChannel().take()) != null) { + Slice event = codec.toByteArray(e); + byte[] address = storage.store(event); + if (address != null) { + if (!client.write(address, event)) { + retryWrite(address, event); + } + outstandingEventsCount++; + } else { + logger.debug("Detected the condition of recovery from flume crash!"); + } + storedTuples++; + } + + if (storedTuples > 0) { + storage.flush(); + } + + t.commit(); + + if (storedTuples > 0) { /* log less frequently */ + logger.debug("Transaction details maxTuples = {}, storedTuples = {}, outstanding = {}", + maxTuples, storedTuples, outstandingEventsCount); + } + } catch (Error er) { + t.rollback(); + throw er; + } catch (Exception ex) { + logger.error("Transaction Failed", ex); + if (ex instanceof NetletRuntimeException && client != null) { + try { + eventloop.disconnect(client); + } finally { + client = null; + outstandingEventsCount = 0; + } + } + t.rollback(); + return Status.BACKOFF; + } finally { + t.close(); + } + + if (storedTuples == 0) { + sleep(); + } + } + } + + return Status.READY; + } + + private void sleep() + { + try { + Thread.sleep(sleepMillis); + } catch (InterruptedException ex) { + Thread.currentThread().interrupt(); + } + } + + @Override + public void start() + { + try { + if (storage instanceof Component) { + @SuppressWarnings("unchecked") + Component<com.datatorrent.api.Context> component = (Component<com.datatorrent.api.Context>)storage; + component.setup(null); + } + if (discovery instanceof Component) { + @SuppressWarnings("unchecked") + Component<com.datatorrent.api.Context> component = (Component<com.datatorrent.api.Context>)discovery; + component.setup(null); + } + if (codec instanceof Component) { + 
@SuppressWarnings("unchecked") + Component<com.datatorrent.api.Context> component = (Component<com.datatorrent.api.Context>)codec; + component.setup(null); + } + eventloop = new DefaultEventLoop("EventLoop-" + id); + server = new Server(id, discovery,acceptedTolerance); + } catch (Error error) { + throw error; + } catch (RuntimeException re) { + throw re; + } catch (IOException ex) { + throw new RuntimeException(ex); + } + + eventloop.start(); + eventloop.start(hostname, port, server); + super.start(); + } + + @Override + public void stop() + { + try { + super.stop(); + } finally { + try { + if (client != null) { + eventloop.disconnect(client); + client = null; + } + + eventloop.stop(server); + eventloop.stop(); + + if (codec instanceof Component) { + @SuppressWarnings("unchecked") + Component<com.datatorrent.api.Context> component = (Component<com.datatorrent.api.Context>)codec; + component.teardown(); + } + if (discovery instanceof Component) { + @SuppressWarnings("unchecked") + Component<com.datatorrent.api.Context> component = (Component<com.datatorrent.api.Context>)discovery; + component.teardown(); + } + if (storage instanceof Component) { + @SuppressWarnings("unchecked") + Component<com.datatorrent.api.Context> component = (Component<com.datatorrent.api.Context>)storage; + component.teardown(); + } + } catch (Throwable cause) { + throw new ServiceConfigurationError("Failed Stop", cause); + } + } + } + + /* End implementing Flume Sink interface */ + + /* Begin Configurable Interface */ + @Override + public void configure(Context context) + { + hostname = context.getString(HOSTNAME_STRING, HOSTNAME_DEFAULT); + port = context.getInteger("port", 0); + id = context.getString("id"); + if (id == null) { + id = getName(); + } + acceptedTolerance = context.getLong("acceptedTolerance", ACCEPTED_TOLERANCE); + sleepMillis = context.getLong("sleepMillis", 5L); + throughputAdjustmentFactor = context.getInteger("throughputAdjustmentPercent", 5) / 100.0; + maximumEventsPerTransaction = context.getInteger("maximumEventsPerTransaction", 10000); + minimumEventsPerTransaction = context.getInteger("minimumEventsPerTransaction", 100); + commitEventTimeoutMillis = context.getLong("commitEventTimeoutMillis", Long.MAX_VALUE); + + @SuppressWarnings("unchecked") + Discovery<byte[]> ldiscovery = configure("discovery", Discovery.class, context); + if (ldiscovery == null) { + logger.warn("Discovery agent not configured for the sink!"); + discovery = new Discovery<byte[]>() + { + @Override + public void unadvertise(Service<byte[]> service) + { + logger.debug("Sink {} stopped listening on {}:{}", service.getId(), service.getHost(), service.getPort()); + } + + @Override + public void advertise(Service<byte[]> service) + { + logger.debug("Sink {} started listening on {}:{}", service.getId(), service.getHost(), service.getPort()); + } + + @Override + @SuppressWarnings("unchecked") + public Collection<Service<byte[]>> discover() + { + return Collections.EMPTY_SET; + } + + }; + } else { + discovery = ldiscovery; + } + + storage = configure("storage", Storage.class, context); + if (storage == null) { + logger.warn("storage key missing... 
FlumeSink may lose data!"); + storage = new Storage() + { + @Override + public byte[] store(Slice slice) + { + return null; + } + + @Override + public byte[] retrieve(byte[] identifier) + { + return null; + } + + @Override + public byte[] retrieveNext() + { + return null; + } + + @Override + public void clean(byte[] identifier) + { + } + + @Override + public void flush() + { + } + + }; + } + + @SuppressWarnings("unchecked") + StreamCodec<Event> lCodec = configure("codec", StreamCodec.class, context); + if (lCodec == null) { + codec = new EventCodec(); + } else { + codec = lCodec; + } + + } + + /* End Configurable Interface */ + + @SuppressWarnings({"UseSpecificCatch", "BroadCatchBlock", "TooBroadCatch"}) + private static <T> T configure(String key, Class<T> clazz, Context context) + { + String classname = context.getString(key); + if (classname == null) { + return null; + } + + try { + Class<?> loadClass = Thread.currentThread().getContextClassLoader().loadClass(classname); + if (clazz.isAssignableFrom(loadClass)) { + @SuppressWarnings("unchecked") + T object = (T)loadClass.newInstance(); + if (object instanceof Configurable) { + Context context1 = new Context(context.getSubProperties(key + '.')); + String id = context1.getString(Storage.ID); + if (id == null) { + id = context.getString(Storage.ID); + logger.debug("{} inherited id={} from sink", key, id); + context1.put(Storage.ID, id); + } + ((Configurable)object).configure(context1); + } + + return object; + } else { + logger.error("key class {} does not implement {} interface", classname, Storage.class.getCanonicalName()); + throw new Error("Invalid storage " + classname); + } + } catch (Error error) { + throw error; + } catch (RuntimeException re) { + throw re; + } catch (Throwable t) { + throw new RuntimeException(t); + } + } + + /** + * @return the hostname + */ + String getHostname() + { + return hostname; + } + + /** + * @param hostname the hostname to set + */ + void setHostname(String hostname) + { + this.hostname = hostname; + } + + /** + * @return the port + */ + int getPort() + { + return port; + } + + public long getAcceptedTolerance() + { + return acceptedTolerance; + } + + public void setAcceptedTolerance(long acceptedTolerance) + { + this.acceptedTolerance = acceptedTolerance; + } + + /** + * @param port the port to set + */ + void setPort(int port) + { + this.port = port; + } + + /** + * @return the discovery + */ + Discovery<byte[]> getDiscovery() + { + return discovery; + } + + /** + * @param discovery the discovery to set + */ + void setDiscovery(Discovery<byte[]> discovery) + { + this.discovery = discovery; + } + + /** + * Attempt the sequence of writing after sleeping twice and upon failure assume + * that the client connection has problems and hence close it. 
+ * + * @param address + * @param e + * @throws IOException + */ + private void retryWrite(byte[] address, Slice event) throws IOException + { + if (event == null) { /* this happens for playback where address and event are sent as single object */ + while (client.isConnected()) { + sleep(); + if (client.write(address)) { + return; + } + } + } else { /* this happens when the events are taken from the flume channel and writing first time failed */ + while (client.isConnected()) { + sleep(); + if (client.write(address, event)) { + return; + } + } + } + + throw new IOException("Client disconnected!"); + } + + private static final Logger logger = LoggerFactory.getLogger(FlumeSink.class); +} http://git-wip-us.apache.org/repos/asf/apex-malhar/blob/2f70751e/flume/src/main/resources/flume-conf/flume-conf.sample.properties ---------------------------------------------------------------------- diff --git a/flume/src/main/resources/flume-conf/flume-conf.sample.properties b/flume/src/main/resources/flume-conf/flume-conf.sample.properties index af59e52..9504441 100644 --- a/flume/src/main/resources/flume-conf/flume-conf.sample.properties +++ b/flume/src/main/resources/flume-conf/flume-conf.sample.properties @@ -23,7 +23,7 @@ agent1.sinks = dt # first sink - dt - agent1.sinks.dt.type = org.apache.apex.malhar.flume.sink.DTFlumeSink + agent1.sinks.dt.type = org.apache.apex.malhar.flume.sink.FlumeSink agent1.sinks.dt.id = sink1 agent1.sinks.dt.hostname = localhost agent1.sinks.dt.port = 8080 http://git-wip-us.apache.org/repos/asf/apex-malhar/blob/2f70751e/flume/src/main/resources/flume-conf/flume-env.sample.sh ---------------------------------------------------------------------- diff --git a/flume/src/main/resources/flume-conf/flume-env.sample.sh b/flume/src/main/resources/flume-conf/flume-env.sample.sh index 570411b..41d093f 100644 --- a/flume/src/main/resources/flume-conf/flume-env.sample.sh +++ b/flume/src/main/resources/flume-conf/flume-env.sample.sh @@ -22,9 +22,9 @@ # This script runs on the machine which have maven repository populated under # $HOME/.m2 If that's not the case, please adjust the JARPATH variable below # to point to colon separated list of directories where jar files can be found -if test -z "$DT_FLUME_JAR" +if test -z "$APEX_FLUME_JAR" then - echo [ERROR]: Environment variable DT_FLUME_JAR should point to a valid jar file which contains DTFlumeSink class >&2 + echo [ERROR]: Environment variable APEX_FLUME_JAR should point to a valid jar file which contains FlumeSink class >&2 exit 2 fi @@ -35,4 +35,4 @@ then else JAVA=${JAVA_HOME}/bin/java fi -FLUME_CLASSPATH=`JARPATH=$JARPATH $JAVA -cp $DT_FLUME_JAR com.datatorrent.jarpath.JarPath -N $DT_FLUME_JAR -Xdt-jarpath -Xdt-netlet` +FLUME_CLASSPATH=`JARPATH=$JARPATH $JAVA -cp $APEX_FLUME_JAR com.datatorrent.jarpath.JarPath -N $APEX_FLUME_JAR -Xdt-jarpath -Xdt-netlet` http://git-wip-us.apache.org/repos/asf/apex-malhar/blob/2f70751e/flume/src/test/java/org/apache/apex/malhar/flume/discovery/ZKAssistedDiscoveryTest.java ---------------------------------------------------------------------- diff --git a/flume/src/test/java/org/apache/apex/malhar/flume/discovery/ZKAssistedDiscoveryTest.java b/flume/src/test/java/org/apache/apex/malhar/flume/discovery/ZKAssistedDiscoveryTest.java index 9db5d32..71381d7 100644 --- a/flume/src/test/java/org/apache/apex/malhar/flume/discovery/ZKAssistedDiscoveryTest.java +++ b/flume/src/test/java/org/apache/apex/malhar/flume/discovery/ZKAssistedDiscoveryTest.java @@ -46,9 +46,9 @@ public class ZKAssistedDiscoveryTest 
public void testSerialization() throws Exception { ZKAssistedDiscovery discovery = new ZKAssistedDiscovery(); - discovery.setServiceName("DTFlumeTest"); + discovery.setServiceName("ApexFlumeTest"); discovery.setConnectionString("localhost:2181"); - discovery.setBasePath("/HelloDT"); + discovery.setBasePath("/HelloApex"); discovery.setup(null); ServiceInstance<byte[]> instance = discovery.getInstance(new Service<byte[]>() { @@ -91,9 +91,9 @@ public class ZKAssistedDiscoveryTest public void testDiscover() { ZKAssistedDiscovery discovery = new ZKAssistedDiscovery(); - discovery.setServiceName("DTFlumeTest"); + discovery.setServiceName("ApexFlumeTest"); discovery.setConnectionString("localhost:2181"); - discovery.setBasePath("/HelloDT"); + discovery.setBasePath("/HelloApex"); discovery.setup(null); assertNotNull("Discovered Sinks", discovery.discover()); discovery.teardown(); @@ -103,9 +103,9 @@ public class ZKAssistedDiscoveryTest public void testAdvertize() { ZKAssistedDiscovery discovery = new ZKAssistedDiscovery(); - discovery.setServiceName("DTFlumeTest"); + discovery.setServiceName("ApexFlumeTest"); discovery.setConnectionString("localhost:2181"); - discovery.setBasePath("/HelloDT"); + discovery.setBasePath("/HelloApex"); discovery.setup(null); Service<byte[]> service = new Service<byte[]>() http://git-wip-us.apache.org/repos/asf/apex-malhar/blob/2f70751e/flume/src/test/java/org/apache/apex/malhar/flume/sink/DTFlumeSinkTest.java ---------------------------------------------------------------------- diff --git a/flume/src/test/java/org/apache/apex/malhar/flume/sink/DTFlumeSinkTest.java b/flume/src/test/java/org/apache/apex/malhar/flume/sink/DTFlumeSinkTest.java deleted file mode 100644 index f97d9c0..0000000 --- a/flume/src/test/java/org/apache/apex/malhar/flume/sink/DTFlumeSinkTest.java +++ /dev/null @@ -1,146 +0,0 @@ -/** - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, - * software distributed under the License is distributed on an - * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY - * KIND, either express or implied. See the License for the - * specific language governing permissions and limitations - * under the License. 
- */ -package org.apache.apex.malhar.flume.sink; - -import java.io.IOException; -import java.net.InetSocketAddress; -import java.util.Arrays; -import java.util.Collection; -import java.util.Collections; - -import org.junit.Assert; -import org.junit.Test; -import org.slf4j.Logger; -import org.slf4j.LoggerFactory; - -import org.apache.apex.malhar.flume.discovery.Discovery; - -import org.apache.flume.channel.MemoryChannel; - -import com.datatorrent.netlet.AbstractLengthPrependerClient; -import com.datatorrent.netlet.DefaultEventLoop; -import com.datatorrent.netlet.util.Slice; - -/** - * - */ -public class DTFlumeSinkTest -{ - static final String hostname = "localhost"; - int port = 0; - - @Test - @SuppressWarnings("SleepWhileInLoop") - public void testServer() throws InterruptedException, IOException - { - Discovery<byte[]> discovery = new Discovery<byte[]>() - { - @Override - public synchronized void unadvertise(Service<byte[]> service) - { - notify(); - } - - @Override - public synchronized void advertise(Service<byte[]> service) - { - port = service.getPort(); - logger.debug("listening at {}", service); - notify(); - } - - @Override - @SuppressWarnings("unchecked") - public synchronized Collection<Service<byte[]>> discover() - { - try { - wait(); - } catch (InterruptedException ie) { - throw new RuntimeException(ie); - } - return Collections.EMPTY_LIST; - } - - }; - DTFlumeSink sink = new DTFlumeSink(); - sink.setName("TeskSink"); - sink.setHostname(hostname); - sink.setPort(0); - sink.setAcceptedTolerance(2000); - sink.setChannel(new MemoryChannel()); - sink.setDiscovery(discovery); - sink.start(); - AbstractLengthPrependerClient client = new AbstractLengthPrependerClient() - { - private byte[] array; - private int offset = 2; - - @Override - public void onMessage(byte[] buffer, int offset, int size) - { - Slice received = new Slice(buffer, offset, size); - logger.debug("Client Received = {}", received); - Assert.assertEquals(received, - new Slice(Arrays.copyOfRange(array, this.offset, array.length), 0, Server.Request.FIXED_SIZE)); - synchronized (DTFlumeSinkTest.this) { - DTFlumeSinkTest.this.notify(); - } - } - - @Override - public void connected() - { - super.connected(); - array = new byte[Server.Request.FIXED_SIZE + offset]; - array[offset] = Server.Command.ECHO.getOrdinal(); - array[offset + 1] = 1; - array[offset + 2] = 2; - array[offset + 3] = 3; - array[offset + 4] = 4; - array[offset + 5] = 5; - array[offset + 6] = 6; - array[offset + 7] = 7; - array[offset + 8] = 8; - Server.writeLong(array, offset + Server.Request.TIME_OFFSET, System.currentTimeMillis()); - write(array, offset, Server.Request.FIXED_SIZE); - } - - }; - - DefaultEventLoop eventloop = new DefaultEventLoop("Eventloop-TestClient"); - eventloop.start(); - discovery.discover(); - try { - eventloop.connect(new InetSocketAddress(hostname, port), client); - try { - synchronized (this) { - this.wait(); - } - } finally { - eventloop.disconnect(client); - } - } finally { - eventloop.stop(); - } - - sink.stop(); - } - - private static final Logger logger = LoggerFactory.getLogger(DTFlumeSinkTest.class); -} http://git-wip-us.apache.org/repos/asf/apex-malhar/blob/2f70751e/flume/src/test/java/org/apache/apex/malhar/flume/sink/FlumeSinkTest.java ---------------------------------------------------------------------- diff --git a/flume/src/test/java/org/apache/apex/malhar/flume/sink/FlumeSinkTest.java b/flume/src/test/java/org/apache/apex/malhar/flume/sink/FlumeSinkTest.java new file mode 100644 index 0000000..e1bc7b8 --- 
/dev/null +++ b/flume/src/test/java/org/apache/apex/malhar/flume/sink/FlumeSinkTest.java @@ -0,0 +1,146 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.apex.malhar.flume.sink; + +import java.io.IOException; +import java.net.InetSocketAddress; +import java.util.Arrays; +import java.util.Collection; +import java.util.Collections; + +import org.junit.Assert; +import org.junit.Test; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import org.apache.apex.malhar.flume.discovery.Discovery; + +import org.apache.flume.channel.MemoryChannel; + +import com.datatorrent.netlet.AbstractLengthPrependerClient; +import com.datatorrent.netlet.DefaultEventLoop; +import com.datatorrent.netlet.util.Slice; + +/** + * + */ +public class FlumeSinkTest +{ + static final String hostname = "localhost"; + int port = 0; + + @Test + @SuppressWarnings("SleepWhileInLoop") + public void testServer() throws InterruptedException, IOException + { + Discovery<byte[]> discovery = new Discovery<byte[]>() + { + @Override + public synchronized void unadvertise(Service<byte[]> service) + { + notify(); + } + + @Override + public synchronized void advertise(Service<byte[]> service) + { + port = service.getPort(); + logger.debug("listening at {}", service); + notify(); + } + + @Override + @SuppressWarnings("unchecked") + public synchronized Collection<Service<byte[]>> discover() + { + try { + wait(); + } catch (InterruptedException ie) { + throw new RuntimeException(ie); + } + return Collections.EMPTY_LIST; + } + + }; + FlumeSink sink = new FlumeSink(); + sink.setName("TeskSink"); + sink.setHostname(hostname); + sink.setPort(0); + sink.setAcceptedTolerance(2000); + sink.setChannel(new MemoryChannel()); + sink.setDiscovery(discovery); + sink.start(); + AbstractLengthPrependerClient client = new AbstractLengthPrependerClient() + { + private byte[] array; + private int offset = 2; + + @Override + public void onMessage(byte[] buffer, int offset, int size) + { + Slice received = new Slice(buffer, offset, size); + logger.debug("Client Received = {}", received); + Assert.assertEquals(received, + new Slice(Arrays.copyOfRange(array, this.offset, array.length), 0, Server.Request.FIXED_SIZE)); + synchronized (FlumeSinkTest.this) { + FlumeSinkTest.this.notify(); + } + } + + @Override + public void connected() + { + super.connected(); + array = new byte[Server.Request.FIXED_SIZE + offset]; + array[offset] = Server.Command.ECHO.getOrdinal(); + array[offset + 1] = 1; + array[offset + 2] = 2; + array[offset + 3] = 3; + array[offset + 4] = 4; + array[offset + 5] = 5; + array[offset + 6] = 6; + array[offset + 7] = 7; + array[offset + 8] = 8; + Server.writeLong(array, offset + Server.Request.TIME_OFFSET, System.currentTimeMillis()); + write(array, offset, 
Server.Request.FIXED_SIZE); + } + + }; + + DefaultEventLoop eventloop = new DefaultEventLoop("Eventloop-TestClient"); + eventloop.start(); + discovery.discover(); + try { + eventloop.connect(new InetSocketAddress(hostname, port), client); + try { + synchronized (this) { + this.wait(); + } + } finally { + eventloop.disconnect(client); + } + } finally { + eventloop.stop(); + } + + sink.stop(); + } + + private static final Logger logger = LoggerFactory.getLogger(FlumeSinkTest.class); +} http://git-wip-us.apache.org/repos/asf/apex-malhar/blob/2f70751e/flume/src/test/resources/flume/conf/flume-conf.properties ---------------------------------------------------------------------- diff --git a/flume/src/test/resources/flume/conf/flume-conf.properties b/flume/src/test/resources/flume/conf/flume-conf.properties index 73dc79a..3498d67 100644 --- a/flume/src/test/resources/flume/conf/flume-conf.properties +++ b/flume/src/test/resources/flume/conf/flume-conf.properties @@ -63,7 +63,7 @@ agent1.sources.netcatSource.command = src/test/bash/subcat_periodically src/test # first sink - dt agent1.sinks.dt.id = CEVL00P -agent1.sinks.dt.type = org.apache.apex.malhar.flume.sink.DTFlumeSink +agent1.sinks.dt.type = org.apache.apex.malhar.flume.sink.FlumeSink agent1.sinks.dt.hostname = localhost agent1.sinks.dt.port = 8080 agent1.sinks.dt.sleepMillis = 7 @@ -80,7 +80,7 @@ agent1.sinks.dt.minimumEventsPerTransaction = 1 # Ensure that we are able to detect flume sinks (and failures) automatically. agent1.sinks.dt.discovery = org.apache.apex.malhar.flume.discovery.ZKAssistedDiscovery agent1.sinks.dt.discovery.connectionString = 127.0.0.1:2181 - agent1.sinks.dt.discovery.basePath = /HelloDT + agent1.sinks.dt.discovery.basePath = /HelloApex agent1.sinks.dt.discovery.connectionTimeoutMillis = 1000 agent1.sinks.dt.discovery.connectionRetryCount = 10 agent1.sinks.dt.discovery.connectionRetrySleepMillis = 500 http://git-wip-us.apache.org/repos/asf/apex-malhar/blob/2f70751e/flume/src/test/resources/flume/conf/flume-env.sh ---------------------------------------------------------------------- diff --git a/flume/src/test/resources/flume/conf/flume-env.sh b/flume/src/test/resources/flume/conf/flume-env.sh index 436e670..c03f98d 100644 --- a/flume/src/test/resources/flume/conf/flume-env.sh +++ b/flume/src/test/resources/flume/conf/flume-env.sh @@ -22,9 +22,9 @@ # This script runs on the machine which have maven repository populated under # $HOME/.m2 If that's not the case, please adjust the JARPATH variable below # to point to colon separated list of directories where jar files can be found -if test -z "$DT_FLUME_JAR" +if test -z "$APEX_FLUME_JAR" then - echo [ERROR]: Environment variable DT_FLUME_JAR should point to a valid jar file which contains DTFlumeSink class >&2 + echo [ERROR]: Environment variable APEX_FLUME_JAR should point to a valid jar file which contains FlumeSink class >&2 exit 2 fi @@ -35,4 +35,4 @@ then else JAVA=${JAVA_HOME}/bin/java fi -FLUME_CLASSPATH=`JARPATH=$JARPATH $JAVA -cp $DT_FLUME_JAR com.datatorrent.jarpath.JarPath -N $DT_FLUME_JAR -Xdt-jarpath -Xdt-netlet` \ No newline at end of file +FLUME_CLASSPATH=`JARPATH=$JARPATH $JAVA -cp $APEX_FLUME_JAR com.datatorrent.jarpath.JarPath -N $APEX_FLUME_JAR -Xdt-jarpath -Xdt-netlet` \ No newline at end of file http://git-wip-us.apache.org/repos/asf/apex-malhar/blob/2f70751e/flume/src/test/resources/flume/conf/flume_simple.conf ---------------------------------------------------------------------- diff --git 
a/flume/src/test/resources/flume/conf/flume_simple.conf b/flume/src/test/resources/flume/conf/flume_simple.conf index b902881..2ed2614 100644 --- a/flume/src/test/resources/flume/conf/flume_simple.conf +++ b/flume/src/test/resources/flume/conf/flume_simple.conf @@ -29,7 +29,7 @@ a1.sources.r1.type = seq a1.sources.r1.totalEvents = 10 # sink - dt - a1.sinks.dt.type = org.apache.apex.malhar.flume.sink.DTFlumeSink + a1.sinks.dt.type = org.apache.apex.malhar.flume.sink.FlumeSink a1.sinks.dt.id = sink1 a1.sinks.dt.hostname = 127.0.0.1 a1.sinks.dt.port = 9098 http://git-wip-us.apache.org/repos/asf/apex-malhar/blob/2f70751e/flume/src/test/resources/flume/conf/flume_zkdiscovery.conf ---------------------------------------------------------------------- diff --git a/flume/src/test/resources/flume/conf/flume_zkdiscovery.conf b/flume/src/test/resources/flume/conf/flume_zkdiscovery.conf index 6f8932c..723805d 100644 --- a/flume/src/test/resources/flume/conf/flume_zkdiscovery.conf +++ b/flume/src/test/resources/flume/conf/flume_zkdiscovery.conf @@ -34,7 +34,7 @@ a1.sources.r1.type = seq a1.sources.r1.totalEvents = 10 # first sink - dt - a1.sinks.dt.type = org.apache.apex.malhar.flume.sink.DTFlumeSink + a1.sinks.dt.type = org.apache.apex.malhar.flume.sink.FlumeSink a1.sinks.dt.id = sink1 a1.sinks.dt.hostname = 127.0.0.1 a1.sinks.dt.port = 9098 @@ -48,7 +48,7 @@ a1.sources.r1.totalEvents = 10 a1.sinks.dt.channel = c1 # second sink - dt2 - a1.sinks.dt2.type = org.apache.apex.malhar.flume.sink.DTFlumeSink + a1.sinks.dt2.type = org.apache.apex.malhar.flume.sink.FlumeSink a1.sinks.dt2.id = sink2 a1.sinks.dt2.hostname = 127.0.0.1 a1.sinks.dt2.port = 9099
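As a usage note on the throttling properties documented in the FlumeSink javadoc above (sleepMillis, throughputAdjustmentPercent, minimumEventsPerTransaction, maximumEventsPerTransaction), these are read in FlumeSink.configure() and can be tuned per sink as sketched below. The values shown are the defaults from that method except commitEventTimeoutMillis, whose finite value here is an assumption (the code defaults to Long.MAX_VALUE, i.e. no timeout); remember that maximumEventsPerTransaction must not exceed the channel's transaction capacity.

  # throughput/backoff knobs read by FlumeSink.configure() (sketch; defaults shown where known)
  a1.sinks.dt.sleepMillis = 5
  a1.sinks.dt.throughputAdjustmentPercent = 5
  a1.sinks.dt.minimumEventsPerTransaction = 100
  a1.sinks.dt.maximumEventsPerTransaction = 10000
  a1.sinks.dt.acceptedTolerance = 20000
  # assumed finite commit timeout; code default is Long.MAX_VALUE (never time out)
  a1.sinks.dt.commitEventTimeoutMillis = 60000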
