Repository: flume Updated Branches: refs/heads/trunk 674f4fcce -> 1dfcb4b0e
Revert "FLUME-2338. Support coalescing increments in HBaseSink." This reverts commit 674f4fcce2597e7e934ccc69eb04b426f5a9b8bb. Project: http://git-wip-us.apache.org/repos/asf/flume/repo Commit: http://git-wip-us.apache.org/repos/asf/flume/commit/34836ce6 Tree: http://git-wip-us.apache.org/repos/asf/flume/tree/34836ce6 Diff: http://git-wip-us.apache.org/repos/asf/flume/diff/34836ce6 Branch: refs/heads/trunk Commit: 34836ce6cef7c0f0ae13b2e8d9a63ca838ce8ace Parents: 674f4fc Author: Hari Shreedharan <[email protected]> Authored: Mon Mar 3 00:25:15 2014 -0800 Committer: Hari Shreedharan <[email protected]> Committed: Mon Mar 3 00:25:15 2014 -0800 ---------------------------------------------------------------------- flume-ng-doc/sphinx/FlumeUserGuide.rst | 2 - .../org/apache/flume/sink/hbase/BatchAware.java | 28 -- .../org/apache/flume/sink/hbase/HBaseSink.java | 195 +----------- .../sink/hbase/IncrementHBaseSerializer.java | 80 ----- .../apache/flume/sink/hbase/TestHBaseSink.java | 298 ++++--------------- 5 files changed, 53 insertions(+), 550 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/flume/blob/34836ce6/flume-ng-doc/sphinx/FlumeUserGuide.rst ---------------------------------------------------------------------- diff --git a/flume-ng-doc/sphinx/FlumeUserGuide.rst b/flume-ng-doc/sphinx/FlumeUserGuide.rst index cedb283..96bf73e 100644 --- a/flume-ng-doc/sphinx/FlumeUserGuide.rst +++ b/flume-ng-doc/sphinx/FlumeUserGuide.rst @@ -1839,8 +1839,6 @@ Property Name Default Desc zookeeperQuorum -- The quorum spec. This is the value for the property ``hbase.zookeeper.quorum`` in hbase-site.xml znodeParent /hbase The base path for the znode for the -ROOT- region. Value of ``zookeeper.znode.parent`` in hbase-site.xml batchSize 100 Number of events to be written per txn. -coalesceIncrements false Should the sink coalesce multiple increments to a cell per batch. This might give - better performance if there are multiple increments to a limited number of cells. serializer org.apache.flume.sink.hbase.SimpleHbaseEventSerializer Default increment column = "iCol", payload column = "pCol". serializer.* -- Properties to be passed to the serializer. kerberosPrincipal -- Kerberos user principal for accessing secure HBase http://git-wip-us.apache.org/repos/asf/flume/blob/34836ce6/flume-ng-sinks/flume-ng-hbase-sink/src/main/java/org/apache/flume/sink/hbase/BatchAware.java ---------------------------------------------------------------------- diff --git a/flume-ng-sinks/flume-ng-hbase-sink/src/main/java/org/apache/flume/sink/hbase/BatchAware.java b/flume-ng-sinks/flume-ng-hbase-sink/src/main/java/org/apache/flume/sink/hbase/BatchAware.java deleted file mode 100644 index 0974241..0000000 --- a/flume-ng-sinks/flume-ng-hbase-sink/src/main/java/org/apache/flume/sink/hbase/BatchAware.java +++ /dev/null @@ -1,28 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, - * software distributed under the License is distributed on an - * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY - * KIND, either express or implied. See the License for the - * specific language governing permissions and limitations - * under the License. - */ -package org.apache.flume.sink.hbase; - -/** - * This interface allows for implementing HBase serializers that are aware of - * batching. {@link #onBatchStart()} is called at the beginning of each batch - * by the sink. - */ -public interface BatchAware { - public void onBatchStart(); -} http://git-wip-us.apache.org/repos/asf/flume/blob/34836ce6/flume-ng-sinks/flume-ng-hbase-sink/src/main/java/org/apache/flume/sink/hbase/HBaseSink.java ---------------------------------------------------------------------- diff --git a/flume-ng-sinks/flume-ng-hbase-sink/src/main/java/org/apache/flume/sink/hbase/HBaseSink.java b/flume-ng-sinks/flume-ng-hbase-sink/src/main/java/org/apache/flume/sink/hbase/HBaseSink.java index 0390ff8..c4a666c 100644 --- a/flume-ng-sinks/flume-ng-hbase-sink/src/main/java/org/apache/flume/sink/hbase/HBaseSink.java +++ b/flume-ng-sinks/flume-ng-hbase-sink/src/main/java/org/apache/flume/sink/hbase/HBaseSink.java @@ -19,23 +19,15 @@ package org.apache.flume.sink.hbase; import java.io.IOException; -import java.lang.reflect.InvocationTargetException; -import java.lang.reflect.Method; import java.util.LinkedList; import java.util.List; -import java.util.Map; -import java.util.NavigableMap; -import com.google.common.annotations.VisibleForTesting; -import com.google.common.collect.Lists; -import com.google.common.collect.Maps; import org.apache.flume.Channel; import org.apache.flume.Context; import org.apache.flume.Event; import org.apache.flume.EventDeliveryException; import org.apache.flume.FlumeException; import org.apache.flume.Transaction; -import org.apache.flume.annotations.InterfaceAudience; import org.apache.flume.conf.Configurable; import org.apache.flume.instrumentation.SinkCounter; import org.apache.flume.sink.AbstractSink; @@ -60,7 +52,7 @@ import org.apache.hadoop.hbase.security.User; /** * * A simple sink which reads events from a channel and writes them to HBase. - * The Hbase configuration is picked up from the first <tt>hbase-site.xml</tt> + * The Hbase configution is picked up from the first <tt>hbase-site.xml</tt> * encountered in the classpath. This sink supports batch reading of * events from the channel, and writing them to Hbase, to minimize the number * of flushes on the hbase tables. To use this sink, it has to be configured @@ -105,13 +97,8 @@ public class HBaseSink extends AbstractSink implements Configurable { private String kerberosKeytab; private User hbaseUser; private boolean enableWal = true; - private boolean batchIncrements = false; - private Method refGetFamilyMap; private SinkCounter sinkCounter; - // Internal hooks used for unit testing. - private DebugIncrementsCallback debugIncrCallback = null; - public HBaseSink(){ this(HBaseConfiguration.create()); } @@ -120,13 +107,6 @@ public class HBaseSink extends AbstractSink implements Configurable { this.config = conf; } - @VisibleForTesting - @InterfaceAudience.Private - HBaseSink(Configuration conf, DebugIncrementsCallback cb) { - this(conf); - this.debugIncrCallback = cb; - } - @Override public void start(){ Preconditions.checkArgument(table == null, "Please call stop " + @@ -242,17 +222,6 @@ public class HBaseSink extends AbstractSink implements Configurable { "writes to HBase will have WAL disabled, and any data in the " + "memstore of this region in the Region Server could be lost!"); } - - batchIncrements = context.getBoolean( - HBaseSinkConfigurationConstants.CONFIG_COALESCE_INCREMENTS, - HBaseSinkConfigurationConstants.DEFAULT_COALESCE_INCREMENTS); - - if (batchIncrements) { - logger.info("Increment coalescing is enabled. Increments will be " + - "buffered."); - reflectLookupGetFamilyMap(); - } - String zkQuorum = context.getString(HBaseSinkConfigurationConstants .ZK_QUORUM); Integer port = null; @@ -312,11 +281,6 @@ public class HBaseSink extends AbstractSink implements Configurable { List<Increment> incs = new LinkedList<Increment>(); try { txn.begin(); - - if (serializer instanceof BatchAware) { - ((BatchAware)serializer).onBatchStart(); - } - long i = 0; for (; i < batchSize; i++) { Event event = channel.take(); @@ -345,7 +309,7 @@ public class HBaseSink extends AbstractSink implements Configurable { try{ txn.rollback(); } catch (Exception e2) { - logger.error("Exception in rollback. Rollback might not have been " + + logger.error("Exception in rollback. Rollback might not have been" + "successful." , e2); } logger.error("Failed to commit transaction." + @@ -389,20 +353,7 @@ public class HBaseSink extends AbstractSink implements Configurable { runPrivileged(new PrivilegedExceptionAction<Void>() { @Override public Void run() throws Exception { - - List<Increment> processedIncrements; - if (batchIncrements) { - processedIncrements = coalesceIncrements(incs); - } else { - processedIncrements = incs; - } - - // Only used for unit testing. - if (debugIncrCallback != null) { - debugIncrCallback.onAfterCoalesce(processedIncrements); - } - - for (final Increment i : processedIncrements) { + for (final Increment i : incs) { i.setWriteToWAL(enableWal); table.increment(i); } @@ -413,7 +364,6 @@ public class HBaseSink extends AbstractSink implements Configurable { txn.commit(); sinkCounter.addToEventDrainSuccessCount(actions.size()); } - private <T> T runPrivileged(final PrivilegedExceptionAction<T> action) throws Exception { if(hbaseUser != null) { @@ -425,143 +375,4 @@ public class HBaseSink extends AbstractSink implements Configurable { return action.run(); } } - - /** - * The method getFamilyMap() is no longer available in Hbase 0.96. - * We must use reflection to determine which version we may use. - */ - private void reflectLookupGetFamilyMap() { - refGetFamilyMap = null; - String[] methodNames = { "getFamilyMap", "getFamilyMapOfLongs" }; - for (String methodName : methodNames) { - try { - refGetFamilyMap = Increment.class.getMethod(methodName); - if (refGetFamilyMap != null) { - logger.debug("Using Increment.{} for coalesce", methodName); - break; - } - } catch (NoSuchMethodException e) { - logger.debug("Increment.{} does not exist. Exception follows.", - methodName, e); - } catch (SecurityException e) { - logger.debug("No access to Increment.{}; Exception follows.", - methodName, e); - } - } - if (refGetFamilyMap == null) { - throw new UnsupportedOperationException( - "Cannot find Increment.getFamilyMap()"); - } - } - - @SuppressWarnings("unchecked") - private Map<byte[], NavigableMap<byte[], Long>> getFamilyMap(Increment inc) { - Preconditions.checkNotNull(refGetFamilyMap, - "Increment.getFamilymap() not found"); - Preconditions.checkNotNull(inc, "Increment required"); - Map<byte[], NavigableMap<byte[], Long>> familyMap = null; - try { - Object familyObj = refGetFamilyMap.invoke(inc); - familyMap = (Map<byte[], NavigableMap<byte[], Long>>) familyObj; - } catch (IllegalAccessException e) { - logger.warn("Unexpected error calling getFamilyMap()", e); - Throwables.propagate(e); - } catch (InvocationTargetException e) { - logger.warn("Unexpected error calling getFamilyMap()", e); - Throwables.propagate(e); - } - return familyMap; - } - - /** - * Perform "compression" on the given set of increments so that Flume sends - * the minimum possible number of RPC operations to HBase per batch. - * @param incs Input: Increment objects to coalesce. - * @return List of new Increment objects after coalescing the unique counts. - */ - private List<Increment> coalesceIncrements(Iterable<Increment> incs) { - Preconditions.checkNotNull(incs, "List of Increments must not be null"); - // Aggregate all of the increment row/family/column counts. - // The nested map is keyed like this: {row, family, qualifier} => count. - Map<byte[], Map<byte[], NavigableMap<byte[], Long>>> counters = - Maps.newTreeMap(Bytes.BYTES_COMPARATOR); - for (Increment inc : incs) { - byte[] row = inc.getRow(); - Map<byte[], NavigableMap<byte[], Long>> families = getFamilyMap(inc); - for (Map.Entry<byte[], NavigableMap<byte[],Long>> familyEntry : families.entrySet()) { - byte[] family = familyEntry.getKey(); - NavigableMap<byte[], Long> qualifiers = familyEntry.getValue(); - for (Map.Entry<byte[], Long> qualifierEntry : qualifiers.entrySet()) { - byte[] qualifier = qualifierEntry.getKey(); - Long count = qualifierEntry.getValue(); - incrementCounter(counters, row, family, qualifier, count); - } - } - } - - // Reconstruct list of Increments per unique row/family/qualifier. - List<Increment> coalesced = Lists.newLinkedList(); - for (Map.Entry<byte[], Map<byte[],NavigableMap<byte[], Long>>> rowEntry : counters.entrySet()) { - byte[] row = rowEntry.getKey(); - Map <byte[], NavigableMap<byte[], Long>> families = rowEntry.getValue(); - Increment inc = new Increment(row); - for (Map.Entry<byte[], NavigableMap<byte[], Long>> familyEntry : families.entrySet()) { - byte[] family = familyEntry.getKey(); - NavigableMap<byte[], Long> qualifiers = familyEntry.getValue(); - for (Map.Entry<byte[], Long> qualifierEntry : qualifiers.entrySet()) { - byte[] qualifier = qualifierEntry.getKey(); - long count = qualifierEntry.getValue(); - inc.addColumn(family, qualifier, count); - } - } - coalesced.add(inc); - } - - return coalesced; - } - - /** - * Helper function for {@link #coalesceIncrements} to increment a counter - * value in the passed data structure. - * @param counters Nested data structure containing the counters. - * @param row Row key to increment. - * @param family Column family to increment. - * @param qualifier Column qualifier to increment. - * @param count Amount to increment by. - */ - private void incrementCounter( - Map<byte[], Map<byte[], NavigableMap<byte[], Long>>> counters, - byte[] row, byte[] family, byte[] qualifier, Long count) { - - Map<byte[], NavigableMap<byte[], Long>> families = counters.get(row); - if (families == null) { - families = Maps.newTreeMap(Bytes.BYTES_COMPARATOR); - counters.put(row, families); - } - - NavigableMap<byte[], Long> qualifiers = families.get(family); - if (qualifiers == null) { - qualifiers = Maps.newTreeMap(Bytes.BYTES_COMPARATOR); - families.put(family, qualifiers); - } - - Long existingValue = qualifiers.get(qualifier); - if (existingValue == null) { - qualifiers.put(qualifier, count); - } else { - qualifiers.put(qualifier, existingValue + count); - } - } - - @VisibleForTesting - @InterfaceAudience.Private - HbaseEventSerializer getSerializer() { - return serializer; - } - - @VisibleForTesting - @InterfaceAudience.Private - interface DebugIncrementsCallback { - public void onAfterCoalesce(Iterable<Increment> increments); - } } http://git-wip-us.apache.org/repos/asf/flume/blob/34836ce6/flume-ng-sinks/flume-ng-hbase-sink/src/test/java/org/apache/flume/sink/hbase/IncrementHBaseSerializer.java ---------------------------------------------------------------------- diff --git a/flume-ng-sinks/flume-ng-hbase-sink/src/test/java/org/apache/flume/sink/hbase/IncrementHBaseSerializer.java b/flume-ng-sinks/flume-ng-hbase-sink/src/test/java/org/apache/flume/sink/hbase/IncrementHBaseSerializer.java deleted file mode 100644 index b4343eb..0000000 --- a/flume-ng-sinks/flume-ng-hbase-sink/src/test/java/org/apache/flume/sink/hbase/IncrementHBaseSerializer.java +++ /dev/null @@ -1,80 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, - * software distributed under the License is distributed on an - * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY - * KIND, either express or implied. See the License for the - * specific language governing permissions and limitations - * under the License. - */ -package org.apache.flume.sink.hbase; - -import com.google.common.annotations.VisibleForTesting; -import com.google.common.base.Charsets; -import com.google.common.collect.Lists; -import java.util.Collections; -import org.apache.flume.Context; -import org.apache.flume.Event; -import org.apache.flume.conf.ComponentConfiguration; -import org.apache.hadoop.hbase.client.Increment; -import org.apache.hadoop.hbase.client.Row; - -import java.util.List; - -/** - * For Increment-related unit tests. - */ -class IncrementHBaseSerializer implements HbaseEventSerializer, BatchAware { - private Event event; - private byte[] family; - private int numBatchesStarted = 0; - - @Override public void configure(Context context) { } - @Override public void configure(ComponentConfiguration conf) { } - @Override public void close() { } - - @Override - public void initialize(Event event, byte[] columnFamily) { - this.event = event; - this.family = columnFamily; - } - - // This class only creates Increments. - @Override - public List<Row> getActions() { - return Collections.emptyList(); - } - - // Treat each Event as a String, i,e, "row:qualifier". - @Override - public List<Increment> getIncrements() { - List<Increment> increments = Lists.newArrayList(); - String body = new String(event.getBody(), Charsets.UTF_8); - String[] pieces = body.split(":"); - String row = pieces[0]; - String qualifier = pieces[1]; - Increment inc = new Increment(row.getBytes(Charsets.UTF_8)); - inc.addColumn(family, qualifier.getBytes(Charsets.UTF_8), 1L); - increments.add(inc); - return increments; - } - - @Override - public void onBatchStart() { - numBatchesStarted++; - } - - @VisibleForTesting - public int getNumBatchesStarted() { - return numBatchesStarted; - } -} http://git-wip-us.apache.org/repos/asf/flume/blob/34836ce6/flume-ng-sinks/flume-ng-hbase-sink/src/test/java/org/apache/flume/sink/hbase/TestHBaseSink.java ---------------------------------------------------------------------- diff --git a/flume-ng-sinks/flume-ng-hbase-sink/src/test/java/org/apache/flume/sink/hbase/TestHBaseSink.java b/flume-ng-sinks/flume-ng-hbase-sink/src/test/java/org/apache/flume/sink/hbase/TestHBaseSink.java index 5b047dc..d1b0182 100644 --- a/flume-ng-sinks/flume-ng-hbase-sink/src/test/java/org/apache/flume/sink/hbase/TestHBaseSink.java +++ b/flume-ng-sinks/flume-ng-hbase-sink/src/test/java/org/apache/flume/sink/hbase/TestHBaseSink.java @@ -22,13 +22,9 @@ import static org.mockito.Mockito.*; import java.io.IOException; import java.util.Arrays; -import java.util.List; +import java.util.HashMap; import java.util.Map; -import java.util.NavigableMap; -import com.google.common.base.Charsets; -import com.google.common.collect.Lists; -import com.google.common.collect.Maps; -import com.google.common.primitives.Longs; + import org.apache.flume.Channel; import org.apache.flume.ChannelException; import org.apache.flume.Context; @@ -41,95 +37,64 @@ import org.apache.flume.channel.MemoryChannel; import org.apache.flume.conf.Configurables; import org.apache.flume.event.EventBuilder; import org.apache.hadoop.conf.Configuration; -import org.apache.hadoop.hbase.HBaseTestingUtility; -import org.apache.hadoop.hbase.HConstants; +import org.apache.hadoop.hbase.*; import org.apache.hadoop.hbase.client.HTable; -import org.apache.hadoop.hbase.client.Increment; import org.apache.hadoop.hbase.client.Result; import org.apache.hadoop.hbase.client.ResultScanner; import org.apache.hadoop.hbase.client.Scan; import org.apache.hadoop.hbase.util.Bytes; import org.apache.hadoop.hbase.zookeeper.ZKConfig; -import org.junit.After; import org.junit.AfterClass; -import org.junit.Before; import org.junit.BeforeClass; import org.junit.Ignore; import org.junit.Test; import org.junit.Assert; -import org.slf4j.Logger; -import org.slf4j.LoggerFactory; - -public class TestHBaseSink { - private static final Logger logger = - LoggerFactory.getLogger(TestHBaseSink.class); - private static final HBaseTestingUtility testUtility = new HBaseTestingUtility(); - private static final String tableName = "TestHbaseSink"; - private static final String columnFamily = "TestColumnFamily"; - private static final String inColumn = "iCol"; - private static final String plCol = "pCol"; - private static final String valBase = "testing hbase sink: jham"; +import com.google.common.primitives.Longs; - private Configuration conf; - private Context ctx; +public class TestHBaseSink { + private static HBaseTestingUtility testUtility = new HBaseTestingUtility(); + private static String tableName = "TestHbaseSink"; + private static String columnFamily = "TestColumnFamily"; + private static String inColumn = "iCol"; + private static String plCol = "pCol"; + private static Context ctx = new Context(); + private static String valBase = "testing hbase sink: jham"; + private static Configuration conf; @BeforeClass - public static void setUpOnce() throws Exception { + public static void setUp() throws Exception { testUtility.startMiniCluster(); + Map<String, String> ctxMap = new HashMap<String, String>(); + ctxMap.put("table", tableName); + ctxMap.put("columnFamily", columnFamily); + ctxMap.put("serializer", + "org.apache.flume.sink.hbase.SimpleHbaseEventSerializer"); + ctxMap.put("serializer.payloadColumn", plCol); + ctxMap.put("serializer.incrementColumn", inColumn); + ctx.putAll(ctxMap); + conf = new Configuration(testUtility.getConfiguration()); } @AfterClass - public static void tearDownOnce() throws Exception { + public static void tearDown() throws Exception { testUtility.shutdownMiniCluster(); } - /** - * Most common context setup for unit tests using - * {@link SimpleHbaseEventSerializer}. - */ - @Before - public void setUp() throws IOException { - conf = new Configuration(testUtility.getConfiguration()); - ctx = new Context(); - testUtility.createTable(tableName.getBytes(), columnFamily.getBytes()); - } - @After - public void tearDown() throws IOException { - testUtility.deleteTable(tableName.getBytes()); - } - - /** - * Set up {@link Context} for use with {@link SimpleHbaseEventSerializer}. - */ - private void initContextForSimpleHbaseEventSerializer() { - ctx = new Context(); - ctx.put("table", tableName); - ctx.put("columnFamily", columnFamily); - ctx.put("serializer", SimpleHbaseEventSerializer.class.getName()); - ctx.put("serializer.payloadColumn", plCol); - ctx.put("serializer.incrementColumn", inColumn); - } - - /** - * Set up {@link Context} for use with {@link IncrementHBaseSerializer}. - */ - private void initContextForIncrementHBaseSerializer() { - ctx = new Context(); - ctx.put("table", tableName); - ctx.put("columnFamily", columnFamily); - ctx.put("serializer", IncrementHBaseSerializer.class.getName()); - } @Test public void testOneEventWithDefaults() throws Exception { //Create a context without setting increment column and payload Column - ctx = new Context(); - ctx.put("table", tableName); - ctx.put("columnFamily", columnFamily); - ctx.put("serializer", SimpleHbaseEventSerializer.class.getName()); + Map<String,String> ctxMap = new HashMap<String,String>(); + ctxMap.put("table", tableName); + ctxMap.put("columnFamily", columnFamily); + ctxMap.put("serializer", + "org.apache.flume.sink.hbase.SimpleHbaseEventSerializer"); + Context tmpctx = new Context(); + tmpctx.putAll(ctxMap); + testUtility.createTable(tableName.getBytes(), columnFamily.getBytes()); HBaseSink sink = new HBaseSink(conf); Configurables.configure(sink, ctx); Channel channel = new MemoryChannel(); @@ -152,11 +117,12 @@ public class TestHBaseSink { Assert.assertArrayEquals(e.getBody(), out); out = results[1]; Assert.assertArrayEquals(Longs.toByteArray(1), out); + testUtility.deleteTable(tableName.getBytes()); } @Test public void testOneEvent() throws Exception { - initContextForSimpleHbaseEventSerializer(); + testUtility.createTable(tableName.getBytes(), columnFamily.getBytes()); HBaseSink sink = new HBaseSink(conf); Configurables.configure(sink, ctx); Channel channel = new MemoryChannel(); @@ -179,11 +145,12 @@ public class TestHBaseSink { Assert.assertArrayEquals(e.getBody(), out); out = results[1]; Assert.assertArrayEquals(Longs.toByteArray(1), out); + testUtility.deleteTable(tableName.getBytes()); } @Test public void testThreeEvents() throws Exception { - initContextForSimpleHbaseEventSerializer(); + testUtility.createTable(tableName.getBytes(), columnFamily.getBytes()); ctx.put("batchSize", "3"); HBaseSink sink = new HBaseSink(conf); Configurables.configure(sink, ctx); @@ -216,11 +183,12 @@ public class TestHBaseSink { Assert.assertEquals(3, found); out = results[3]; Assert.assertArrayEquals(Longs.toByteArray(3), out); + testUtility.deleteTable(tableName.getBytes()); } @Test public void testMultipleBatches() throws Exception { - initContextForSimpleHbaseEventSerializer(); + testUtility.createTable(tableName.getBytes(), columnFamily.getBytes()); ctx.put("batchSize", "2"); HBaseSink sink = new HBaseSink(conf); Configurables.configure(sink, ctx); @@ -259,17 +227,11 @@ public class TestHBaseSink { Assert.assertEquals(3, found); out = results[3]; Assert.assertArrayEquals(Longs.toByteArray(3), out); + testUtility.deleteTable(tableName.getBytes()); } @Test(expected = FlumeException.class) public void testMissingTable() throws Exception { - logger.info("Running testMissingTable()"); - initContextForSimpleHbaseEventSerializer(); - - // setUp() will create the table, so we delete it. - logger.info("Deleting table {}", tableName); - testUtility.deleteTable(tableName.getBytes()); - ctx.put("batchSize", "2"); HBaseSink sink = new HBaseSink(conf); Configurables.configure(sink, ctx); @@ -278,8 +240,7 @@ public class TestHBaseSink { Channel channel = new MemoryChannel(); Configurables.configure(channel, new Context()); sink.setChannel(channel); - - logger.info("Writing data into channel"); + sink.start(); Transaction tx = channel.getTransaction(); tx.begin(); for(int i = 0; i < 3; i++){ @@ -288,25 +249,7 @@ public class TestHBaseSink { } tx.commit(); tx.close(); - - logger.info("Starting sink and processing events"); - try { - logger.info("Calling sink.start()"); - sink.start(); // This method will throw. - - // We never get here, but we log in case the behavior changes. - logger.error("Unexpected error: Calling sink.process()"); - sink.process(); - logger.error("Unexpected error: Calling sink.stop()"); - sink.stop(); - } finally { - // Re-create the table so tearDown() doesn't throw. - testUtility.createTable(tableName.getBytes(), columnFamily.getBytes()); - } - - // FIXME: The test should never get here, the below code doesn't run. - Assert.fail(); - + sink.process(); HTable table = new HTable(conf, tableName); byte[][] results = getResults(table, 2); byte[] out; @@ -323,9 +266,9 @@ public class TestHBaseSink { out = results[2]; Assert.assertArrayEquals(Longs.toByteArray(2), out); sink.process(); + sink.stop(); } - // TODO: Move this test to a different class and run it stand-alone. /** * This test must run last - it shuts down the minicluster :D * @throws Exception @@ -337,8 +280,8 @@ public class TestHBaseSink { "and uncomment this annotation to run this test.") @Test(expected = EventDeliveryException.class) public void testHBaseFailure() throws Exception { - initContextForSimpleHbaseEventSerializer(); ctx.put("batchSize", "2"); + testUtility.createTable(tableName.getBytes(), columnFamily.getBytes()); HBaseSink sink = new HBaseSink(conf); Configurables.configure(sink, ctx); //Reset the context to a higher batchSize @@ -431,9 +374,8 @@ public class TestHBaseSink { @Test public void testTransactionStateOnChannelException() throws Exception { - initContextForSimpleHbaseEventSerializer(); ctx.put("batchSize", "1"); - + testUtility.createTable(tableName.getBytes(), columnFamily.getBytes()); HBaseSink sink = new HBaseSink(conf); Configurables.configure(sink, ctx); // Reset the context to a higher batchSize @@ -463,15 +405,15 @@ public class TestHBaseSink { Assert.assertArrayEquals(e.getBody(), out); out = results[1]; Assert.assertArrayEquals(Longs.toByteArray(1), out); + testUtility.deleteTable(tableName.getBytes()); } @Test public void testTransactionStateOnSerializationException() throws Exception { - initContextForSimpleHbaseEventSerializer(); ctx.put("batchSize", "1"); ctx.put(HBaseSinkConfigurationConstants.CONFIG_SERIALIZER, "org.apache.flume.sink.hbase.MockSimpleHbaseEventSerializer"); - + testUtility.createTable(tableName.getBytes(), columnFamily.getBytes()); HBaseSink sink = new HBaseSink(conf); Configurables.configure(sink, ctx); // Reset the context to a higher batchSize @@ -502,11 +444,11 @@ public class TestHBaseSink { Assert.assertArrayEquals(e.getBody(), out); out = results[1]; Assert.assertArrayEquals(Longs.toByteArray(1), out); + testUtility.deleteTable(tableName.getBytes()); } @Test public void testWithoutConfigurationObject() throws Exception{ - initContextForSimpleHbaseEventSerializer(); Context tmpContext = new Context(ctx.getParameters()); tmpContext.put("batchSize", "2"); tmpContext.put(HBaseSinkConfigurationConstants.ZK_QUORUM, @@ -515,7 +457,7 @@ public class TestHBaseSink { tmpContext.put(HBaseSinkConfigurationConstants.ZK_ZNODE_PARENT, conf.get(HConstants.ZOOKEEPER_ZNODE_PARENT, HConstants.DEFAULT_ZOOKEEPER_ZNODE_PARENT)); - + testUtility.createTable(tableName.getBytes(), columnFamily.getBytes()); HBaseSink sink = new HBaseSink(); Configurables.configure(sink, tmpContext); Channel channel = new MemoryChannel(); @@ -550,11 +492,11 @@ public class TestHBaseSink { Assert.assertEquals(3, found); out = results[3]; Assert.assertArrayEquals(Longs.toByteArray(3), out); + testUtility.deleteTable(tableName.getBytes()); } @Test public void testZKQuorum() throws Exception{ - initContextForSimpleHbaseEventSerializer(); Context tmpContext = new Context(ctx.getParameters()); String zkQuorum = "zk1.flume.apache.org:3342, zk2.flume.apache.org:3342, " + "zk3.flume.apache.org:3342"; @@ -574,7 +516,6 @@ public class TestHBaseSink { @Test (expected = FlumeException.class) public void testZKQuorumIncorrectPorts() throws Exception{ - initContextForSimpleHbaseEventSerializer(); Context tmpContext = new Context(ctx.getParameters()); String zkQuorum = "zk1.flume.apache.org:3345, zk2.flume.apache.org:3342, " + @@ -588,143 +529,4 @@ public class TestHBaseSink { Configurables.configure(sink, tmpContext); Assert.fail(); } - - @Test - public void testCoalesce() throws EventDeliveryException { - initContextForIncrementHBaseSerializer(); - ctx.put("batchSize", "100"); - ctx.put(HBaseSinkConfigurationConstants.CONFIG_COALESCE_INCREMENTS, - String.valueOf(true)); - - final Map<String, Long> expectedCounts = Maps.newHashMap(); - expectedCounts.put("r1:c1", 10L); - expectedCounts.put("r1:c2", 20L); - expectedCounts.put("r2:c1", 7L); - expectedCounts.put("r2:c3", 63L); - HBaseSink.DebugIncrementsCallback cb = new CoalesceValidator(expectedCounts); - - HBaseSink sink = new HBaseSink(testUtility.getConfiguration(), cb); - Configurables.configure(sink, ctx); - Channel channel = createAndConfigureMemoryChannel(sink); - - List<Event> events = Lists.newLinkedList(); - generateEvents(events, expectedCounts); - putEvents(channel, events); - - sink.start(); - sink.process(); // Calls CoalesceValidator instance. - sink.stop(); - } - - @Test(expected = AssertionError.class) - public void negativeTestCoalesce() throws EventDeliveryException { - initContextForIncrementHBaseSerializer(); - ctx.put("batchSize", "10"); - - final Map<String, Long> expectedCounts = Maps.newHashMap(); - expectedCounts.put("r1:c1", 10L); - HBaseSink.DebugIncrementsCallback cb = new CoalesceValidator(expectedCounts); - - HBaseSink sink = new HBaseSink(testUtility.getConfiguration(), cb); - Configurables.configure(sink, ctx); - Channel channel = createAndConfigureMemoryChannel(sink); - - List<Event> events = Lists.newLinkedList(); - generateEvents(events, expectedCounts); - putEvents(channel, events); - - sink.start(); - sink.process(); // Calls CoalesceValidator instance. - sink.stop(); - } - - @Test - public void testBatchAware() throws EventDeliveryException { - logger.info("Running testBatchAware()"); - initContextForIncrementHBaseSerializer(); - HBaseSink sink = new HBaseSink(testUtility.getConfiguration()); - Configurables.configure(sink, ctx); - Channel channel = createAndConfigureMemoryChannel(sink); - - sink.start(); - int batchCount = 3; - for (int i = 0; i < batchCount; i++) { - sink.process(); - } - sink.stop(); - Assert.assertEquals(batchCount, - ((IncrementHBaseSerializer) sink.getSerializer()).getNumBatchesStarted()); - } - - /** - * For testing that the rows coalesced, serialized by - * {@link IncrementHBaseSerializer}, are of the expected batch size. - */ - private static class CoalesceValidator - implements HBaseSink.DebugIncrementsCallback { - - private final Map<String,Long> expectedCounts; - - public CoalesceValidator(Map<String, Long> expectedCounts) { - this.expectedCounts = expectedCounts; - } - - @Override - public void onAfterCoalesce(Iterable<Increment> increments) { - for (Increment inc : increments) { - byte[] row = inc.getRow(); - Map<byte[], NavigableMap<byte[], Long>> families = inc.getFamilyMap(); - for (byte[] family : families.keySet()) { - NavigableMap<byte[], Long> qualifiers = families.get(family); - for (Map.Entry<byte[], Long> entry : qualifiers.entrySet()) { - byte[] qualifier = entry.getKey(); - Long count = entry.getValue(); - StringBuilder b = new StringBuilder(20); - b.append(new String(row, Charsets.UTF_8)); - b.append(':'); - b.append(new String(qualifier, Charsets.UTF_8)); - String key = b.toString(); - Assert.assertEquals("Expected counts don't match observed for " + key, - expectedCounts.get(key), count); - } - } - } - } - } - - /** - * Add number of Events corresponding to counts to the events list. - * @param events Destination list. - * @param counts How many events to generate for each row:qualifier pair. - */ - private void generateEvents(List<Event> events, Map<String, Long> counts) { - for (String key : counts.keySet()) { - long count = counts.get(key); - for (long i = 0; i < count; i++) { - events.add(EventBuilder.withBody(key, Charsets.UTF_8)); - } - } - } - - private Channel createAndConfigureMemoryChannel(HBaseSink sink) { - Channel channel = new MemoryChannel(); - Context channelCtx = new Context(); - channelCtx.put("capacity", String.valueOf(1000L)); - channelCtx.put("transactionCapacity", String.valueOf(1000L)); - Configurables.configure(channel, channelCtx); - sink.setChannel(channel); - channel.start(); - return channel; - } - - private void putEvents(Channel channel, Iterable<Event> events) { - Transaction tx = channel.getTransaction(); - tx.begin(); - for (Event event : events) { - channel.put(event); - } - tx.commit(); - tx.close(); - } - -} +} \ No newline at end of file
