Updated Branches: refs/heads/trunk c35b7c947 -> 5294ee61e
FLUME-1897: Implement Thrift Sink (Hari Shreedharan via Brock Noland) Project: http://git-wip-us.apache.org/repos/asf/flume/repo Commit: http://git-wip-us.apache.org/repos/asf/flume/commit/5294ee61 Tree: http://git-wip-us.apache.org/repos/asf/flume/tree/5294ee61 Diff: http://git-wip-us.apache.org/repos/asf/flume/diff/5294ee61 Branch: refs/heads/trunk Commit: 5294ee61e71e24b33ab3e4b94bd3b1c03a35475d Parents: c35b7c9 Author: Brock Noland <[email protected]> Authored: Mon Feb 11 15:56:28 2013 -0600 Committer: Brock Noland <[email protected]> Committed: Mon Feb 11 15:56:28 2013 -0600 ---------------------------------------------------------------------- .../org/apache/flume/sink/AbstractRpcSink.java | 343 +++++++++++++++ .../main/java/org/apache/flume/sink/AvroSink.java | 213 +--------- .../java/org/apache/flume/sink/ThriftSink.java | 113 +++++ .../java/org/apache/flume/sink/TestThriftSink.java | 195 ++++++++ 4 files changed, 655 insertions(+), 209 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/flume/blob/5294ee61/flume-ng-core/src/main/java/org/apache/flume/sink/AbstractRpcSink.java ---------------------------------------------------------------------- diff --git a/flume-ng-core/src/main/java/org/apache/flume/sink/AbstractRpcSink.java b/flume-ng-core/src/main/java/org/apache/flume/sink/AbstractRpcSink.java new file mode 100644 index 0000000..52bd49b --- /dev/null +++ b/flume-ng-core/src/main/java/org/apache/flume/sink/AbstractRpcSink.java @@ -0,0 +1,343 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. 
You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.flume.sink; + +import com.google.common.base.Preconditions; +import com.google.common.collect.Lists; +import org.apache.flume.Channel; +import org.apache.flume.ChannelException; +import org.apache.flume.Context; +import org.apache.flume.Event; +import org.apache.flume.EventDeliveryException; +import org.apache.flume.FlumeException; +import org.apache.flume.Transaction; +import org.apache.flume.api.RpcClient; +import org.apache.flume.api.RpcClientConfigurationConstants; +import org.apache.flume.conf.Configurable; +import org.apache.flume.instrumentation.SinkCounter; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import java.util.List; +import java.util.Properties; + +/** + * This sink provides the basic RPC functionality for Flume. This sink takes + * several arguments which are used in RPC. + * This sink forms one half of Flume's tiered collection support. Events sent to + * this sink are transported over the network to the hostname / port pair using + * the RPC implementation encapsulated in {@link RpcClient}. + * The destination is an instance of Flume's {@link org.apache.flume.source + * .AvroSource} or {@link org.apache.flume.source.ThriftSource} (based on + * which implementation of this class is used), which + * allows Flume agents to forward to other Flume agents, forming a tiered + * collection infrastructure. Of course, nothing prevents one from using this + * sink to speak to other custom built infrastructure that implements the same + * RPC protocol. 
+ * </p> + * <p> + * Events are taken from the configured {@link Channel} in batches of the + * configured <tt>batch-size</tt>. The batch size has no theoretical limits + * although all events in the batch <b>must</b> fit in memory. Generally, larger + * batches are far more efficient, but introduce a slight delay (measured in + * millis) in delivery. The batch behavior is such that underruns (i.e. batches + * smaller than the configured batch size) are possible. This is a compromise + * made to maintain low latency of event delivery. If the channel returns a null + * event, meaning it is empty, the batch is immediately sent, regardless of + * size. Batch underruns are tracked in the metrics. Empty batches do not incur + * an RPC roundtrip. + * </p> + * <p> + * <b>Configuration options</b> + * </p> + * <table> + * <tr> + * <th>Parameter</th> + * <th>Description</th> + * <th>Unit (data type)</th> + * <th>Default</th> + * </tr> + * <tr> + * <td><tt>hostname</tt></td> + * <td>The hostname to which events should be sent.</td> + * <td>Hostname or IP (String)</td> + * <td>none (required)</td> + * </tr> + * <tr> + * <td><tt>port</tt></td> + * <td>The port to which events should be sent on <tt>hostname</tt>.</td> + * <td>TCP port (int)</td> + * <td>none (required)</td> + * </tr> + * <tr> + * <td><tt>batch-size</tt></td> + * <td>The maximum number of events to send per RPC.</td> + * <td>events (int)</td> + * <td>100</td> + * </tr> + * <tr> + * <td><tt>connect-timeout</tt></td> + * <td>Maximum time to wait for the first handshake and RPC request</td> + * <td>milliseconds (long)</td> + * <td>20000</td> + * </tr> + * <tr> + * <td><tt>request-timeout</tt></td> + * <td>Maximum time to wait for RPC requests after the first</td> + * <td>milliseconds (long)</td> + * <td>20000</td> + * </tr> + * </table> + * <p> + * <b>Metrics</b> + * </p> + * <p> + * TODO + * </p> + * + * <strong>Implementation Notes:</strong> Any implementation of this class + * must override the {@linkplain 
#initializeRpcClient(Properties)} method. + * This method will be called whenever this sink needs to create a new + * connection to the source. + */ +public abstract class AbstractRpcSink extends AbstractSink + implements Configurable { + + private static final Logger logger = LoggerFactory.getLogger + (AbstractRpcSink.class); + private String hostname; + private Integer port; + private RpcClient client; + private Properties clientProps; + private SinkCounter sinkCounter; + + @Override + public void configure(Context context) { + clientProps = new Properties(); + + hostname = context.getString("hostname"); + port = context.getInteger("port"); + + Preconditions.checkState(hostname != null, "No hostname specified"); + Preconditions.checkState(port != null, "No port specified"); + + clientProps.setProperty(RpcClientConfigurationConstants.CONFIG_HOSTS, "h1"); + clientProps.setProperty(RpcClientConfigurationConstants.CONFIG_HOSTS_PREFIX + + "h1", hostname + ":" + port); + + Integer batchSize = context.getInteger("batch-size"); + if (batchSize != null) { + clientProps.setProperty(RpcClientConfigurationConstants.CONFIG_BATCH_SIZE, + String.valueOf(batchSize)); + } + + Long connectTimeout = context.getLong("connect-timeout"); + if (connectTimeout != null) { + clientProps.setProperty( + RpcClientConfigurationConstants.CONFIG_CONNECT_TIMEOUT, + String.valueOf(connectTimeout)); + } + + Long requestTimeout = context.getLong("request-timeout"); + if (requestTimeout != null) { + clientProps.setProperty( + RpcClientConfigurationConstants.CONFIG_REQUEST_TIMEOUT, + String.valueOf(requestTimeout)); + } + + if (sinkCounter == null) { + sinkCounter = new SinkCounter(getName()); + } + } + + /** + * Returns a new {@linkplain RpcClient} instance configured using the given + * {@linkplain Properties} object. This method is called whenever a new + * connection needs to be created to the next hop. 
+ * @param props the properties used to configure the new client + * @return a new {@linkplain RpcClient} instance configured from {@code props} + */ + protected abstract RpcClient initializeRpcClient(Properties props); + + /** + * If this function is called successively without calling + * {@link #destroyConnection()}, only the first call has any effect. + * @throws org.apache.flume.FlumeException if an RPC client connection could not be opened + */ + private void createConnection() throws FlumeException { + + if (client == null) { + logger.info("Rpc sink {}: Building RpcClient with hostname: {}, " + + "port: {}", + new Object[] { getName(), hostname, port }); + try { + client = initializeRpcClient(clientProps); + Preconditions.checkNotNull(client, "Rpc Client could not be " + + "initialized. " + getName() + " could not be started"); + sinkCounter.incrementConnectionCreatedCount(); + } catch (Exception ex) { + sinkCounter.incrementConnectionFailedCount(); + if (ex instanceof FlumeException) { + throw (FlumeException) ex; + } else { + throw new FlumeException(ex); + } + } + logger.debug("Rpc sink {}: Created RpcClient: {}", getName(), client); + } + + } + + private void destroyConnection() { + if (client != null) { + logger.debug("Rpc sink {} closing Rpc client: {}", getName(), client); + try { + client.close(); + sinkCounter.incrementConnectionClosedCount(); + } catch (FlumeException e) { + sinkCounter.incrementConnectionFailedCount(); + logger.error("Rpc sink " + getName() + ": Attempt to close Rpc " + + "client failed. Exception follows.", e); + } + } + + client = null; + } + + /** + * Ensure the connection exists and is active. + * If the connection is not active, destroy it and recreate it. + * + * @throws org.apache.flume.FlumeException If there are errors closing or opening the RPC + * connection. 
+ */ + private void verifyConnection() throws FlumeException { + if (client == null) { + createConnection(); + } else if (!client.isActive()) { + destroyConnection(); + createConnection(); + } + } + + /** + * The start() of RpcSink is more of an optimization that allows connection + * to be created before the process() loop is started. In case it so happens + * that the start failed, the process() loop will itself attempt to reconnect + * as necessary. This is the expected behavior since it is possible that the + * downstream source becomes unavailable in the middle of the process loop + * and the sink will have to retry the connection again. + */ + @Override + public void start() { + logger.info("Starting {}...", this); + sinkCounter.start(); + try { + createConnection(); + } catch (FlumeException e) { + logger.warn("Unable to create Rpc client using hostname: " + hostname + + ", port: " + port, e); + + /* Try to prevent leaking resources. */ + destroyConnection(); + } + + super.start(); + + logger.info("Rpc sink {} started.", getName()); + } + + @Override + public void stop() { + logger.info("Rpc sink {} stopping...", getName()); + + destroyConnection(); + sinkCounter.stop(); + super.stop(); + + logger.info("Rpc sink {} stopped. 
Metrics: {}", getName(), sinkCounter); + } + + @Override + public String toString() { + return "RpcSink " + getName() + " { host: " + hostname + ", port: " + + port + " }"; + } + + @Override + public Status process() throws EventDeliveryException { + Status status = Status.READY; + Channel channel = getChannel(); + Transaction transaction = channel.getTransaction(); + + try { + transaction.begin(); + + verifyConnection(); + + List<Event> batch = Lists.newLinkedList(); + + for (int i = 0; i < client.getBatchSize(); i++) { + Event event = channel.take(); + + if (event == null) { + break; + } + + batch.add(event); + } + + int size = batch.size(); + int batchSize = client.getBatchSize(); + + if (size == 0) { + sinkCounter.incrementBatchEmptyCount(); + status = Status.BACKOFF; + } else { + if (size < batchSize) { + sinkCounter.incrementBatchUnderflowCount(); + } else { + sinkCounter.incrementBatchCompleteCount(); + } + sinkCounter.addToEventDrainAttemptCount(size); + client.appendBatch(batch); + } + + transaction.commit(); + sinkCounter.addToEventDrainSuccessCount(size); + + } catch (Throwable t) { + transaction.rollback(); + if (t instanceof Error) { + throw (Error) t; + } else if (t instanceof ChannelException) { + logger.error("Rpc Sink " + getName() + ": Unable to get event from" + + " channel " + channel.getName() + ". 
Exception follows.", t); + status = Status.BACKOFF; + } else { + destroyConnection(); + throw new EventDeliveryException("Failed to send events", t); + } + } finally { + transaction.close(); + } + + return status; + } +} http://git-wip-us.apache.org/repos/asf/flume/blob/5294ee61/flume-ng-core/src/main/java/org/apache/flume/sink/AvroSink.java ---------------------------------------------------------------------- diff --git a/flume-ng-core/src/main/java/org/apache/flume/sink/AvroSink.java b/flume-ng-core/src/main/java/org/apache/flume/sink/AvroSink.java index d4ddcbe..34f9fa6 100644 --- a/flume-ng-core/src/main/java/org/apache/flume/sink/AvroSink.java +++ b/flume-ng-core/src/main/java/org/apache/flume/sink/AvroSink.java @@ -117,218 +117,13 @@ import com.google.common.collect.Lists; * TODO * </p> */ -public class AvroSink extends AbstractSink implements Configurable { +public class AvroSink extends AbstractRpcSink { private static final Logger logger = LoggerFactory.getLogger(AvroSink.class); - private String hostname; - private Integer port; - - private RpcClient client; - private Properties clientProps; - private SinkCounter sinkCounter; - - @Override - public void configure(Context context) { - clientProps = new Properties(); - - hostname = context.getString("hostname"); - port = context.getInteger("port"); - - Preconditions.checkState(hostname != null, "No hostname specified"); - Preconditions.checkState(port != null, "No port specified"); - - clientProps.setProperty(RpcClientConfigurationConstants.CONFIG_HOSTS, "h1"); - clientProps.setProperty(RpcClientConfigurationConstants.CONFIG_HOSTS_PREFIX + - "h1", hostname + ":" + port); - - Integer batchSize = context.getInteger("batch-size"); - if (batchSize != null) { - clientProps.setProperty(RpcClientConfigurationConstants.CONFIG_BATCH_SIZE, - String.valueOf(batchSize)); - } - - Long connectTimeout = context.getLong("connect-timeout"); - if (connectTimeout != null) { - clientProps.setProperty( - 
RpcClientConfigurationConstants.CONFIG_CONNECT_TIMEOUT, - String.valueOf(connectTimeout)); - } - - Long requestTimeout = context.getLong("request-timeout"); - if (requestTimeout != null) { - clientProps.setProperty( - RpcClientConfigurationConstants.CONFIG_REQUEST_TIMEOUT, - String.valueOf(requestTimeout)); - } - - if (sinkCounter == null) { - sinkCounter = new SinkCounter(getName()); - } - } - - /** - * If this function is called successively without calling - * {@see #destroyConnection()}, only the first call has any effect. - * @throws FlumeException if an RPC client connection could not be opened - */ - private void createConnection() throws FlumeException { - - if (client == null) { - logger.info("Avro sink {}: Building RpcClient with hostname: {}, " + - "port: {}", - new Object[] { getName(), hostname, port }); - try { - client = RpcClientFactory.getInstance(clientProps); - sinkCounter.incrementConnectionCreatedCount(); - } catch (Exception ex) { - sinkCounter.incrementConnectionFailedCount(); - if (ex instanceof FlumeException) { - throw (FlumeException) ex; - } else { - throw new FlumeException(ex); - } - } - logger.debug("Avro sink {}: Created RpcClient: {}", getName(), client); - } - - } - - private void destroyConnection() { - if (client != null) { - logger.debug("Avro sink {} closing avro client: {}", getName(), client); - try { - client.close(); - sinkCounter.incrementConnectionClosedCount(); - } catch (FlumeException e) { - sinkCounter.incrementConnectionFailedCount(); - logger.error("Avro sink " + getName() + ": Attempt to close avro " + - "client failed. Exception follows.", e); - } - } - - client = null; - } - - /** - * Ensure the connection exists and is active. - * If the connection is not active, destroy it and recreate it. - * - * @throws FlumeException If there are errors closing or opening the RPC - * connection. 
- */ - private void verifyConnection() throws FlumeException { - if (client == null) { - createConnection(); - } else if (!client.isActive()) { - destroyConnection(); - createConnection(); - } - } - - /** - * The start() of AvroSink is more of an optimization that allows connection - * to be created before the process() loop is started. In case it so happens - * that the start failed, the process() loop will itself attempt to reconnect - * as necessary. This is the expected behavior since it is possible that the - * downstream source becomes unavailable in the middle of the process loop - * and the sink will have to retry the connection again. - */ @Override - public void start() { - logger.info("Starting {}...", this); - sinkCounter.start(); - try { - createConnection(); - } catch (FlumeException e) { - logger.warn("Unable to create avro client using hostname: " + hostname - + ", port: " + port, e); - - /* Try to prevent leaking resources. */ - destroyConnection(); - } - - super.start(); - - logger.info("Avro sink {} started.", getName()); + protected RpcClient initializeRpcClient(Properties props) { + logger.info("Attempting to create Avro Rpc client."); + return RpcClientFactory.getInstance(props); } - - @Override - public void stop() { - logger.info("Avro sink {} stopping...", getName()); - - destroyConnection(); - sinkCounter.stop(); - super.stop(); - - logger.info("Avro sink {} stopped. 
Metrics: {}", getName(), sinkCounter); - } - - @Override - public String toString() { - return "AvroSink " + getName() + " { host: " + hostname + ", port: " + - port + " }"; - } - - @Override - public Status process() throws EventDeliveryException { - Status status = Status.READY; - Channel channel = getChannel(); - Transaction transaction = channel.getTransaction(); - - try { - transaction.begin(); - - verifyConnection(); - - List<Event> batch = Lists.newLinkedList(); - - for (int i = 0; i < client.getBatchSize(); i++) { - Event event = channel.take(); - - if (event == null) { - break; - } - - batch.add(event); - } - - int size = batch.size(); - int batchSize = client.getBatchSize(); - - if (size == 0) { - sinkCounter.incrementBatchEmptyCount(); - status = Status.BACKOFF; - } else { - if (size < batchSize) { - sinkCounter.incrementBatchUnderflowCount(); - } else { - sinkCounter.incrementBatchCompleteCount(); - } - sinkCounter.addToEventDrainAttemptCount(size); - client.appendBatch(batch); - } - - transaction.commit(); - sinkCounter.addToEventDrainSuccessCount(size); - - } catch (Throwable t) { - transaction.rollback(); - if (t instanceof Error) { - throw (Error) t; - } else if (t instanceof ChannelException) { - logger.error("Avro Sink " + getName() + ": Unable to get event from" + - " channel " + channel.getName() + ". 
Exception follows.", t); - status = Status.BACKOFF; - } else { - destroyConnection(); - throw new EventDeliveryException("Failed to send events", t); - } - } finally { - transaction.close(); - } - - return status; - } - } http://git-wip-us.apache.org/repos/asf/flume/blob/5294ee61/flume-ng-core/src/main/java/org/apache/flume/sink/ThriftSink.java ---------------------------------------------------------------------- diff --git a/flume-ng-core/src/main/java/org/apache/flume/sink/ThriftSink.java b/flume-ng-core/src/main/java/org/apache/flume/sink/ThriftSink.java new file mode 100644 index 0000000..48a9775 --- /dev/null +++ b/flume-ng-core/src/main/java/org/apache/flume/sink/ThriftSink.java @@ -0,0 +1,113 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.flume.sink; + +import org.apache.flume.api.RpcClient; +import org.apache.flume.api.RpcClientConfigurationConstants; +import org.apache.flume.api.RpcClientFactory; + +import java.util.Properties; +/** + * <p> + * A {@link org.apache.flume.Sink} implementation that can send events to an RPC server (such as + * Flume's {@link org.apache.flume.source.ThriftSource}). 
+ * </p> + * <p> + * This sink forms one half of Flume's tiered collection support. Events sent to + * this sink are transported over the network to the hostname / port pair using + * the RPC implementation encapsulated in {@link RpcClient}. + * The destination is an instance of Flume's + * {@link org.apache.flume.source.ThriftSource}, which + * allows Flume agents to forward to other Flume agents, forming a tiered + * collection infrastructure. Of course, nothing prevents one from using this + * sink to speak to other custom built infrastructure that implements the same + * RPC protocol. + * </p> + * <p> + * Events are taken from the configured {@link org.apache.flume.Channel} in batches of the + * configured <tt>batch-size</tt>. The batch size has no theoretical limits + * although all events in the batch <b>must</b> fit in memory. Generally, larger + * batches are far more efficient, but introduce a slight delay (measured in + * millis) in delivery. The batch behavior is such that underruns (i.e. batches + * smaller than the configured batch size) are possible. This is a compromise + * made to maintain low latency of event delivery. If the channel returns a null + * event, meaning it is empty, the batch is immediately sent, regardless of + * size. Batch underruns are tracked in the metrics. Empty batches do not incur + * an RPC roundtrip. 
+ * </p> + * <p> + * <b>Configuration options</b> + * </p> + * <table> + * <tr> + * <th>Parameter</th> + * <th>Description</th> + * <th>Unit (data type)</th> + * <th>Default</th> + * </tr> + * <tr> + * <td><tt>hostname</tt></td> + * <td>The hostname to which events should be sent.</td> + * <td>Hostname or IP (String)</td> + * <td>none (required)</td> + * </tr> + * <tr> + * <td><tt>port</tt></td> + * <td>The port to which events should be sent on <tt>hostname</tt>.</td> + * <td>TCP port (int)</td> + * <td>none (required)</td> + * </tr> + * <tr> + * <td><tt>batch-size</tt></td> + * <td>The maximum number of events to send per RPC.</td> + * <td>events (int)</td> + * <td>100</td> + * </tr> + * <tr> + * <td><tt>connect-timeout</tt></td> + * <td>Maximum time to wait for the first handshake and RPC request</td> + * <td>milliseconds (long)</td> + * <td>20000</td> + * </tr> + * <tr> + * <td><tt>request-timeout</tt></td> + * <td>Maximum time to wait for RPC requests after the first</td> + * <td>milliseconds (long)</td> + * <td>20000</td> + * </tr> + * </table> + * <p> + * <b>Metrics</b> + * </p> + * <p> + * TODO + * </p> + */ +public class ThriftSink extends AbstractRpcSink { + @Override + protected RpcClient initializeRpcClient(Properties props) { + props.setProperty(RpcClientConfigurationConstants.CONFIG_CLIENT_TYPE, + RpcClientFactory.ClientType.THRIFT.name()); + // Only one thread is enough, since only one sink thread processes + // transactions at any given time. Each sink owns its own Rpc client. 
+ props.setProperty(RpcClientConfigurationConstants + .CONFIG_CONNECTION_POOL_SIZE, String.valueOf(1)); + return RpcClientFactory.getInstance(props); + } +} http://git-wip-us.apache.org/repos/asf/flume/blob/5294ee61/flume-ng-core/src/test/java/org/apache/flume/sink/TestThriftSink.java ---------------------------------------------------------------------- diff --git a/flume-ng-core/src/test/java/org/apache/flume/sink/TestThriftSink.java b/flume-ng-core/src/test/java/org/apache/flume/sink/TestThriftSink.java new file mode 100644 index 0000000..5f70d1b --- /dev/null +++ b/flume-ng-core/src/test/java/org/apache/flume/sink/TestThriftSink.java @@ -0,0 +1,195 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. 
+ */ +package org.apache.flume.sink; + +import com.google.common.base.Charsets; +import org.apache.flume.Context; +import org.apache.flume.Event; +import org.apache.flume.EventDeliveryException; +import org.apache.flume.Sink; +import org.apache.flume.Transaction; +import org.apache.flume.api.ThriftTestingSource; +import org.apache.flume.channel.MemoryChannel; +import org.apache.flume.conf.Configurables; +import org.apache.flume.event.EventBuilder; +import org.junit.After; +import org.junit.Assert; +import org.junit.Before; +import org.junit.Test; + +import java.nio.charset.Charset; +import java.util.Random; +import java.util.concurrent.atomic.AtomicLong; + +public class TestThriftSink { + private ThriftTestingSource src; + private ThriftSink sink; + private MemoryChannel channel; + private String hostname; + private int port; + + private final Random random = new Random(); + + @Before + public void setUp() throws Exception { + sink = new ThriftSink(); + channel = new MemoryChannel(); + hostname = "0.0.0.0"; + port = random.nextInt(50000) + 1024; + Context context = new Context(); + + context.put("hostname", hostname); + context.put("port", String.valueOf(port)); + context.put("batch-size", String.valueOf(2)); + context.put("request-timeout", String.valueOf(2000L)); + + sink.setChannel(channel); + + Configurables.configure(sink, context); + Configurables.configure(channel, context); + } + + @After + public void tearDown() throws Exception { + channel.stop(); + sink.stop(); + src.stop(); + } + + @Test + public void testProcess() throws Exception { + + Event event = EventBuilder.withBody("test event 1", Charsets.UTF_8); + src = new ThriftTestingSource(ThriftTestingSource.HandlerType.OK.name(), + port); + + channel.start(); + sink.start(); + + Transaction transaction = channel.getTransaction(); + + transaction.begin(); + for (int i = 0; i < 11; i++) { + channel.put(event); + } + transaction.commit(); + transaction.close(); + for (int i = 0; i < 6; i++) { + Sink.Status 
status = sink.process(); + Assert.assertEquals(Sink.Status.READY, status); + } + + Assert.assertEquals(Sink.Status.BACKOFF, sink.process()); + + sink.stop(); + Assert.assertEquals(11, src.flumeEvents.size()); + Assert.assertEquals(6, src.batchCount); + Assert.assertEquals(0, src.individualCount); + + } + + @Test + public void testTimeout() throws Exception { + AtomicLong delay = new AtomicLong(); + src = new ThriftTestingSource(ThriftTestingSource.HandlerType.ALTERNATE + .name(), port); + src.setDelay(delay); + delay.set(2500); + + Event event = EventBuilder.withBody("foo", Charsets.UTF_8); + sink.start(); + + Transaction txn = channel.getTransaction(); + txn.begin(); + for (int i = 0; i < 4; i++) { + channel.put(event); + } + txn.commit(); + txn.close(); + + // should throw EventDeliveryException due to connect timeout + boolean threw = false; + try { + sink.process(); + } catch (EventDeliveryException ex) { + threw = true; + } + + Assert.assertTrue("Must throw due to connect timeout", threw); + + // now, allow the connect handshake to occur + delay.set(0); + sink.process(); + + // should throw another EventDeliveryException due to request timeout + delay.set(2500L); // because request-timeout = 2000 + threw = false; + try { + sink.process(); + } catch (EventDeliveryException ex) { + threw = true; + } + + Assert.assertTrue("Must throw due to request timeout", threw); + + sink.stop(); + } + + @Test + public void testFailedConnect() throws Exception { + + Event event = EventBuilder.withBody("test event 1", + Charset.forName("UTF8")); + + sink.start(); + + Thread.sleep(500L); // let socket startup + Thread.sleep(500L); // sleep a little to allow close to occur + + Transaction transaction = channel.getTransaction(); + transaction.begin(); + for (int i = 0; i < 10; i++) { + channel.put(event); + } + transaction.commit(); + transaction.close(); + + for (int i = 0; i < 5; i++) { + boolean threwException = false; + try { + sink.process(); + } catch (EventDeliveryException e) 
{ + threwException = true; + } + Assert.assertTrue("Must throw EventDeliveryException if disconnected", + threwException); + } + + src = new ThriftTestingSource(ThriftTestingSource.HandlerType.OK.name(), + port); + + for (int i = 0; i < 5; i++) { + Sink.Status status = sink.process(); + Assert.assertEquals(Sink.Status.READY, status); + } + + Assert.assertEquals(Sink.Status.BACKOFF, sink.process()); + sink.stop(); + } +}
