Updated Branches:
  refs/heads/trunk c35b7c947 -> 5294ee61e

FLUME-1897: Implement Thrift Sink

(Hari Shreedharan via Brock Noland)


Project: http://git-wip-us.apache.org/repos/asf/flume/repo
Commit: http://git-wip-us.apache.org/repos/asf/flume/commit/5294ee61
Tree: http://git-wip-us.apache.org/repos/asf/flume/tree/5294ee61
Diff: http://git-wip-us.apache.org/repos/asf/flume/diff/5294ee61

Branch: refs/heads/trunk
Commit: 5294ee61e71e24b33ab3e4b94bd3b1c03a35475d
Parents: c35b7c9
Author: Brock Noland <[email protected]>
Authored: Mon Feb 11 15:56:28 2013 -0600
Committer: Brock Noland <[email protected]>
Committed: Mon Feb 11 15:56:28 2013 -0600

----------------------------------------------------------------------
 .../org/apache/flume/sink/AbstractRpcSink.java     |  343 +++++++++++++++
 .../main/java/org/apache/flume/sink/AvroSink.java  |  213 +---------
 .../java/org/apache/flume/sink/ThriftSink.java     |  113 +++++
 .../java/org/apache/flume/sink/TestThriftSink.java |  195 ++++++++
 4 files changed, 655 insertions(+), 209 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/flume/blob/5294ee61/flume-ng-core/src/main/java/org/apache/flume/sink/AbstractRpcSink.java
