Updated Branches: refs/heads/trunk b84d01615 -> c4e2129fd
FLUME-2202. AsyncHBaseSink should coalesce increments to reduce RPC roundtrips (Hari Shreedharan via Mike Percy) Project: http://git-wip-us.apache.org/repos/asf/flume/repo Commit: http://git-wip-us.apache.org/repos/asf/flume/commit/c4e2129f Tree: http://git-wip-us.apache.org/repos/asf/flume/tree/c4e2129f Diff: http://git-wip-us.apache.org/repos/asf/flume/diff/c4e2129f Branch: refs/heads/trunk Commit: c4e2129fd12f97303a1b8120a2ecf7da456e1b77 Parents: b84d016 Author: Mike Percy <[email protected]> Authored: Thu Oct 3 17:25:57 2013 -0700 Committer: Mike Percy <[email protected]> Committed: Thu Oct 3 17:25:57 2013 -0700 ---------------------------------------------------------------------- flume-ng-doc/sphinx/FlumeUserGuide.rst | 38 +++--- .../apache/flume/sink/hbase/AsyncHBaseSink.java | 121 +++++++++++++++++-- .../hbase/HBaseSinkConfigurationConstants.java | 4 + .../hbase/IncrementAsyncHBaseSerializer.java | 78 ++++++++++++ .../flume/sink/hbase/TestAsyncHBaseSink.java | 82 ++++++++++++- 5 files changed, 291 insertions(+), 32 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/flume/blob/c4e2129f/flume-ng-doc/sphinx/FlumeUserGuide.rst ---------------------------------------------------------------------- diff --git a/flume-ng-doc/sphinx/FlumeUserGuide.rst b/flume-ng-doc/sphinx/FlumeUserGuide.rst index 5a59b56..dc8d05d 100644 --- a/flume-ng-doc/sphinx/FlumeUserGuide.rst +++ b/flume-ng-doc/sphinx/FlumeUserGuide.rst @@ -1850,30 +1850,32 @@ AsyncHBaseSink '''''''''''''' This sink writes data to HBase using an asynchronous model. A class implementing -AsyncHbaseEventSerializer -which is specified by the configuration is used to convert the events into +AsyncHbaseEventSerializer which is specified by the configuration is used to convert the events into HBase puts and/or increments. These puts and increments are then written -to HBase. This sink provides the same consistency guarantees as HBase, +to HBase. This sink uses the `Asynchbase API <https://github.com/OpenTSDB/asynchbase>`_ to write to +HBase. This sink provides the same consistency guarantees as HBase, which is currently row-wise atomicity. In the event of Hbase failing to write certain events, the sink will replay all events in that transaction. The type is the FQCN: org.apache.flume.sink.hbase.AsyncHBaseSink. Required properties are in **bold**. -================ ============================================================ ==================================================================================== -Property Name Default Description -================ ============================================================ ==================================================================================== -**channel** -- -**type** -- The component type name, needs to be ``asynchbase`` -**table** -- The name of the table in Hbase to write to. -zookeeperQuorum -- The quorum spec. This is the value for the property ``hbase.zookeeper.quorum`` in hbase-site.xml -znodeParent /hbase The base path for the znode for the -ROOT- region. Value of ``zookeeper.znode.parent`` in hbase-site.xml -**columnFamily** -- The column family in Hbase to write to. -batchSize 100 Number of events to be written per txn. -timeout 60000 The length of time (in milliseconds) the sink waits for acks from hbase for - all events in a transaction. -serializer org.apache.flume.sink.hbase.SimpleAsyncHbaseEventSerializer -serializer.* -- Properties to be passed to the serializer. -================ ============================================================ ==================================================================================== +=================== ============================================================ ==================================================================================== +Property Name Default Description +=================== ============================================================ ==================================================================================== +**channel** -- +**type** -- The component type name, needs to be ``asynchbase`` +**table** -- The name of the table in Hbase to write to. +zookeeperQuorum -- The quorum spec. This is the value for the property ``hbase.zookeeper.quorum`` in hbase-site.xml +znodeParent /hbase The base path for the znode for the -ROOT- region. Value of ``zookeeper.znode.parent`` in hbase-site.xml +**columnFamily** -- The column family in Hbase to write to. +batchSize 100 Number of events to be written per txn. +coalesceIncrements false Should the sink coalesce multiple increments to a cell per batch. This might give + better performance if there are multiple increments to a limited number of cells. +timeout 60000 The length of time (in milliseconds) the sink waits for acks from hbase for + all events in a transaction. +serializer org.apache.flume.sink.hbase.SimpleAsyncHbaseEventSerializer +serializer.* -- Properties to be passed to the serializer. +=================== ============================================================ ==================================================================================== Note that this sink takes the Zookeeper Quorum and parent znode information in the configuration. Zookeeper Quorum and parent node configuration may be http://git-wip-us.apache.org/repos/asf/flume/blob/c4e2129f/flume-ng-sinks/flume-ng-hbase-sink/src/main/java/org/apache/flume/sink/hbase/AsyncHBaseSink.java ---------------------------------------------------------------------- diff --git a/flume-ng-sinks/flume-ng-hbase-sink/src/main/java/org/apache/flume/sink/hbase/AsyncHBaseSink.java b/flume-ng-sinks/flume-ng-hbase-sink/src/main/java/org/apache/flume/sink/hbase/AsyncHBaseSink.java index 5e297b1..0545554 100644 --- a/flume-ng-sinks/flume-ng-hbase-sink/src/main/java/org/apache/flume/sink/hbase/AsyncHBaseSink.java +++ b/flume-ng-sinks/flume-ng-hbase-sink/src/main/java/org/apache/flume/sink/hbase/AsyncHBaseSink.java @@ -18,13 +18,19 @@ */ package org.apache.flume.sink.hbase; +import java.util.Arrays; +import java.util.Collection; +import java.util.Comparator; import java.util.List; +import java.util.Map; import java.util.concurrent.CountDownLatch; import java.util.concurrent.ExecutorService; import java.util.concurrent.Executors; import java.util.concurrent.atomic.AtomicBoolean; import com.google.common.annotations.VisibleForTesting; +import com.google.common.collect.Maps; +import com.google.common.primitives.UnsignedBytes; import com.google.common.util.concurrent.ThreadFactoryBuilder; import org.apache.flume.Channel; import org.apache.flume.Context; @@ -113,20 +119,32 @@ public class AsyncHBaseSink extends AbstractSink implements Configurable { private String zkQuorum; private String zkBaseDir; private ExecutorService sinkCallbackPool; - private boolean isTest; + private boolean isTimeoutTest; + private boolean isCoalesceTest; private boolean enableWal = true; + private boolean batchIncrements = false; + private volatile int totalCallbacksReceived = 0; + private Map<CellIdentifier, AtomicIncrementRequest> incrementBuffer; + + // Does not need to be thread-safe. Always called only from the sink's + // process method. + private final Comparator<byte[]> COMPARATOR = UnsignedBytes + .lexicographicalComparator(); public AsyncHBaseSink(){ this(null); } public AsyncHBaseSink(Configuration conf) { - this(conf, false); + this(conf, false, false); } - AsyncHBaseSink(Configuration conf, boolean isTimeoutTesting) { + @VisibleForTesting + AsyncHBaseSink(Configuration conf, boolean isTimeoutTest, + boolean isCoalesceTest) { this.conf = conf; - isTest = isTimeoutTesting; + this.isTimeoutTest = isTimeoutTest; + this.isCoalesceTest = isCoalesceTest; } @Override @@ -138,7 +156,7 @@ public class AsyncHBaseSink extends AbstractSink implements Configurable { * the next one is being processed. * */ - if(!open){ + if (!open) { throw new EventDeliveryException("Sink was never opened. " + "Please fix the configuration."); } @@ -147,6 +165,9 @@ public class AsyncHBaseSink extends AbstractSink implements Configurable { AtomicInteger callbacksExpected = new AtomicInteger(0); final Lock lock = new ReentrantLock(); final Condition condition = lock.newCondition(); + if (incrementBuffer != null) { + incrementBuffer.clear(); + } /* * Callbacks can be reused per transaction, since they share the same * locks and conditions. @@ -185,18 +206,41 @@ public class AsyncHBaseSink extends AbstractSink implements Configurable { serializer.setEvent(event); List<PutRequest> actions = serializer.getActions(); List<AtomicIncrementRequest> increments = serializer.getIncrements(); - callbacksExpected.addAndGet(actions.size() + increments.size()); + callbacksExpected.addAndGet(actions.size()); + if (!batchIncrements) { + callbacksExpected.addAndGet(increments.size()); + } for (PutRequest action : actions) { action.setDurable(enableWal); client.put(action).addCallbacks(putSuccessCallback, putFailureCallback); } for (AtomicIncrementRequest increment : increments) { - client.atomicIncrement(increment).addCallbacks( - incrementSuccessCallback, incrementFailureCallback); + if (batchIncrements) { + CellIdentifier identifier = new CellIdentifier(increment.key(), + increment.qualifier()); + AtomicIncrementRequest request + = incrementBuffer.get(identifier); + if (request == null) { + incrementBuffer.put(identifier, increment); + } else { + request.setAmount(request.getAmount() + increment.getAmount()); + } + } else { + client.atomicIncrement(increment).addCallbacks( + incrementSuccessCallback, incrementFailureCallback); + } } } } + if (batchIncrements) { + Collection<AtomicIncrementRequest> increments = incrementBuffer.values(); + for (AtomicIncrementRequest increment : increments) { + client.atomicIncrement(increment).addCallbacks( + incrementSuccessCallback, incrementFailureCallback); + } + callbacksExpected.addAndGet(increments.size()); + } client.flush(); } catch (Throwable e) { this.handleTransactionFailure(txn); @@ -216,7 +260,7 @@ public class AsyncHBaseSink extends AbstractSink implements Configurable { timeRemaining = timeout - (System.nanoTime() - startTime); timeRemaining = (timeRemaining >= 0) ? timeRemaining : 0; try { - if(!condition.await(timeRemaining, TimeUnit.NANOSECONDS)){ + if (!condition.await(timeRemaining, TimeUnit.NANOSECONDS)) { txnFail.set(true); logger.warn("HBase callbacks timed out. " + "Transaction will be rolled back."); @@ -231,6 +275,10 @@ public class AsyncHBaseSink extends AbstractSink implements Configurable { lock.unlock(); } + if (isCoalesceTest) { + totalCallbacksReceived += callbacksReceived.get(); + } + /* * At this point, either the txn has failed * or all callbacks received and txn is successful. @@ -246,7 +294,7 @@ public class AsyncHBaseSink extends AbstractSink implements Configurable { throw new EventDeliveryException("Could not write events to Hbase. " + "Transaction failed, and rolled back."); } else { - try{ + try { txn.commit(); txn.close(); sinkCounter.addToEventDrainSuccessCount(i); @@ -334,6 +382,21 @@ public class AsyncHBaseSink extends AbstractSink implements Configurable { "All writes to HBase will have WAL disabled, and any data in the " + "memstore of this region in the Region Server could be lost!"); } + + batchIncrements = context.getBoolean( + HBaseSinkConfigurationConstants.CONFIG_COALESCE_INCREMENTS, + HBaseSinkConfigurationConstants.DEFAULT_COALESCE_INCREMENTS); + + if(batchIncrements) { + incrementBuffer = Maps.newHashMap(); + logger.info("Increment coalescing is enabled. Increments will be " + + "buffered."); + } + } + + @VisibleForTesting + int getTotalCallbacksReceived() { + return totalCallbacksReceived; } @VisibleForTesting @@ -346,7 +409,7 @@ public class AsyncHBaseSink extends AbstractSink implements Configurable { + "before calling start on an old instance."); sinkCounter.start(); sinkCounter.incrementConnectionCreatedCount(); - if (!isTest) { + if (!isTimeoutTest) { sinkCallbackPool = Executors.newCachedThreadPool(new ThreadFactoryBuilder() .setNameFormat(this.getName() + " HBase Call Pool").build()); } else { @@ -447,7 +510,7 @@ public class AsyncHBaseSink extends AbstractSink implements Configurable { lock = lck; this.callbacksReceived = callbacksReceived; this.condition = condition; - isTimeoutTesting = isTest; + isTimeoutTesting = isTimeoutTest; } @Override @@ -487,7 +550,7 @@ public class AsyncHBaseSink extends AbstractSink implements Configurable { this.callbacksReceived = callbacksReceived; this.txnFail = txnFail; this.condition = condition; - isTimeoutTesting = isTest; + isTimeoutTesting = isTimeoutTest; } @Override @@ -525,4 +588,36 @@ public class AsyncHBaseSink extends AbstractSink implements Configurable { } throw new EventDeliveryException("Error in processing transaction.", e); } + + private class CellIdentifier { + private final byte[] row; + private final byte[] column; + private final int hashCode; + // Since the sink operates only on one table and one cf, + // we use the data from the owning sink + public CellIdentifier(byte[] row, byte[] column) { + this.row = row; + this.column = column; + this.hashCode = + (Arrays.hashCode(row) * 31) * (Arrays.hashCode(column) * 31); + } + + @Override + public int hashCode() { + return hashCode; + } + + // Since we know that this class is used from only this class, + // skip the class comparison to save time + @Override + public boolean equals(Object other) { + CellIdentifier o = (CellIdentifier) other; + if (other == null) { + return false; + } else { + return (COMPARATOR.compare(row, o.row) == 0 + && COMPARATOR.compare(column, o.column) == 0); + } + } + } } http://git-wip-us.apache.org/repos/asf/flume/blob/c4e2129f/flume-ng-sinks/flume-ng-hbase-sink/src/main/java/org/apache/flume/sink/hbase/HBaseSinkConfigurationConstants.java ---------------------------------------------------------------------- diff --git a/flume-ng-sinks/flume-ng-hbase-sink/src/main/java/org/apache/flume/sink/hbase/HBaseSinkConfigurationConstants.java b/flume-ng-sinks/flume-ng-hbase-sink/src/main/java/org/apache/flume/sink/hbase/HBaseSinkConfigurationConstants.java index 7fdc75b..1a78071 100644 --- a/flume-ng-sinks/flume-ng-hbase-sink/src/main/java/org/apache/flume/sink/hbase/HBaseSinkConfigurationConstants.java +++ b/flume-ng-sinks/flume-ng-hbase-sink/src/main/java/org/apache/flume/sink/hbase/HBaseSinkConfigurationConstants.java @@ -66,4 +66,8 @@ public class HBaseSinkConfigurationConstants { public static final String DEFAULT_ZK_ZNODE_PARENT = HConstants.DEFAULT_ZOOKEEPER_ZNODE_PARENT; + public static final String CONFIG_COALESCE_INCREMENTS = "coalesceIncrements"; + + public static final Boolean DEFAULT_COALESCE_INCREMENTS = false; + } http://git-wip-us.apache.org/repos/asf/flume/blob/c4e2129f/flume-ng-sinks/flume-ng-hbase-sink/src/test/java/org/apache/flume/sink/hbase/IncrementAsyncHBaseSerializer.java ---------------------------------------------------------------------- diff --git a/flume-ng-sinks/flume-ng-hbase-sink/src/test/java/org/apache/flume/sink/hbase/IncrementAsyncHBaseSerializer.java b/flume-ng-sinks/flume-ng-hbase-sink/src/test/java/org/apache/flume/sink/hbase/IncrementAsyncHBaseSerializer.java new file mode 100644 index 0000000..b8aefe8 --- /dev/null +++ b/flume-ng-sinks/flume-ng-hbase-sink/src/test/java/org/apache/flume/sink/hbase/IncrementAsyncHBaseSerializer.java @@ -0,0 +1,78 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.flume.sink.hbase; + +import org.apache.flume.Context; +import org.apache.flume.Event; +import org.apache.flume.conf.ComponentConfiguration; +import org.hbase.async.AtomicIncrementRequest; +import org.hbase.async.PutRequest; + +import java.util.ArrayList; +import java.util.Collections; +import java.util.List; + +/** + * An AsyncHBaseEventSerializer implementation that increments a configured + * column for the row whose row key is the event's body bytes. + */ +public class IncrementAsyncHBaseSerializer implements AsyncHbaseEventSerializer { + private byte[] table; + private byte[] cf; + private byte[] column; + private Event currentEvent; + @Override + public void initialize(byte[] table, byte[] cf) { + this.table = table; + this.cf = cf; + } + + @Override + public void setEvent(Event event) { + this.currentEvent = event; + } + + @Override + public List<PutRequest> getActions() { + return Collections.emptyList(); + } + + @Override + public List<AtomicIncrementRequest> getIncrements() { + List<AtomicIncrementRequest> incrs + = new ArrayList<AtomicIncrementRequest>(); + AtomicIncrementRequest incr = new AtomicIncrementRequest(table, + currentEvent.getBody(), cf, column, 1); + incrs.add(incr); + return incrs; + } + + @Override + public void cleanUp() { + } + + @Override + public void configure(Context context) { + column = context.getString("column", "col").getBytes(); + } + + @Override + public void configure(ComponentConfiguration conf) { + } +} http://git-wip-us.apache.org/repos/asf/flume/blob/c4e2129f/flume-ng-sinks/flume-ng-hbase-sink/src/test/java/org/apache/flume/sink/hbase/TestAsyncHBaseSink.java ---------------------------------------------------------------------- diff --git a/flume-ng-sinks/flume-ng-hbase-sink/src/test/java/org/apache/flume/sink/hbase/TestAsyncHBaseSink.java b/flume-ng-sinks/flume-ng-hbase-sink/src/test/java/org/apache/flume/sink/hbase/TestAsyncHBaseSink.java index a0c04eb..ccbc086 100644 --- a/flume-ng-sinks/flume-ng-hbase-sink/src/test/java/org/apache/flume/sink/hbase/TestAsyncHBaseSink.java +++ b/flume-ng-sinks/flume-ng-hbase-sink/src/test/java/org/apache/flume/sink/hbase/TestAsyncHBaseSink.java @@ -204,7 +204,7 @@ public class TestAsyncHBaseSink { testUtility.createTable(tableName.getBytes(), columnFamily.getBytes()); deleteTable = true; AsyncHBaseSink sink = new AsyncHBaseSink(testUtility.getConfiguration(), - true); + true, false); Configurables.configure(sink, ctx); Channel channel = new MemoryChannel(); Configurables.configure(channel, ctx); @@ -271,6 +271,86 @@ public class TestAsyncHBaseSink { } @Test + public void testMultipleBatchesBatchIncrementsWithCoalescing() + throws Exception { + doTestMultipleBatchesBatchIncrements(true); + } + + @Test + public void testMultipleBatchesBatchIncrementsNoCoalescing() + throws Exception { + doTestMultipleBatchesBatchIncrements(false); + } + + public void doTestMultipleBatchesBatchIncrements(boolean coalesce) throws + Exception { + testUtility.createTable(tableName.getBytes(), columnFamily.getBytes()); + deleteTable = true; + AsyncHBaseSink sink = new AsyncHBaseSink(testUtility.getConfiguration(), + false, true); + if (coalesce) { + ctx.put(HBaseSinkConfigurationConstants.CONFIG_COALESCE_INCREMENTS, + "true"); + } + ctx.put("batchSize", "2"); + ctx.put("serializer", IncrementAsyncHBaseSerializer.class.getName()); + ctx.put("serializer.column", "test"); + Configurables.configure(sink, ctx); + //Reset the context to a higher batchSize + ctx.put("batchSize", "100"); + // Restore the original serializer + ctx.put("serializer", SimpleAsyncHbaseEventSerializer.class.getName()); + //Restore the no coalescing behavior + ctx.put(HBaseSinkConfigurationConstants.CONFIG_COALESCE_INCREMENTS, + "false"); + Channel channel = new MemoryChannel(); + Configurables.configure(channel, ctx); + sink.setChannel(channel); + sink.start(); + Transaction tx = channel.getTransaction(); + tx.begin(); + for (int i = 0; i < 4; i++) { + for (int j = 0; j < 3; j++) { + Event e = EventBuilder.withBody(Bytes.toBytes(valBase + "-" + i)); + channel.put(e); + } + } + tx.commit(); + tx.close(); + int count = 0; + Status status = Status.READY; + while (status != Status.BACKOFF) { + count++; + status = sink.process(); + } + Assert.assertFalse(sink.isConfNull()); + sink.stop(); + Assert.assertEquals(7, count); + HTable table = new HTable(testUtility.getConfiguration(), tableName); + Scan scan = new Scan(); + scan.addColumn(columnFamily.getBytes(),"test".getBytes()); + scan.setStartRow(Bytes.toBytes(valBase)); + ResultScanner rs = table.getScanner(scan); + int i = 0; + try { + for (Result r = rs.next(); r != null; r = rs.next()) { + byte[] out = r.getValue(columnFamily.getBytes(), "test".getBytes()); + Assert.assertArrayEquals(Longs.toByteArray(3), out); + Assert.assertTrue(new String(r.getRow()).startsWith(valBase)); + i++; + } + } finally { + rs.close(); + } + Assert.assertEquals(4, i); + if (coalesce) { + Assert.assertEquals(8, sink.getTotalCallbacksReceived()); + } else { + Assert.assertEquals(12, sink.getTotalCallbacksReceived()); + } + } + + @Test public void testWithoutConfigurationObject() throws Exception{ testUtility.createTable(tableName.getBytes(), columnFamily.getBytes()); deleteTable = true;
