Updated Branches: refs/heads/trunk ab0894c7f -> 11fada202
FLUME-1842: AsyncHBaseSink timeout is not calculated correctly (Hari Shreedharan via Brock Noland) Project: http://git-wip-us.apache.org/repos/asf/flume/repo Commit: http://git-wip-us.apache.org/repos/asf/flume/commit/11fada20 Tree: http://git-wip-us.apache.org/repos/asf/flume/tree/11fada20 Diff: http://git-wip-us.apache.org/repos/asf/flume/diff/11fada20 Branch: refs/heads/trunk Commit: 11fada2026e34a704d9f710bdd24766c306d040c Parents: ab0894c Author: Brock Noland <[email protected]> Authored: Wed Jan 16 16:14:25 2013 -0800 Committer: Brock Noland <[email protected]> Committed: Wed Jan 16 16:14:25 2013 -0800 ---------------------------------------------------------------------- flume-ng-doc/sphinx/FlumeUserGuide.rst | 4 +- .../apache/flume/sink/hbase/AsyncHBaseSink.java | 75 +++++++++++++-- .../hbase/HBaseSinkConfigurationConstants.java | 2 +- .../flume/sink/hbase/TestAsyncHBaseSink.java | 63 ++++++++++-- 4 files changed, 122 insertions(+), 22 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/flume/blob/11fada20/flume-ng-doc/sphinx/FlumeUserGuide.rst ---------------------------------------------------------------------- diff --git a/flume-ng-doc/sphinx/FlumeUserGuide.rst b/flume-ng-doc/sphinx/FlumeUserGuide.rst index 58a115e..aa92974 100644 --- a/flume-ng-doc/sphinx/FlumeUserGuide.rst +++ b/flume-ng-doc/sphinx/FlumeUserGuide.rst @@ -1539,8 +1539,8 @@ zookeeperQuorum -- znodeParent /hbase The base path for the znode for the -ROOT- region. Value of ``zookeeper.znode.parent`` in hbase-site.xml **columnFamily** -- The column family in Hbase to write to. batchSize 100 Number of events to be written per txn. -timeout -- The length of time (in milliseconds) the sink waits for acks from hbase for - all events in a transaction. If no timeout is specified, the sink will wait forever. +timeout 60000 The length of time (in milliseconds) the sink waits for acks from hbase for + all events in a transaction. 
serializer org.apache.flume.sink.hbase.SimpleAsyncHbaseEventSerializer serializer.* -- Properties to be passed to the serializer. ================ ============================================================ ==================================================================================== http://git-wip-us.apache.org/repos/asf/flume/blob/11fada20/flume-ng-sinks/flume-ng-hbase-sink/src/main/java/org/apache/flume/sink/hbase/AsyncHBaseSink.java ---------------------------------------------------------------------- diff --git a/flume-ng-sinks/flume-ng-hbase-sink/src/main/java/org/apache/flume/sink/hbase/AsyncHBaseSink.java b/flume-ng-sinks/flume-ng-hbase-sink/src/main/java/org/apache/flume/sink/hbase/AsyncHBaseSink.java index 6b34873..0b6f885 100644 --- a/flume-ng-sinks/flume-ng-hbase-sink/src/main/java/org/apache/flume/sink/hbase/AsyncHBaseSink.java +++ b/flume-ng-sinks/flume-ng-hbase-sink/src/main/java/org/apache/flume/sink/hbase/AsyncHBaseSink.java @@ -20,9 +20,12 @@ package org.apache.flume.sink.hbase; import java.util.List; import java.util.concurrent.CountDownLatch; +import java.util.concurrent.ExecutorService; +import java.util.concurrent.Executors; import java.util.concurrent.atomic.AtomicBoolean; import com.google.common.annotations.VisibleForTesting; +import com.google.common.util.concurrent.ThreadFactoryBuilder; import org.apache.flume.Channel; import org.apache.flume.Context; import org.apache.flume.Event; @@ -108,13 +111,20 @@ public class AsyncHBaseSink extends AbstractSink implements Configurable { private long timeout; private String zkQuorum; private String zkBaseDir; + private ExecutorService sinkCallbackPool; + private boolean isTest; public AsyncHBaseSink(){ this(null); } public AsyncHBaseSink(Configuration conf) { + this(conf, false); + } + + AsyncHBaseSink(Configuration conf, boolean isTimeoutTesting) { this.conf = conf; + isTest = isTimeoutTesting; } @Override @@ -184,6 +194,7 @@ public class AsyncHBaseSink extends AbstractSink implements 
Configurable { } } } + client.flush(); } catch (Throwable e) { this.handleTransactionFailure(txn); this.checkIfChannelExceptionAndThrow(e); @@ -194,11 +205,15 @@ public class AsyncHBaseSink extends AbstractSink implements Configurable { sinkCounter.addToEventDrainAttemptCount(i); lock.lock(); + long startTime = System.nanoTime(); + long timeRemaining; try { while ((callbacksReceived.get() < callbacksExpected.get()) && !txnFail.get()) { + timeRemaining = timeout - (System.nanoTime() - startTime); + timeRemaining = (timeRemaining >= 0) ? timeRemaining : 0; try { - if(!condition.await(timeout, TimeUnit.MILLISECONDS)){ + if(!condition.await(timeRemaining, TimeUnit.NANOSECONDS)){ txnFail.set(true); logger.warn("HBase callbacks timed out. " + "Transaction will be rolled back."); @@ -288,6 +303,8 @@ public class AsyncHBaseSink extends AbstractSink implements Configurable { + "Sink will not timeout."); timeout = HBaseSinkConfigurationConstants.DEFAULT_TIMEOUT; } + //Convert to nanos. + timeout = TimeUnit.MILLISECONDS.toNanos(timeout); zkQuorum = context.getString( HBaseSinkConfigurationConstants.ZK_QUORUM, "").trim(); @@ -300,7 +317,8 @@ public class AsyncHBaseSink extends AbstractSink implements Configurable { conf = HBaseConfiguration.create(); } zkQuorum = conf.get(HConstants.ZOOKEEPER_QUORUM); - zkBaseDir = conf.get(HConstants.ZOOKEEPER_ZNODE_PARENT); + zkBaseDir = conf.get(HConstants.ZOOKEEPER_ZNODE_PARENT, + HConstants.DEFAULT_ZOOKEEPER_ZNODE_PARENT); } Preconditions.checkState(zkQuorum != null && !zkQuorum.isEmpty(), "The Zookeeper quorum cannot be null and should be specified."); @@ -316,11 +334,13 @@ public class AsyncHBaseSink extends AbstractSink implements Configurable { + "before calling start on an old instance."); sinkCounter.start(); sinkCounter.incrementConnectionCreatedCount(); - if(zkBaseDir != null){ - client = new HBaseClient(zkQuorum, zkBaseDir); + if (!isTest) { + sinkCallbackPool = Executors.newCachedThreadPool(new ThreadFactoryBuilder() + 
.setNameFormat(this.getName() + " HBase Call Pool").build()); } else { - client = new HBaseClient(zkQuorum); + sinkCallbackPool = Executors.newSingleThreadExecutor(); } + client = new HBaseClient(zkQuorum, zkBaseDir, sinkCallbackPool); final CountDownLatch latch = new CountDownLatch(1); final AtomicBoolean fail = new AtomicBoolean(false); client.ensureTableFamilyExists( @@ -366,6 +386,17 @@ public class AsyncHBaseSink extends AbstractSink implements Configurable { client.shutdown(); sinkCounter.incrementConnectionClosedCount(); sinkCounter.stop(); + sinkCallbackPool.shutdown(); + try { + if(!sinkCallbackPool.awaitTermination(5, TimeUnit.SECONDS)) { + sinkCallbackPool.shutdownNow(); + } + } catch (InterruptedException e) { + logger.error("Interrupted while waiting for asynchbase sink pool to " + + "die", e); + sinkCallbackPool.shutdownNow(); + } + sinkCallbackPool = null; client = null; conf = null; open = false; @@ -397,15 +428,31 @@ public class AsyncHBaseSink extends AbstractSink implements Configurable { private Lock lock; private AtomicInteger callbacksReceived; private Condition condition; + private final boolean isTimeoutTesting; public SuccessCallback(Lock lck, AtomicInteger callbacksReceived, Condition condition) { lock = lck; this.callbacksReceived = callbacksReceived; this.condition = condition; + isTimeoutTesting = isTest; } + @Override public R call(T arg) throws Exception { + if (isTimeoutTesting) { + try { + //tests set timeout to 10 seconds, so sleep for 4 seconds + TimeUnit.NANOSECONDS.sleep(TimeUnit.SECONDS.toNanos(4)); + } catch (InterruptedException e) { + //ignore + } + } + doCall(); + return null; + } + + private void doCall() throws Exception { callbacksReceived.incrementAndGet(); lock.lock(); try{ @@ -413,7 +460,6 @@ public class AsyncHBaseSink extends AbstractSink implements Configurable { } finally { lock.unlock(); } - return null; } } @@ -422,17 +468,31 @@ public class AsyncHBaseSink extends AbstractSink implements Configurable { private 
AtomicInteger callbacksReceived; private AtomicBoolean txnFail; private Condition condition; - + private final boolean isTimeoutTesting; public FailureCallback(Lock lck, AtomicInteger callbacksReceived, AtomicBoolean txnFail, Condition condition){ this.lock = lck; this.callbacksReceived = callbacksReceived; this.txnFail = txnFail; this.condition = condition; + isTimeoutTesting = isTest; } @Override public R call(T arg) throws Exception { + if (isTimeoutTesting) { + //tests set timeout to 10 seconds, so sleep for 4 seconds + try { + TimeUnit.NANOSECONDS.sleep(TimeUnit.SECONDS.toNanos(4)); + } catch (InterruptedException e) { + //ignore + } + } + doCall(); + return null; + } + + private void doCall() throws Exception { callbacksReceived.incrementAndGet(); this.txnFail.set(true); lock.lock(); @@ -441,7 +501,6 @@ public class AsyncHBaseSink extends AbstractSink implements Configurable { } finally { lock.unlock(); } - return null; } } http://git-wip-us.apache.org/repos/asf/flume/blob/11fada20/flume-ng-sinks/flume-ng-hbase-sink/src/main/java/org/apache/flume/sink/hbase/HBaseSinkConfigurationConstants.java ---------------------------------------------------------------------- diff --git a/flume-ng-sinks/flume-ng-hbase-sink/src/main/java/org/apache/flume/sink/hbase/HBaseSinkConfigurationConstants.java b/flume-ng-sinks/flume-ng-hbase-sink/src/main/java/org/apache/flume/sink/hbase/HBaseSinkConfigurationConstants.java index fad026c..fb6bd4e 100644 --- a/flume-ng-sinks/flume-ng-hbase-sink/src/main/java/org/apache/flume/sink/hbase/HBaseSinkConfigurationConstants.java +++ b/flume-ng-sinks/flume-ng-hbase-sink/src/main/java/org/apache/flume/sink/hbase/HBaseSinkConfigurationConstants.java @@ -49,7 +49,7 @@ public class HBaseSinkConfigurationConstants { public static final String CONFIG_TIMEOUT = "timeout"; - public static final long DEFAULT_TIMEOUT = Long.MAX_VALUE; + public static final long DEFAULT_TIMEOUT = 60000; public static final String CONFIG_KEYTAB = "kerberosKeytab"; 
http://git-wip-us.apache.org/repos/asf/flume/blob/11fada20/flume-ng-sinks/flume-ng-hbase-sink/src/test/java/org/apache/flume/sink/hbase/TestAsyncHBaseSink.java ---------------------------------------------------------------------- diff --git a/flume-ng-sinks/flume-ng-hbase-sink/src/test/java/org/apache/flume/sink/hbase/TestAsyncHBaseSink.java b/flume-ng-sinks/flume-ng-hbase-sink/src/test/java/org/apache/flume/sink/hbase/TestAsyncHBaseSink.java index 1f61406..03c3e4c 100644 --- a/flume-ng-sinks/flume-ng-hbase-sink/src/test/java/org/apache/flume/sink/hbase/TestAsyncHBaseSink.java +++ b/flume-ng-sinks/flume-ng-hbase-sink/src/test/java/org/apache/flume/sink/hbase/TestAsyncHBaseSink.java @@ -59,6 +59,8 @@ import com.google.common.io.Files; import com.google.common.primitives.Longs; import java.lang.reflect.Method; +import org.junit.After; + public class TestAsyncHBaseSink { private static HBaseTestingUtility testUtility; private static MiniZooKeeperCluster zookeeperCluster; @@ -71,6 +73,7 @@ public class TestAsyncHBaseSink { private static String plCol = "pc"; private static Context ctx = new Context(); private static String valBase = "testing hbase sink: jham"; + private boolean deleteTable = true; @BeforeClass @@ -141,6 +144,8 @@ public class TestAsyncHBaseSink { "org.apache.flume.sink.hbase.SimpleAsyncHbaseEventSerializer"); ctxMap.put("serializer.payloadColumn", plCol); ctxMap.put("serializer.incrementColumn", inColumn); + ctxMap.put("keep-alive", "0"); + ctxMap.put("timeout", "10000"); ctx.putAll(ctxMap); } @@ -151,13 +156,21 @@ public class TestAsyncHBaseSink { FileUtils.deleteDirectory(new File(workDir)); } + @After + public void tearDownTest() throws Exception { + if (deleteTable) { + testUtility.deleteTable(tableName.getBytes()); + } + } + @Test public void testOneEvent() throws Exception { testUtility.createTable(tableName.getBytes(), columnFamily.getBytes()); + deleteTable = true; AsyncHBaseSink sink = new AsyncHBaseSink(testUtility.getConfiguration()); 
Configurables.configure(sink, ctx); Channel channel = new MemoryChannel(); - Configurables.configure(channel, new Context()); + Configurables.configure(channel, ctx); sink.setChannel(channel); sink.start(); Transaction tx = channel.getTransaction(); @@ -176,16 +189,16 @@ public class TestAsyncHBaseSink { Assert.assertArrayEquals(e.getBody(), out); out = results[1]; Assert.assertArrayEquals(Longs.toByteArray(1), out); - testUtility.deleteTable(tableName.getBytes()); } @Test public void testThreeEvents() throws Exception { testUtility.createTable(tableName.getBytes(), columnFamily.getBytes()); + deleteTable = true; AsyncHBaseSink sink = new AsyncHBaseSink(testUtility.getConfiguration()); Configurables.configure(sink, ctx); Channel channel = new MemoryChannel(); - Configurables.configure(channel, new Context()); + Configurables.configure(channel, ctx); sink.setChannel(channel); sink.start(); Transaction tx = channel.getTransaction(); @@ -214,19 +227,46 @@ public class TestAsyncHBaseSink { Assert.assertEquals(3, found); out = results[3]; Assert.assertArrayEquals(Longs.toByteArray(3), out); - testUtility.deleteTable(tableName.getBytes()); + } + + //This will fail without FLUME-1842's timeout fix - but with FLUME-1842's testing + //oriented changes to the callback classes and using single threaded executor + //for tests. 
+ @Test (expected = EventDeliveryException.class) + public void testTimeOut() throws Exception { + testUtility.createTable(tableName.getBytes(), columnFamily.getBytes()); + deleteTable = true; + AsyncHBaseSink sink = new AsyncHBaseSink(testUtility.getConfiguration(), + true); + Configurables.configure(sink, ctx); + Channel channel = new MemoryChannel(); + Configurables.configure(channel, ctx); + sink.setChannel(channel); + sink.start(); + Transaction tx = channel.getTransaction(); + tx.begin(); + for(int i = 0; i < 3; i++){ + Event e = EventBuilder.withBody(Bytes.toBytes(valBase + "-" + i)); + channel.put(e); + } + tx.commit(); + tx.close(); + Assert.assertFalse(sink.isConfNull()); + sink.process(); + Assert.fail(); } @Test public void testMultipleBatches() throws Exception { testUtility.createTable(tableName.getBytes(), columnFamily.getBytes()); + deleteTable = true; ctx.put("batchSize", "2"); AsyncHBaseSink sink = new AsyncHBaseSink(testUtility.getConfiguration()); Configurables.configure(sink, ctx); //Reset the context to a higher batchSize ctx.put("batchSize", "100"); Channel channel = new MemoryChannel(); - Configurables.configure(channel, new Context()); + Configurables.configure(channel, ctx); sink.setChannel(channel); sink.start(); Transaction tx = channel.getTransaction(); @@ -261,17 +301,17 @@ public class TestAsyncHBaseSink { Assert.assertEquals(3, found); out = results[3]; Assert.assertArrayEquals(Longs.toByteArray(3), out); - testUtility.deleteTable(tableName.getBytes()); } @Test public void testWithoutConfigurationObject() throws Exception{ testUtility.createTable(tableName.getBytes(), columnFamily.getBytes()); + deleteTable = true; ctx.put("batchSize", "2"); ctx.put(HBaseSinkConfigurationConstants.ZK_QUORUM, testUtility.getConfiguration().get(HConstants.ZOOKEEPER_QUORUM)); ctx.put(HBaseSinkConfigurationConstants.ZK_ZNODE_PARENT, - testUtility.getConfiguration().get(HConstants.ZOOKEEPER_ZNODE_PARENT)); + 
testUtility.getConfiguration().get(HConstants.ZOOKEEPER_ZNODE_PARENT)); AsyncHBaseSink sink = new AsyncHBaseSink(); Configurables.configure(sink, ctx); // Reset context to values usable by other tests. @@ -279,7 +319,7 @@ public class TestAsyncHBaseSink { ctx.put(HBaseSinkConfigurationConstants.ZK_ZNODE_PARENT,null); ctx.put("batchSize", "100"); Channel channel = new MemoryChannel(); - Configurables.configure(channel, new Context()); + Configurables.configure(channel, ctx); sink.setChannel(channel); sink.start(); Transaction tx = channel.getTransaction(); @@ -318,18 +358,18 @@ public class TestAsyncHBaseSink { Assert.assertEquals(3, found); out = results[3]; Assert.assertArrayEquals(Longs.toByteArray(3), out); - testUtility.deleteTable(tableName.getBytes()); } @Test(expected = FlumeException.class) public void testMissingTable() throws Exception { + deleteTable = false; ctx.put("batchSize", "2"); AsyncHBaseSink sink = new AsyncHBaseSink(testUtility.getConfiguration()); Configurables.configure(sink, ctx); //Reset the context to a higher batchSize ctx.put("batchSize", "100"); Channel channel = new MemoryChannel(); - Configurables.configure(channel, new Context()); + Configurables.configure(channel, ctx); sink.setChannel(channel); sink.start(); Transaction tx = channel.getTransaction(); @@ -373,12 +413,13 @@ public class TestAsyncHBaseSink { public void testHBaseFailure() throws Exception { ctx.put("batchSize", "2"); testUtility.createTable(tableName.getBytes(), columnFamily.getBytes()); + deleteTable = false; AsyncHBaseSink sink = new AsyncHBaseSink(testUtility.getConfiguration()); Configurables.configure(sink, ctx); //Reset the context to a higher batchSize ctx.put("batchSize", "100"); Channel channel = new MemoryChannel(); - Configurables.configure(channel, new Context()); + Configurables.configure(channel, ctx); sink.setChannel(channel); sink.start(); Transaction tx = channel.getTransaction();