----------------------------------------------------------------------
diff --git 
a/flume-ng-core/src/main/java/org/apache/flume/sink/AbstractRpcSink.java 
b/flume-ng-core/src/main/java/org/apache/flume/sink/AbstractRpcSink.java
new file mode 100644
index 0000000..52bd49b
--- /dev/null
+++ b/flume-ng-core/src/main/java/org/apache/flume/sink/AbstractRpcSink.java
@@ -0,0 +1,343 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.flume.sink;
+
+import com.google.common.base.Preconditions;
+import com.google.common.collect.Lists;
+import org.apache.flume.Channel;
+import org.apache.flume.ChannelException;
+import org.apache.flume.Context;
+import org.apache.flume.Event;
+import org.apache.flume.EventDeliveryException;
+import org.apache.flume.FlumeException;
+import org.apache.flume.Transaction;
+import org.apache.flume.api.RpcClient;
+import org.apache.flume.api.RpcClientConfigurationConstants;
+import org.apache.flume.conf.Configurable;
+import org.apache.flume.instrumentation.SinkCounter;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.util.List;
+import java.util.Properties;
+
+/**
+ * This sink provides the basic RPC functionality for Flume. This sink takes
+ * several arguments which are used in RPC.
+ * This sink forms one half of Flume's tiered collection support. Events sent 
to
+ * this sink are transported over the network to the hostname / port pair using
+ * the RPC implementation encapsulated in {@link RpcClient}.
+ * The destination is an instance of Flume's {@link org.apache.flume.source
+ * .AvroSource} or {@link org.apache.flume.source.ThriftSource} (based on
+ * which implementation of this class is used), which
+ * allows Flume agents to forward to other Flume agents, forming a tiered
+ * collection infrastructure. Of course, nothing prevents one from using this
+ * sink to speak to other custom built infrastructure that implements the same
+ * RPC protocol.
+ * </p>
+ * <p>
+ * Events are taken from the configured {@link Channel} in batches of the
+ * configured <tt>batch-size</tt>. The batch size has no theoretical limits
+ * although all events in the batch <b>must</b> fit in memory. Generally, 
larger
+ * batches are far more efficient, but introduce a slight delay (measured in
+ * millis) in delivery. The batch behavior is such that underruns (i.e. batches
+ * smaller than the configured batch size) are possible. This is a compromise
+ * made to maintain low latency of event delivery. If the channel returns a 
null
+ * event, meaning it is empty, the batch is immediately sent, regardless of
+ * size. Batch underruns are tracked in the metrics. Empty batches do not incur
+ * an RPC roundtrip.
+ * </p>
+ * <p>
+ * <b>Configuration options</b>
+ * </p>
+ * <table>
+ * <tr>
+ * <th>Parameter</th>
+ * <th>Description</th>
+ * <th>Unit (data type)</th>
+ * <th>Default</th>
+ * </tr>
+ * <tr>
+ * <td><tt>hostname</tt></td>
+ * <td>The hostname to which events should be sent.</td>
+ * <td>Hostname or IP (String)</td>
+ * <td>none (required)</td>
+ * </tr>
+ * <tr>
+ * <td><tt>port</tt></td>
+ * <td>The port to which events should be sent on <tt>hostname</tt>.</td>
+ * <td>TCP port (int)</td>
+ * <td>none (required)</td>
+ * </tr>
+ * <tr>
+ * <td><tt>batch-size</tt></td>
+ * <td>The maximum number of events to send per RPC.</td>
+ * <td>events (int)</td>
+ * <td>100</td>
+ * </tr>
+ * <tr>
+ * <td><tt>connect-timeout</tt></td>
+ * <td>Maximum time to wait for the first connection handshake and RPC request</td>
+ * <td>milliseconds (long)</td>
+ * <td>20000</td>
+ * </tr>
+ * <tr>
+ * <td><tt>request-timeout</tt></td>
+ * <td>Maximum time to wait for RPC requests after the first</td>
+ * <td>milliseconds (long)</td>
+ * <td>20000</td>
+ * </tr>
+ * </table>
+ * <p>
+ * <b>Metrics</b>
+ * </p>
+ * <p>
+ * TODO
+ * </p>
+ *
+ * <strong>Implementation Notes:</strong> Any implementation of this class
+ * must override the {@linkplain #initializeRpcClient(Properties)} method.
+ * This method will be called whenever this sink needs to create a new
+ * connection to the source.
+ */
+public abstract class AbstractRpcSink extends AbstractSink
+  implements Configurable {
+
+  private static final Logger logger = LoggerFactory.getLogger
+    (AbstractRpcSink.class);
+  private String hostname;
+  private Integer port;
+  private RpcClient client;
+  private Properties clientProps;
+  private SinkCounter sinkCounter;
+
+  @Override
+  public void configure(Context context) {
+    clientProps = new Properties();
+
+    hostname = context.getString("hostname");
+    port = context.getInteger("port");
+
+    Preconditions.checkState(hostname != null, "No hostname specified");
+    Preconditions.checkState(port != null, "No port specified");
+
+    clientProps.setProperty(RpcClientConfigurationConstants.CONFIG_HOSTS, 
"h1");
+    
clientProps.setProperty(RpcClientConfigurationConstants.CONFIG_HOSTS_PREFIX +
+        "h1", hostname + ":" + port);
+
+    Integer batchSize = context.getInteger("batch-size");
+    if (batchSize != null) {
+      
clientProps.setProperty(RpcClientConfigurationConstants.CONFIG_BATCH_SIZE,
+          String.valueOf(batchSize));
+    }
+
+    Long connectTimeout = context.getLong("connect-timeout");
+    if (connectTimeout != null) {
+      clientProps.setProperty(
+          RpcClientConfigurationConstants.CONFIG_CONNECT_TIMEOUT,
+          String.valueOf(connectTimeout));
+    }
+
+    Long requestTimeout = context.getLong("request-timeout");
+    if (requestTimeout != null) {
+      clientProps.setProperty(
+          RpcClientConfigurationConstants.CONFIG_REQUEST_TIMEOUT,
+          String.valueOf(requestTimeout));
+    }
+
+    if (sinkCounter == null) {
+      sinkCounter = new SinkCounter(getName());
+    }
+  }
+
+  /**
+   * Returns a new {@linkplain RpcClient} instance configured using the given
+   * {@linkplain Properties} object. This method is called whenever a new
+   * connection needs to be created to the next hop.
+   * @param props the configuration properties used to build the client
+   * @return a newly created {@link RpcClient} instance
+   */
+  protected abstract RpcClient initializeRpcClient(Properties props);
+
+  /**
+   * If this function is called successively without calling
+   * {@link #destroyConnection()}, only the first call has any effect.
+   * @throws org.apache.flume.FlumeException if an RPC client connection could 
not be opened
+   */
+  private void createConnection() throws FlumeException {
+
+    if (client == null) {
+      logger.info("Rpc sink {}: Building RpcClient with hostname: {}, " +
+          "port: {}",
+          new Object[] { getName(), hostname, port });
+      try {
+        client = initializeRpcClient(clientProps);
+        Preconditions.checkNotNull(client, "Rpc Client could not be " +
+          "initialized. " + getName() + " could not be started");
+        sinkCounter.incrementConnectionCreatedCount();
+      } catch (Exception ex) {
+        sinkCounter.incrementConnectionFailedCount();
+        if (ex instanceof FlumeException) {
+          throw (FlumeException) ex;
+        } else {
+          throw new FlumeException(ex);
+        }
+      }
+       logger.debug("Rpc sink {}: Created RpcClient: {}", getName(), client);
+    }
+
+  }
+
+  private void destroyConnection() {
+    if (client != null) {
+      logger.debug("Rpc sink {} closing Rpc client: {}", getName(), client);
+      try {
+        client.close();
+        sinkCounter.incrementConnectionClosedCount();
+      } catch (FlumeException e) {
+        sinkCounter.incrementConnectionFailedCount();
+        logger.error("Rpc sink " + getName() + ": Attempt to close Rpc " +
+            "client failed. Exception follows.", e);
+      }
+    }
+
+    client = null;
+  }
+
+  /**
+   * Ensure the connection exists and is active.
+   * If the connection is not active, destroy it and recreate it.
+   *
+   * @throws org.apache.flume.FlumeException If there are errors closing or 
opening the RPC
+   * connection.
+   */
+  private void verifyConnection() throws FlumeException {
+    if (client == null) {
+      createConnection();
+    } else if (!client.isActive()) {
+      destroyConnection();
+      createConnection();
+    }
+  }
+
+  /**
+   * The start() of RpcSink is more of an optimization that allows connection
+   * to be created before the process() loop is started. In case it so happens
+   * that the start failed, the process() loop will itself attempt to reconnect
+   * as necessary. This is the expected behavior since it is possible that the
+   * downstream source becomes unavailable in the middle of the process loop
+   * and the sink will have to retry the connection again.
+   */
+  @Override
+  public void start() {
+    logger.info("Starting {}...", this);
+    sinkCounter.start();
+    try {
+      createConnection();
+    } catch (FlumeException e) {
+      logger.warn("Unable to create Rpc client using hostname: " + hostname
+          + ", port: " + port, e);
+
+      /* Try to prevent leaking resources. */
+      destroyConnection();
+    }
+
+    super.start();
+
+    logger.info("Rpc sink {} started.", getName());
+  }
+
+  @Override
+  public void stop() {
+    logger.info("Rpc sink {} stopping...", getName());
+
+    destroyConnection();
+    sinkCounter.stop();
+    super.stop();
+
+    logger.info("Rpc sink {} stopped. Metrics: {}", getName(), sinkCounter);
+  }
+
+  @Override
+  public String toString() {
+    return "RpcSink " + getName() + " { host: " + hostname + ", port: " +
+        port + " }";
+  }
+
+  @Override
+  public Status process() throws EventDeliveryException {
+    Status status = Status.READY;
+    Channel channel = getChannel();
+    Transaction transaction = channel.getTransaction();
+
+    try {
+      transaction.begin();
+
+      verifyConnection();
+
+      List<Event> batch = Lists.newLinkedList();
+
+      for (int i = 0; i < client.getBatchSize(); i++) {
+        Event event = channel.take();
+
+        if (event == null) {
+          break;
+        }
+
+        batch.add(event);
+      }
+
+      int size = batch.size();
+      int batchSize = client.getBatchSize();
+
+      if (size == 0) {
+        sinkCounter.incrementBatchEmptyCount();
+        status = Status.BACKOFF;
+      } else {
+        if (size < batchSize) {
+          sinkCounter.incrementBatchUnderflowCount();
+        } else {
+          sinkCounter.incrementBatchCompleteCount();
+        }
+        sinkCounter.addToEventDrainAttemptCount(size);
+        client.appendBatch(batch);
+      }
+
+      transaction.commit();
+      sinkCounter.addToEventDrainSuccessCount(size);
+
+    } catch (Throwable t) {
+      transaction.rollback();
+      if (t instanceof Error) {
+        throw (Error) t;
+      } else if (t instanceof ChannelException) {
+        logger.error("Rpc Sink " + getName() + ": Unable to get event from" +
+            " channel " + channel.getName() + ". Exception follows.", t);
+        status = Status.BACKOFF;
+      } else {
+        destroyConnection();
+        throw new EventDeliveryException("Failed to send events", t);
+      }
+    } finally {
+      transaction.close();
+    }
+
+    return status;
+  }
+}

http://git-wip-us.apache.org/repos/asf/flume/blob/5294ee61/flume-ng-core/src/main/java/org/apache/flume/sink/AvroSink.java
----------------------------------------------------------------------
diff --git a/flume-ng-core/src/main/java/org/apache/flume/sink/AvroSink.java 
b/flume-ng-core/src/main/java/org/apache/flume/sink/AvroSink.java
index d4ddcbe..34f9fa6 100644
--- a/flume-ng-core/src/main/java/org/apache/flume/sink/AvroSink.java
+++ b/flume-ng-core/src/main/java/org/apache/flume/sink/AvroSink.java
@@ -117,218 +117,13 @@ import com.google.common.collect.Lists;
  * TODO
  * </p>
  */
-public class AvroSink extends AbstractSink implements Configurable {
+public class AvroSink extends AbstractRpcSink {
 
   private static final Logger logger = LoggerFactory.getLogger(AvroSink.class);
 
-  private String hostname;
-  private Integer port;
-
-  private RpcClient client;
-  private Properties clientProps;
-  private SinkCounter sinkCounter;
-
-  @Override
-  public void configure(Context context) {
-    clientProps = new Properties();
-
-    hostname = context.getString("hostname");
-    port = context.getInteger("port");
-
-    Preconditions.checkState(hostname != null, "No hostname specified");
-    Preconditions.checkState(port != null, "No port specified");
-
-    clientProps.setProperty(RpcClientConfigurationConstants.CONFIG_HOSTS, 
"h1");
-    
clientProps.setProperty(RpcClientConfigurationConstants.CONFIG_HOSTS_PREFIX +
-        "h1", hostname + ":" + port);
-
-    Integer batchSize = context.getInteger("batch-size");
-    if (batchSize != null) {
-      
clientProps.setProperty(RpcClientConfigurationConstants.CONFIG_BATCH_SIZE,
-          String.valueOf(batchSize));
-    }
-
-    Long connectTimeout = context.getLong("connect-timeout");
-    if (connectTimeout != null) {
-      clientProps.setProperty(
-          RpcClientConfigurationConstants.CONFIG_CONNECT_TIMEOUT,
-          String.valueOf(connectTimeout));
-    }
-
-    Long requestTimeout = context.getLong("request-timeout");
-    if (requestTimeout != null) {
-      clientProps.setProperty(
-          RpcClientConfigurationConstants.CONFIG_REQUEST_TIMEOUT,
-          String.valueOf(requestTimeout));
-    }
-
-    if (sinkCounter == null) {
-      sinkCounter = new SinkCounter(getName());
-    }
-  }
-
-  /**
-   * If this function is called successively without calling
-   * {@see #destroyConnection()}, only the first call has any effect.
-   * @throws FlumeException if an RPC client connection could not be opened
-   */
-  private void createConnection() throws FlumeException {
-
-    if (client == null) {
-      logger.info("Avro sink {}: Building RpcClient with hostname: {}, " +
-          "port: {}",
-          new Object[] { getName(), hostname, port });
-      try {
-        client = RpcClientFactory.getInstance(clientProps);
-        sinkCounter.incrementConnectionCreatedCount();
-      } catch (Exception ex) {
-        sinkCounter.incrementConnectionFailedCount();
-        if (ex instanceof FlumeException) {
-          throw (FlumeException) ex;
-        } else {
-          throw new FlumeException(ex);
-        }
-      }
-       logger.debug("Avro sink {}: Created RpcClient: {}", getName(), client);
-    }
-
-  }
-
-  private void destroyConnection() {
-    if (client != null) {
-      logger.debug("Avro sink {} closing avro client: {}", getName(), client);
-      try {
-        client.close();
-        sinkCounter.incrementConnectionClosedCount();
-      } catch (FlumeException e) {
-        sinkCounter.incrementConnectionFailedCount();
-        logger.error("Avro sink " + getName() + ": Attempt to close avro " +
-            "client failed. Exception follows.", e);
-      }
-    }
-
-    client = null;
-  }
-
-  /**
-   * Ensure the connection exists and is active.
-   * If the connection is not active, destroy it and recreate it.
-   *
-   * @throws FlumeException If there are errors closing or opening the RPC
-   * connection.
-   */
-  private void verifyConnection() throws FlumeException {
-    if (client == null) {
-      createConnection();
-    } else if (!client.isActive()) {
-      destroyConnection();
-      createConnection();
-    }
-  }
-
-  /**
-   * The start() of AvroSink is more of an optimization that allows connection
-   * to be created before the process() loop is started. In case it so happens
-   * that the start failed, the process() loop will itself attempt to reconnect
-   * as necessary. This is the expected behavior since it is possible that the
-   * downstream source becomes unavailable in the middle of the process loop
-   * and the sink will have to retry the connection again.
-   */
   @Override
-  public void start() {
-    logger.info("Starting {}...", this);
-    sinkCounter.start();
-    try {
-      createConnection();
-    } catch (FlumeException e) {
-      logger.warn("Unable to create avro client using hostname: " + hostname
-          + ", port: " + port, e);
-
-      /* Try to prevent leaking resources. */
-      destroyConnection();
-    }
-
-    super.start();
-
-    logger.info("Avro sink {} started.", getName());
+  protected RpcClient initializeRpcClient(Properties props) {
+    logger.info("Attempting to create Avro Rpc client.");
+    return RpcClientFactory.getInstance(props);
   }
-
-  @Override
-  public void stop() {
-    logger.info("Avro sink {} stopping...", getName());
-
-    destroyConnection();
-    sinkCounter.stop();
-    super.stop();
-
-    logger.info("Avro sink {} stopped. Metrics: {}", getName(), sinkCounter);
-  }
-
-  @Override
-  public String toString() {
-    return "AvroSink " + getName() + " { host: " + hostname + ", port: " +
-        port + " }";
-  }
-
-  @Override
-  public Status process() throws EventDeliveryException {
-    Status status = Status.READY;
-    Channel channel = getChannel();
-    Transaction transaction = channel.getTransaction();
-
-    try {
-      transaction.begin();
-
-      verifyConnection();
-
-      List<Event> batch = Lists.newLinkedList();
-
-      for (int i = 0; i < client.getBatchSize(); i++) {
-        Event event = channel.take();
-
-        if (event == null) {
-          break;
-        }
-
-        batch.add(event);
-      }
-
-      int size = batch.size();
-      int batchSize = client.getBatchSize();
-
-      if (size == 0) {
-        sinkCounter.incrementBatchEmptyCount();
-        status = Status.BACKOFF;
-      } else {
-        if (size < batchSize) {
-          sinkCounter.incrementBatchUnderflowCount();
-        } else {
-          sinkCounter.incrementBatchCompleteCount();
-        }
-        sinkCounter.addToEventDrainAttemptCount(size);
-        client.appendBatch(batch);
-      }
-
-      transaction.commit();
-      sinkCounter.addToEventDrainSuccessCount(size);
-
-    } catch (Throwable t) {
-      transaction.rollback();
-      if (t instanceof Error) {
-        throw (Error) t;
-      } else if (t instanceof ChannelException) {
-        logger.error("Avro Sink " + getName() + ": Unable to get event from" +
-            " channel " + channel.getName() + ". Exception follows.", t);
-        status = Status.BACKOFF;
-      } else {
-        destroyConnection();
-        throw new EventDeliveryException("Failed to send events", t);
-      }
-    } finally {
-      transaction.close();
-    }
-
-    return status;
-  }
-
 }

http://git-wip-us.apache.org/repos/asf/flume/blob/5294ee61/flume-ng-core/src/main/java/org/apache/flume/sink/ThriftSink.java
----------------------------------------------------------------------
diff --git a/flume-ng-core/src/main/java/org/apache/flume/sink/ThriftSink.java 
b/flume-ng-core/src/main/java/org/apache/flume/sink/ThriftSink.java
new file mode 100644
index 0000000..48a9775
--- /dev/null
+++ b/flume-ng-core/src/main/java/org/apache/flume/sink/ThriftSink.java
@@ -0,0 +1,113 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.flume.sink;
+
+import org.apache.flume.api.RpcClient;
+import org.apache.flume.api.RpcClientConfigurationConstants;
+import org.apache.flume.api.RpcClientFactory;
+
+import java.util.Properties;
+/**
+ * <p>
+ * A {@link org.apache.flume.Sink} implementation that can send events to an 
RPC server (such as
+ * Flume's {@link org.apache.flume.source.ThriftSource}).
+ * </p>
+ * <p>
+ * This sink forms one half of Flume's tiered collection support. Events sent 
to
+ * this sink are transported over the network to the hostname / port pair using
+ * the RPC implementation encapsulated in {@link RpcClient}.
+ * The destination is an instance of Flume's
+ * {@link org.apache.flume.source.ThriftSource}, which
+ * allows Flume agents to forward to other Flume agents, forming a tiered
+ * collection infrastructure. Of course, nothing prevents one from using this
+ * sink to speak to other custom built infrastructure that implements the same
+ * RPC protocol.
+ * </p>
+ * <p>
+ * Events are taken from the configured {@link org.apache.flume.Channel} in 
batches of the
+ * configured <tt>batch-size</tt>. The batch size has no theoretical limits
+ * although all events in the batch <b>must</b> fit in memory. Generally, 
larger
+ * batches are far more efficient, but introduce a slight delay (measured in
+ * millis) in delivery. The batch behavior is such that underruns (i.e. batches
+ * smaller than the configured batch size) are possible. This is a compromise
+ * made to maintain low latency of event delivery. If the channel returns a 
null
+ * event, meaning it is empty, the batch is immediately sent, regardless of
+ * size. Batch underruns are tracked in the metrics. Empty batches do not incur
+ * an RPC roundtrip.
+ * </p>
+ * <p>
+ * <b>Configuration options</b>
+ * </p>
+ * <table>
+ * <tr>
+ * <th>Parameter</th>
+ * <th>Description</th>
+ * <th>Unit (data type)</th>
+ * <th>Default</th>
+ * </tr>
+ * <tr>
+ * <td><tt>hostname</tt></td>
+ * <td>The hostname to which events should be sent.</td>
+ * <td>Hostname or IP (String)</td>
+ * <td>none (required)</td>
+ * </tr>
+ * <tr>
+ * <td><tt>port</tt></td>
+ * <td>The port to which events should be sent on <tt>hostname</tt>.</td>
+ * <td>TCP port (int)</td>
+ * <td>none (required)</td>
+ * </tr>
+ * <tr>
+ * <td><tt>batch-size</tt></td>
+ * <td>The maximum number of events to send per RPC.</td>
+ * <td>events (int)</td>
+ * <td>100</td>
+ * </tr>
+ * <tr>
+ * <td><tt>connect-timeout</tt></td>
+ * <td>Maximum time to wait for the first connection handshake and RPC request</td>
+ * <td>milliseconds (long)</td>
+ * <td>20000</td>
+ * </tr>
+ * <tr>
+ * <td><tt>request-timeout</tt></td>
+ * <td>Maximum time to wait for RPC requests after the first</td>
+ * <td>milliseconds (long)</td>
+ * <td>20000</td>
+ * </tr>
+ * </table>
+ * <p>
+ * <b>Metrics</b>
+ * </p>
+ * <p>
+ * TODO
+ * </p>
+ */
+public class ThriftSink extends AbstractRpcSink {
+  @Override
+  protected RpcClient initializeRpcClient(Properties props) {
+    props.setProperty(RpcClientConfigurationConstants.CONFIG_CLIENT_TYPE,
+      RpcClientFactory.ClientType.THRIFT.name());
+    // Only one thread is enough, since only one sink thread processes
+    // transactions at any given time. Each sink owns its own Rpc client.
+    props.setProperty(RpcClientConfigurationConstants
+      .CONFIG_CONNECTION_POOL_SIZE, String.valueOf(1));
+    return RpcClientFactory.getInstance(props);
+  }
+}

http://git-wip-us.apache.org/repos/asf/flume/blob/5294ee61/flume-ng-core/src/test/java/org/apache/flume/sink/TestThriftSink.java
----------------------------------------------------------------------
diff --git 
a/flume-ng-core/src/test/java/org/apache/flume/sink/TestThriftSink.java 
b/flume-ng-core/src/test/java/org/apache/flume/sink/TestThriftSink.java
new file mode 100644
index 0000000..5f70d1b
--- /dev/null
+++ b/flume-ng-core/src/test/java/org/apache/flume/sink/TestThriftSink.java
@@ -0,0 +1,195 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.flume.sink;
+
+import com.google.common.base.Charsets;
+import org.apache.flume.Context;
+import org.apache.flume.Event;
+import org.apache.flume.EventDeliveryException;
+import org.apache.flume.Sink;
+import org.apache.flume.Transaction;
+import org.apache.flume.api.ThriftTestingSource;
+import org.apache.flume.channel.MemoryChannel;
+import org.apache.flume.conf.Configurables;
+import org.apache.flume.event.EventBuilder;
+import org.junit.After;
+import org.junit.Assert;
+import org.junit.Before;
+import org.junit.Test;
+
+import java.nio.charset.Charset;
+import java.util.Random;
+import java.util.concurrent.atomic.AtomicLong;
+
+public class TestThriftSink {
+  private ThriftTestingSource src;
+  private ThriftSink sink;
+  private MemoryChannel channel;
+  private String hostname;
+  private int port;
+
+  private final Random random = new Random();
+
+  @Before
+  public void setUp() throws Exception {
+    sink = new ThriftSink();
+    channel = new MemoryChannel();
+    hostname = "0.0.0.0";
+    port = random.nextInt(50000) + 1024;
+    Context context = new Context();
+
+    context.put("hostname", hostname);
+    context.put("port", String.valueOf(port));
+    context.put("batch-size", String.valueOf(2));
+    context.put("request-timeout", String.valueOf(2000L));
+
+    sink.setChannel(channel);
+
+    Configurables.configure(sink, context);
+    Configurables.configure(channel, context);
+  }
+
+  @After
+  public void tearDown() throws Exception {
+    channel.stop();
+    sink.stop();
+    src.stop();
+  }
+
+  @Test
+  public void testProcess() throws Exception {
+
+    Event event = EventBuilder.withBody("test event 1", Charsets.UTF_8);
+    src = new ThriftTestingSource(ThriftTestingSource.HandlerType.OK.name(),
+      port);
+
+    channel.start();
+    sink.start();
+
+    Transaction transaction = channel.getTransaction();
+
+    transaction.begin();
+    for (int i = 0; i < 11; i++) {
+      channel.put(event);
+    }
+    transaction.commit();
+    transaction.close();
+    for (int i = 0; i < 6; i++) {
+      Sink.Status status = sink.process();
+      Assert.assertEquals(Sink.Status.READY, status);
+    }
+
+    Assert.assertEquals(Sink.Status.BACKOFF, sink.process());
+
+    sink.stop();
+    Assert.assertEquals(11, src.flumeEvents.size());
+    Assert.assertEquals(6, src.batchCount);
+    Assert.assertEquals(0, src.individualCount);
+
+  }
+
+  @Test
+  public void testTimeout() throws Exception {
+    AtomicLong delay = new AtomicLong();
+    src = new ThriftTestingSource(ThriftTestingSource.HandlerType.ALTERNATE
+      .name(), port);
+    src.setDelay(delay);
+    delay.set(2500);
+
+    Event event = EventBuilder.withBody("foo", Charsets.UTF_8);
+    sink.start();
+
+    Transaction txn = channel.getTransaction();
+    txn.begin();
+    for (int i = 0; i < 4; i++) {
+      channel.put(event);
+    }
+    txn.commit();
+    txn.close();
+
+    // should throw EventDeliveryException due to connect timeout
+    boolean threw = false;
+    try {
+      sink.process();
+    } catch (EventDeliveryException ex) {
+      threw = true;
+    }
+
+    Assert.assertTrue("Must throw due to connect timeout", threw);
+
+    // now, allow the connect handshake to occur
+    delay.set(0);
+    sink.process();
+
+    // should throw another EventDeliveryException due to request timeout
+    delay.set(2500L); // because request-timeout = 2000
+    threw = false;
+    try {
+      sink.process();
+    } catch (EventDeliveryException ex) {
+      threw = true;
+    }
+
+    Assert.assertTrue("Must throw due to request timeout", threw);
+
+    sink.stop();
+  }
+
+  @Test
+  public void testFailedConnect() throws Exception {
+
+    Event event = EventBuilder.withBody("test event 1",
+      Charset.forName("UTF8"));
+
+    sink.start();
+
+    Thread.sleep(500L); // let socket startup
+    Thread.sleep(500L); // sleep a little to allow close occur
+
+    Transaction transaction = channel.getTransaction();
+    transaction.begin();
+    for (int i = 0; i < 10; i++) {
+      channel.put(event);
+    }
+    transaction.commit();
+    transaction.close();
+
+    for (int i = 0; i < 5; i++) {
+      boolean threwException = false;
+      try {
+        sink.process();
+      } catch (EventDeliveryException e) {
+        threwException = true;
+      }
+      Assert.assertTrue("Must throw EventDeliveryException if disconnected",
+        threwException);
+    }
+
+    src = new ThriftTestingSource(ThriftTestingSource.HandlerType.OK.name(),
+      port);
+
+    for (int i = 0; i < 5; i++) {
+      Sink.Status status = sink.process();
+      Assert.assertEquals(Sink.Status.READY, status);
+    }
+
+    Assert.assertEquals(Sink.Status.BACKOFF, sink.process());
+    sink.stop();
+  }
+}

Reply via email to