Repository: flume
Updated Branches:
  refs/heads/flume-1.5 cb3d84d35 -> 5a06871c7


Revert "FLUME-2338. Support coalescing increments in HBaseSink."

This reverts commit 674f4fcce2597e7e934ccc69eb04b426f5a9b8bb.


Project: http://git-wip-us.apache.org/repos/asf/flume/repo
Commit: http://git-wip-us.apache.org/repos/asf/flume/commit/ae022cf0
Tree: http://git-wip-us.apache.org/repos/asf/flume/tree/ae022cf0
Diff: http://git-wip-us.apache.org/repos/asf/flume/diff/ae022cf0

Branch: refs/heads/flume-1.5
Commit: ae022cf054280045de57f64bce3d63f910fa57fc
Parents: cb3d84d
Author: Hari Shreedharan <[email protected]>
Authored: Mon Mar 3 00:25:15 2014 -0800
Committer: Hari Shreedharan <[email protected]>
Committed: Mon Mar 3 01:07:17 2014 -0800

----------------------------------------------------------------------
 flume-ng-doc/sphinx/FlumeUserGuide.rst          |   2 -
 .../org/apache/flume/sink/hbase/BatchAware.java |  28 --
 .../org/apache/flume/sink/hbase/HBaseSink.java  | 195 +-----------
 .../sink/hbase/IncrementHBaseSerializer.java    |  80 -----
 .../apache/flume/sink/hbase/TestHBaseSink.java  | 298 ++++---------------
 5 files changed, 53 insertions(+), 550 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/flume/blob/ae022cf0/flume-ng-doc/sphinx/FlumeUserGuide.rst
----------------------------------------------------------------------
diff --git a/flume-ng-doc/sphinx/FlumeUserGuide.rst 
b/flume-ng-doc/sphinx/FlumeUserGuide.rst
index cedb283..96bf73e 100644
--- a/flume-ng-doc/sphinx/FlumeUserGuide.rst
+++ b/flume-ng-doc/sphinx/FlumeUserGuide.rst
@@ -1839,8 +1839,6 @@ Property Name       Default                               
                  Desc
 zookeeperQuorum     --                                                      
The quorum spec. This is the value for the property ``hbase.zookeeper.quorum`` 
in hbase-site.xml
 znodeParent         /hbase                                                  
The base path for the znode for the -ROOT- region. Value of 
``zookeeper.znode.parent`` in hbase-site.xml
 batchSize           100                                                     
Number of events to be written per txn.
-coalesceIncrements  false                                                   
Should the sink coalesce multiple increments to a cell per batch. This might 
give
-                                                                            
better performance if there are multiple increments to a limited number of 
cells.
 serializer          org.apache.flume.sink.hbase.SimpleHbaseEventSerializer  
Default increment column = "iCol", payload column = "pCol".
 serializer.*        --                                                      
Properties to be passed to the serializer.
 kerberosPrincipal   --                                                      
Kerberos user principal for accessing secure HBase

http://git-wip-us.apache.org/repos/asf/flume/blob/ae022cf0/flume-ng-sinks/flume-ng-hbase-sink/src/main/java/org/apache/flume/sink/hbase/BatchAware.java
----------------------------------------------------------------------
diff --git 
a/flume-ng-sinks/flume-ng-hbase-sink/src/main/java/org/apache/flume/sink/hbase/BatchAware.java
 
b/flume-ng-sinks/flume-ng-hbase-sink/src/main/java/org/apache/flume/sink/hbase/BatchAware.java
deleted file mode 100644
index 0974241..0000000
--- 
a/flume-ng-sinks/flume-ng-hbase-sink/src/main/java/org/apache/flume/sink/hbase/BatchAware.java
+++ /dev/null
@@ -1,28 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied.  See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-package org.apache.flume.sink.hbase;
-
-/**
- * This interface allows for implementing HBase serializers that are aware of
- * batching. {@link #onBatchStart()} is called at the beginning of each batch
- * by the sink.
- */
-public interface BatchAware {
-  public void onBatchStart();
-}

http://git-wip-us.apache.org/repos/asf/flume/blob/ae022cf0/flume-ng-sinks/flume-ng-hbase-sink/src/main/java/org/apache/flume/sink/hbase/HBaseSink.java
----------------------------------------------------------------------
diff --git 
a/flume-ng-sinks/flume-ng-hbase-sink/src/main/java/org/apache/flume/sink/hbase/HBaseSink.java
 
b/flume-ng-sinks/flume-ng-hbase-sink/src/main/java/org/apache/flume/sink/hbase/HBaseSink.java
index 0390ff8..c4a666c 100644
--- 
a/flume-ng-sinks/flume-ng-hbase-sink/src/main/java/org/apache/flume/sink/hbase/HBaseSink.java
+++ 
b/flume-ng-sinks/flume-ng-hbase-sink/src/main/java/org/apache/flume/sink/hbase/HBaseSink.java
@@ -19,23 +19,15 @@
 package org.apache.flume.sink.hbase;
 
 import java.io.IOException;
-import java.lang.reflect.InvocationTargetException;
-import java.lang.reflect.Method;
 import java.util.LinkedList;
 import java.util.List;
-import java.util.Map;
-import java.util.NavigableMap;
 
-import com.google.common.annotations.VisibleForTesting;
-import com.google.common.collect.Lists;
-import com.google.common.collect.Maps;
 import org.apache.flume.Channel;
 import org.apache.flume.Context;
 import org.apache.flume.Event;
 import org.apache.flume.EventDeliveryException;
 import org.apache.flume.FlumeException;
 import org.apache.flume.Transaction;
-import org.apache.flume.annotations.InterfaceAudience;
 import org.apache.flume.conf.Configurable;
 import org.apache.flume.instrumentation.SinkCounter;
 import org.apache.flume.sink.AbstractSink;
@@ -60,7 +52,7 @@ import org.apache.hadoop.hbase.security.User;
 /**
  *
  * A simple sink which reads events from a channel and writes them to HBase.
- * The Hbase configuration is picked up from the first <tt>hbase-site.xml</tt>
+ * The Hbase configution is picked up from the first <tt>hbase-site.xml</tt>
  * encountered in the classpath. This sink supports batch reading of
  * events from the channel, and writing them to Hbase, to minimize the number
  * of flushes on the hbase tables. To use this sink, it has to be configured
@@ -105,13 +97,8 @@ public class HBaseSink extends AbstractSink implements 
Configurable {
   private String kerberosKeytab;
   private User hbaseUser;
   private boolean enableWal = true;
-  private boolean batchIncrements = false;
-  private Method refGetFamilyMap;
   private SinkCounter sinkCounter;
 
-  // Internal hooks used for unit testing.
-  private DebugIncrementsCallback debugIncrCallback = null;
-
   public HBaseSink(){
     this(HBaseConfiguration.create());
   }
@@ -120,13 +107,6 @@ public class HBaseSink extends AbstractSink implements 
Configurable {
     this.config = conf;
   }
 
-  @VisibleForTesting
-  @InterfaceAudience.Private
-  HBaseSink(Configuration conf, DebugIncrementsCallback cb) {
-    this(conf);
-    this.debugIncrCallback = cb;
-  }
-
   @Override
   public void start(){
     Preconditions.checkArgument(table == null, "Please call stop " +
@@ -242,17 +222,6 @@ public class HBaseSink extends AbstractSink implements 
Configurable {
         "writes to HBase will have WAL disabled, and any data in the " +
         "memstore of this region in the Region Server could be lost!");
     }
-
-    batchIncrements = context.getBoolean(
-      HBaseSinkConfigurationConstants.CONFIG_COALESCE_INCREMENTS,
-      HBaseSinkConfigurationConstants.DEFAULT_COALESCE_INCREMENTS);
-
-    if (batchIncrements) {
-      logger.info("Increment coalescing is enabled. Increments will be " +
-        "buffered.");
-      reflectLookupGetFamilyMap();
-    }
-
     String zkQuorum = context.getString(HBaseSinkConfigurationConstants
       .ZK_QUORUM);
     Integer port = null;
@@ -312,11 +281,6 @@ public class HBaseSink extends AbstractSink implements 
Configurable {
     List<Increment> incs = new LinkedList<Increment>();
     try {
       txn.begin();
-
-      if (serializer instanceof BatchAware) {
-        ((BatchAware)serializer).onBatchStart();
-      }
-
       long i = 0;
       for (; i < batchSize; i++) {
         Event event = channel.take();
@@ -345,7 +309,7 @@ public class HBaseSink extends AbstractSink implements 
Configurable {
       try{
         txn.rollback();
       } catch (Exception e2) {
-        logger.error("Exception in rollback. Rollback might not have been " +
+        logger.error("Exception in rollback. Rollback might not have been" +
             "successful." , e2);
       }
       logger.error("Failed to commit transaction." +
@@ -389,20 +353,7 @@ public class HBaseSink extends AbstractSink implements 
Configurable {
     runPrivileged(new PrivilegedExceptionAction<Void>() {
       @Override
       public Void run() throws Exception {
-
-        List<Increment> processedIncrements;
-        if (batchIncrements) {
-          processedIncrements = coalesceIncrements(incs);
-        } else {
-          processedIncrements = incs;
-        }
-
-        // Only used for unit testing.
-        if (debugIncrCallback != null) {
-          debugIncrCallback.onAfterCoalesce(processedIncrements);
-        }
-
-        for (final Increment i : processedIncrements) {
+        for (final Increment i : incs) {
           i.setWriteToWAL(enableWal);
           table.increment(i);
         }
@@ -413,7 +364,6 @@ public class HBaseSink extends AbstractSink implements 
Configurable {
     txn.commit();
     sinkCounter.addToEventDrainSuccessCount(actions.size());
   }
-
   private <T> T runPrivileged(final PrivilegedExceptionAction<T> action)
           throws Exception {
     if(hbaseUser != null) {
@@ -425,143 +375,4 @@ public class HBaseSink extends AbstractSink implements 
Configurable {
       return action.run();
     }
   }
-
-  /**
-   * The method getFamilyMap() is no longer available in Hbase 0.96.
-   * We must use reflection to determine which version we may use.
-   */
-  private void reflectLookupGetFamilyMap() {
-    refGetFamilyMap = null;
-    String[] methodNames = { "getFamilyMap", "getFamilyMapOfLongs" };
-    for (String methodName : methodNames) {
-      try {
-        refGetFamilyMap = Increment.class.getMethod(methodName);
-        if (refGetFamilyMap != null) {
-          logger.debug("Using Increment.{} for coalesce", methodName);
-          break;
-        }
-      } catch (NoSuchMethodException e) {
-        logger.debug("Increment.{} does not exist. Exception follows.",
-            methodName, e);
-      } catch (SecurityException e) {
-        logger.debug("No access to Increment.{}; Exception follows.",
-            methodName, e);
-      }
-    }
-    if (refGetFamilyMap == null) {
-      throw new UnsupportedOperationException(
-          "Cannot find Increment.getFamilyMap()");
-    }
-  }
-
-  @SuppressWarnings("unchecked")
-  private Map<byte[], NavigableMap<byte[], Long>> getFamilyMap(Increment inc) {
-    Preconditions.checkNotNull(refGetFamilyMap,
-                               "Increment.getFamilymap() not found");
-    Preconditions.checkNotNull(inc, "Increment required");
-    Map<byte[], NavigableMap<byte[], Long>> familyMap = null;
-    try {
-      Object familyObj = refGetFamilyMap.invoke(inc);
-      familyMap = (Map<byte[], NavigableMap<byte[], Long>>) familyObj;
-    } catch (IllegalAccessException e) {
-      logger.warn("Unexpected error calling getFamilyMap()", e);
-      Throwables.propagate(e);
-    } catch (InvocationTargetException e) {
-      logger.warn("Unexpected error calling getFamilyMap()", e);
-      Throwables.propagate(e);
-    }
-    return familyMap;
-  }
-
-  /**
-   * Perform "compression" on the given set of increments so that Flume sends
-   * the minimum possible number of RPC operations to HBase per batch.
-   * @param incs Input: Increment objects to coalesce.
-   * @return List of new Increment objects after coalescing the unique counts.
-   */
-  private List<Increment> coalesceIncrements(Iterable<Increment> incs) {
-    Preconditions.checkNotNull(incs, "List of Increments must not be null");
-    // Aggregate all of the increment row/family/column counts.
-    // The nested map is keyed like this: {row, family, qualifier} => count.
-    Map<byte[], Map<byte[], NavigableMap<byte[], Long>>> counters =
-        Maps.newTreeMap(Bytes.BYTES_COMPARATOR);
-    for (Increment inc : incs) {
-      byte[] row = inc.getRow();
-      Map<byte[], NavigableMap<byte[], Long>> families = getFamilyMap(inc);
-      for (Map.Entry<byte[], NavigableMap<byte[],Long>> familyEntry : 
families.entrySet()) {
-        byte[] family = familyEntry.getKey();
-        NavigableMap<byte[], Long> qualifiers = familyEntry.getValue();
-        for (Map.Entry<byte[], Long> qualifierEntry : qualifiers.entrySet()) {
-          byte[] qualifier = qualifierEntry.getKey();
-          Long count = qualifierEntry.getValue();
-          incrementCounter(counters, row, family, qualifier, count);
-        }
-      }
-    }
-
-    // Reconstruct list of Increments per unique row/family/qualifier.
-    List<Increment> coalesced = Lists.newLinkedList();
-    for (Map.Entry<byte[], Map<byte[],NavigableMap<byte[], Long>>> rowEntry : 
counters.entrySet()) {
-      byte[] row = rowEntry.getKey();
-      Map <byte[], NavigableMap<byte[], Long>> families = rowEntry.getValue();
-      Increment inc = new Increment(row);
-      for (Map.Entry<byte[], NavigableMap<byte[], Long>> familyEntry : 
families.entrySet()) {
-        byte[] family = familyEntry.getKey();
-        NavigableMap<byte[], Long> qualifiers = familyEntry.getValue();
-        for (Map.Entry<byte[], Long> qualifierEntry : qualifiers.entrySet()) {
-          byte[] qualifier = qualifierEntry.getKey();
-          long count = qualifierEntry.getValue();
-          inc.addColumn(family, qualifier, count);
-        }
-      }
-      coalesced.add(inc);
-    }
-
-    return coalesced;
-  }
-
-  /**
-   * Helper function for {@link #coalesceIncrements} to increment a counter
-   * value in the passed data structure.
-   * @param counters Nested data structure containing the counters.
-   * @param row Row key to increment.
-   * @param family Column family to increment.
-   * @param qualifier Column qualifier to increment.
-   * @param count Amount to increment by.
-   */
-  private void incrementCounter(
-      Map<byte[], Map<byte[], NavigableMap<byte[], Long>>> counters,
-      byte[] row, byte[] family, byte[] qualifier, Long count) {
-
-    Map<byte[], NavigableMap<byte[], Long>> families = counters.get(row);
-    if (families == null) {
-      families = Maps.newTreeMap(Bytes.BYTES_COMPARATOR);
-      counters.put(row, families);
-    }
-
-    NavigableMap<byte[], Long> qualifiers = families.get(family);
-    if (qualifiers == null) {
-      qualifiers = Maps.newTreeMap(Bytes.BYTES_COMPARATOR);
-      families.put(family, qualifiers);
-    }
-
-    Long existingValue = qualifiers.get(qualifier);
-    if (existingValue == null) {
-      qualifiers.put(qualifier, count);
-    } else {
-      qualifiers.put(qualifier, existingValue + count);
-    }
-  }
-
-  @VisibleForTesting
-  @InterfaceAudience.Private
-  HbaseEventSerializer getSerializer() {
-    return serializer;
-  }
-
-  @VisibleForTesting
-  @InterfaceAudience.Private
-  interface DebugIncrementsCallback {
-    public void onAfterCoalesce(Iterable<Increment> increments);
-  }
 }

http://git-wip-us.apache.org/repos/asf/flume/blob/ae022cf0/flume-ng-sinks/flume-ng-hbase-sink/src/test/java/org/apache/flume/sink/hbase/IncrementHBaseSerializer.java
----------------------------------------------------------------------
diff --git 
a/flume-ng-sinks/flume-ng-hbase-sink/src/test/java/org/apache/flume/sink/hbase/IncrementHBaseSerializer.java
 
b/flume-ng-sinks/flume-ng-hbase-sink/src/test/java/org/apache/flume/sink/hbase/IncrementHBaseSerializer.java
deleted file mode 100644
index b4343eb..0000000
--- 
a/flume-ng-sinks/flume-ng-hbase-sink/src/test/java/org/apache/flume/sink/hbase/IncrementHBaseSerializer.java
+++ /dev/null
@@ -1,80 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied.  See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-package org.apache.flume.sink.hbase;
-
-import com.google.common.annotations.VisibleForTesting;
-import com.google.common.base.Charsets;
-import com.google.common.collect.Lists;
-import java.util.Collections;
-import org.apache.flume.Context;
-import org.apache.flume.Event;
-import org.apache.flume.conf.ComponentConfiguration;
-import org.apache.hadoop.hbase.client.Increment;
-import org.apache.hadoop.hbase.client.Row;
-
-import java.util.List;
-
-/**
- * For Increment-related unit tests.
- */
-class IncrementHBaseSerializer implements HbaseEventSerializer, BatchAware {
-  private Event event;
-  private byte[] family;
-  private int numBatchesStarted = 0;
-
-  @Override public void configure(Context context) { }
-  @Override public void configure(ComponentConfiguration conf) { }
-  @Override public void close() { }
-
-  @Override
-  public void initialize(Event event, byte[] columnFamily) {
-    this.event = event;
-    this.family = columnFamily;
-  }
-
-  // This class only creates Increments.
-  @Override
-  public List<Row> getActions() {
-    return Collections.emptyList();
-  }
-
-  // Treat each Event as a String, i,e, "row:qualifier".
-  @Override
-  public List<Increment> getIncrements() {
-    List<Increment> increments = Lists.newArrayList();
-    String body = new String(event.getBody(), Charsets.UTF_8);
-    String[] pieces = body.split(":");
-    String row = pieces[0];
-    String qualifier = pieces[1];
-    Increment inc = new Increment(row.getBytes(Charsets.UTF_8));
-    inc.addColumn(family, qualifier.getBytes(Charsets.UTF_8), 1L);
-    increments.add(inc);
-    return increments;
-  }
-
-  @Override
-  public void onBatchStart() {
-    numBatchesStarted++;
-  }
-
-  @VisibleForTesting
-  public int getNumBatchesStarted() {
-    return numBatchesStarted;
-  }
-}

http://git-wip-us.apache.org/repos/asf/flume/blob/ae022cf0/flume-ng-sinks/flume-ng-hbase-sink/src/test/java/org/apache/flume/sink/hbase/TestHBaseSink.java
----------------------------------------------------------------------
diff --git 
a/flume-ng-sinks/flume-ng-hbase-sink/src/test/java/org/apache/flume/sink/hbase/TestHBaseSink.java
 
b/flume-ng-sinks/flume-ng-hbase-sink/src/test/java/org/apache/flume/sink/hbase/TestHBaseSink.java
index 5b047dc..d1b0182 100644
--- 
a/flume-ng-sinks/flume-ng-hbase-sink/src/test/java/org/apache/flume/sink/hbase/TestHBaseSink.java
+++ 
b/flume-ng-sinks/flume-ng-hbase-sink/src/test/java/org/apache/flume/sink/hbase/TestHBaseSink.java
@@ -22,13 +22,9 @@ import static org.mockito.Mockito.*;
 
 import java.io.IOException;
 import java.util.Arrays;
-import java.util.List;
+import java.util.HashMap;
 import java.util.Map;
-import java.util.NavigableMap;
-import com.google.common.base.Charsets;
-import com.google.common.collect.Lists;
-import com.google.common.collect.Maps;
-import com.google.common.primitives.Longs;
+
 import org.apache.flume.Channel;
 import org.apache.flume.ChannelException;
 import org.apache.flume.Context;
@@ -41,95 +37,64 @@ import org.apache.flume.channel.MemoryChannel;
 import org.apache.flume.conf.Configurables;
 import org.apache.flume.event.EventBuilder;
 import org.apache.hadoop.conf.Configuration;
-import org.apache.hadoop.hbase.HBaseTestingUtility;
-import org.apache.hadoop.hbase.HConstants;
+import org.apache.hadoop.hbase.*;
 import org.apache.hadoop.hbase.client.HTable;
-import org.apache.hadoop.hbase.client.Increment;
 import org.apache.hadoop.hbase.client.Result;
 import org.apache.hadoop.hbase.client.ResultScanner;
 import org.apache.hadoop.hbase.client.Scan;
 import org.apache.hadoop.hbase.util.Bytes;
 import org.apache.hadoop.hbase.zookeeper.ZKConfig;
-import org.junit.After;
 import org.junit.AfterClass;
-import org.junit.Before;
 import org.junit.BeforeClass;
 import org.junit.Ignore;
 import org.junit.Test;
 import org.junit.Assert;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
-public class TestHBaseSink {
-  private static final Logger logger =
-      LoggerFactory.getLogger(TestHBaseSink.class);
 
-  private static final HBaseTestingUtility testUtility = new 
HBaseTestingUtility();
-  private static final String tableName = "TestHbaseSink";
-  private static final String columnFamily = "TestColumnFamily";
-  private static final String inColumn = "iCol";
-  private static final String plCol = "pCol";
-  private static final String valBase = "testing hbase sink: jham";
+import com.google.common.primitives.Longs;
 
-  private Configuration conf;
-  private Context ctx;
+public class TestHBaseSink {
+  private static HBaseTestingUtility testUtility = new HBaseTestingUtility();
+  private static String tableName = "TestHbaseSink";
+  private static String columnFamily = "TestColumnFamily";
+  private static String inColumn = "iCol";
+  private static String plCol = "pCol";
+  private static Context ctx = new Context();
+  private static String valBase = "testing hbase sink: jham";
+  private static Configuration conf;
 
   @BeforeClass
-  public static void setUpOnce() throws Exception {
+  public static void setUp() throws Exception {
     testUtility.startMiniCluster();
+    Map<String, String> ctxMap = new HashMap<String, String>();
+    ctxMap.put("table", tableName);
+    ctxMap.put("columnFamily", columnFamily);
+    ctxMap.put("serializer",
+        "org.apache.flume.sink.hbase.SimpleHbaseEventSerializer");
+    ctxMap.put("serializer.payloadColumn", plCol);
+    ctxMap.put("serializer.incrementColumn", inColumn);
+    ctx.putAll(ctxMap);
+    conf = new Configuration(testUtility.getConfiguration());
   }
 
   @AfterClass
-  public static void tearDownOnce() throws Exception {
+  public static void tearDown() throws Exception {
     testUtility.shutdownMiniCluster();
   }
 
-  /**
-   * Most common context setup for unit tests using
-   * {@link SimpleHbaseEventSerializer}.
-   */
-  @Before
-  public void setUp() throws IOException {
-    conf = new Configuration(testUtility.getConfiguration());
-    ctx = new Context();
-    testUtility.createTable(tableName.getBytes(), columnFamily.getBytes());
-  }
 
-  @After
-  public void tearDown() throws IOException {
-    testUtility.deleteTable(tableName.getBytes());
-  }
-
-  /**
-   * Set up {@link Context} for use with {@link SimpleHbaseEventSerializer}.
-   */
-  private void initContextForSimpleHbaseEventSerializer() {
-    ctx = new Context();
-    ctx.put("table", tableName);
-    ctx.put("columnFamily", columnFamily);
-    ctx.put("serializer", SimpleHbaseEventSerializer.class.getName());
-    ctx.put("serializer.payloadColumn", plCol);
-    ctx.put("serializer.incrementColumn", inColumn);
-  }
-
-  /**
-   * Set up {@link Context} for use with {@link IncrementHBaseSerializer}.
-   */
-  private void initContextForIncrementHBaseSerializer() {
-    ctx = new Context();
-    ctx.put("table", tableName);
-    ctx.put("columnFamily", columnFamily);
-    ctx.put("serializer", IncrementHBaseSerializer.class.getName());
-  }
 
   @Test
   public void testOneEventWithDefaults() throws Exception {
     //Create a context without setting increment column and payload Column
-    ctx = new Context();
-    ctx.put("table", tableName);
-    ctx.put("columnFamily", columnFamily);
-    ctx.put("serializer", SimpleHbaseEventSerializer.class.getName());
+    Map<String,String> ctxMap = new HashMap<String,String>();
+    ctxMap.put("table", tableName);
+    ctxMap.put("columnFamily", columnFamily);
+    ctxMap.put("serializer",
+            "org.apache.flume.sink.hbase.SimpleHbaseEventSerializer");
+    Context tmpctx = new Context();
+    tmpctx.putAll(ctxMap);
 
+    testUtility.createTable(tableName.getBytes(), columnFamily.getBytes());
     HBaseSink sink = new HBaseSink(conf);
     Configurables.configure(sink, ctx);
     Channel channel = new MemoryChannel();
@@ -152,11 +117,12 @@ public class TestHBaseSink {
     Assert.assertArrayEquals(e.getBody(), out);
     out = results[1];
     Assert.assertArrayEquals(Longs.toByteArray(1), out);
+    testUtility.deleteTable(tableName.getBytes());
   }
 
   @Test
   public void testOneEvent() throws Exception {
-    initContextForSimpleHbaseEventSerializer();
+    testUtility.createTable(tableName.getBytes(), columnFamily.getBytes());
     HBaseSink sink = new HBaseSink(conf);
     Configurables.configure(sink, ctx);
     Channel channel = new MemoryChannel();
@@ -179,11 +145,12 @@ public class TestHBaseSink {
     Assert.assertArrayEquals(e.getBody(), out);
     out = results[1];
     Assert.assertArrayEquals(Longs.toByteArray(1), out);
+    testUtility.deleteTable(tableName.getBytes());
   }
 
   @Test
   public void testThreeEvents() throws Exception {
-    initContextForSimpleHbaseEventSerializer();
+    testUtility.createTable(tableName.getBytes(), columnFamily.getBytes());
     ctx.put("batchSize", "3");
     HBaseSink sink = new HBaseSink(conf);
     Configurables.configure(sink, ctx);
@@ -216,11 +183,12 @@ public class TestHBaseSink {
     Assert.assertEquals(3, found);
     out = results[3];
     Assert.assertArrayEquals(Longs.toByteArray(3), out);
+    testUtility.deleteTable(tableName.getBytes());
   }
 
   @Test
   public void testMultipleBatches() throws Exception {
-    initContextForSimpleHbaseEventSerializer();
+    testUtility.createTable(tableName.getBytes(), columnFamily.getBytes());
     ctx.put("batchSize", "2");
     HBaseSink sink = new HBaseSink(conf);
     Configurables.configure(sink, ctx);
@@ -259,17 +227,11 @@ public class TestHBaseSink {
     Assert.assertEquals(3, found);
     out = results[3];
     Assert.assertArrayEquals(Longs.toByteArray(3), out);
+    testUtility.deleteTable(tableName.getBytes());
   }
 
   @Test(expected = FlumeException.class)
   public void testMissingTable() throws Exception {
-    logger.info("Running testMissingTable()");
-    initContextForSimpleHbaseEventSerializer();
-
-    // setUp() will create the table, so we delete it.
-    logger.info("Deleting table {}", tableName);
-    testUtility.deleteTable(tableName.getBytes());
-
     ctx.put("batchSize", "2");
     HBaseSink sink = new HBaseSink(conf);
     Configurables.configure(sink, ctx);
@@ -278,8 +240,7 @@ public class TestHBaseSink {
     Channel channel = new MemoryChannel();
     Configurables.configure(channel, new Context());
     sink.setChannel(channel);
-
-    logger.info("Writing data into channel");
+    sink.start();
     Transaction tx = channel.getTransaction();
     tx.begin();
     for(int i = 0; i < 3; i++){
@@ -288,25 +249,7 @@ public class TestHBaseSink {
     }
     tx.commit();
     tx.close();
-
-    logger.info("Starting sink and processing events");
-    try {
-      logger.info("Calling sink.start()");
-      sink.start(); // This method will throw.
-
-      // We never get here, but we log in case the behavior changes.
-      logger.error("Unexpected error: Calling sink.process()");
-      sink.process();
-      logger.error("Unexpected error: Calling sink.stop()");
-      sink.stop();
-    } finally {
-      // Re-create the table so tearDown() doesn't throw.
-      testUtility.createTable(tableName.getBytes(), columnFamily.getBytes());
-    }
-
-    // FIXME: The test should never get here, the below code doesn't run.
-    Assert.fail();
-
+    sink.process();
     HTable table = new HTable(conf, tableName);
     byte[][] results = getResults(table, 2);
     byte[] out;
@@ -323,9 +266,9 @@ public class TestHBaseSink {
     out = results[2];
     Assert.assertArrayEquals(Longs.toByteArray(2), out);
     sink.process();
+    sink.stop();
   }
 
-  // TODO: Move this test to a different class and run it stand-alone.
   /**
    * This test must run last - it shuts down the minicluster :D
    * @throws Exception
@@ -337,8 +280,8 @@ public class TestHBaseSink {
       "and uncomment this annotation to run this test.")
   @Test(expected = EventDeliveryException.class)
   public void testHBaseFailure() throws Exception {
-    initContextForSimpleHbaseEventSerializer();
     ctx.put("batchSize", "2");
+    testUtility.createTable(tableName.getBytes(), columnFamily.getBytes());
     HBaseSink sink = new HBaseSink(conf);
     Configurables.configure(sink, ctx);
     //Reset the context to a higher batchSize
@@ -431,9 +374,8 @@ public class TestHBaseSink {
 
   @Test
   public void testTransactionStateOnChannelException() throws Exception {
-    initContextForSimpleHbaseEventSerializer();
     ctx.put("batchSize", "1");
-
+    testUtility.createTable(tableName.getBytes(), columnFamily.getBytes());
     HBaseSink sink = new HBaseSink(conf);
     Configurables.configure(sink, ctx);
     // Reset the context to a higher batchSize
@@ -463,15 +405,15 @@ public class TestHBaseSink {
     Assert.assertArrayEquals(e.getBody(), out);
     out = results[1];
     Assert.assertArrayEquals(Longs.toByteArray(1), out);
+    testUtility.deleteTable(tableName.getBytes());
   }
 
   @Test
   public void testTransactionStateOnSerializationException() throws Exception {
-    initContextForSimpleHbaseEventSerializer();
     ctx.put("batchSize", "1");
     ctx.put(HBaseSinkConfigurationConstants.CONFIG_SERIALIZER,
         "org.apache.flume.sink.hbase.MockSimpleHbaseEventSerializer");
-
+    testUtility.createTable(tableName.getBytes(), columnFamily.getBytes());
     HBaseSink sink = new HBaseSink(conf);
     Configurables.configure(sink, ctx);
     // Reset the context to a higher batchSize
@@ -502,11 +444,11 @@ public class TestHBaseSink {
     Assert.assertArrayEquals(e.getBody(), out);
     out = results[1];
     Assert.assertArrayEquals(Longs.toByteArray(1), out);
+    testUtility.deleteTable(tableName.getBytes());
   }
 
   @Test
   public void testWithoutConfigurationObject() throws Exception{
-    initContextForSimpleHbaseEventSerializer();
     Context tmpContext = new Context(ctx.getParameters());
     tmpContext.put("batchSize", "2");
     tmpContext.put(HBaseSinkConfigurationConstants.ZK_QUORUM,
@@ -515,7 +457,7 @@ public class TestHBaseSink {
     tmpContext.put(HBaseSinkConfigurationConstants.ZK_ZNODE_PARENT,
       conf.get(HConstants.ZOOKEEPER_ZNODE_PARENT,
         HConstants.DEFAULT_ZOOKEEPER_ZNODE_PARENT));
-
+    testUtility.createTable(tableName.getBytes(), columnFamily.getBytes());
     HBaseSink sink = new HBaseSink();
     Configurables.configure(sink, tmpContext);
     Channel channel = new MemoryChannel();
@@ -550,11 +492,11 @@ public class TestHBaseSink {
     Assert.assertEquals(3, found);
     out = results[3];
     Assert.assertArrayEquals(Longs.toByteArray(3), out);
+    testUtility.deleteTable(tableName.getBytes());
   }
 
   @Test
   public void testZKQuorum() throws Exception{
-    initContextForSimpleHbaseEventSerializer();
     Context tmpContext = new Context(ctx.getParameters());
     String zkQuorum = "zk1.flume.apache.org:3342, zk2.flume.apache.org:3342, " 
+
       "zk3.flume.apache.org:3342";
@@ -574,7 +516,6 @@ public class TestHBaseSink {
 
   @Test (expected = FlumeException.class)
   public void testZKQuorumIncorrectPorts() throws Exception{
-    initContextForSimpleHbaseEventSerializer();
     Context tmpContext = new Context(ctx.getParameters());
 
     String zkQuorum = "zk1.flume.apache.org:3345, zk2.flume.apache.org:3342, " 
+
@@ -588,143 +529,4 @@ public class TestHBaseSink {
     Configurables.configure(sink, tmpContext);
     Assert.fail();
   }
-
-  @Test
-  public void testCoalesce() throws EventDeliveryException {
-    initContextForIncrementHBaseSerializer();
-    ctx.put("batchSize", "100");
-    ctx.put(HBaseSinkConfigurationConstants.CONFIG_COALESCE_INCREMENTS,
-        String.valueOf(true));
-
-    final Map<String, Long> expectedCounts = Maps.newHashMap();
-    expectedCounts.put("r1:c1", 10L);
-    expectedCounts.put("r1:c2", 20L);
-    expectedCounts.put("r2:c1", 7L);
-    expectedCounts.put("r2:c3", 63L);
-    HBaseSink.DebugIncrementsCallback cb = new 
CoalesceValidator(expectedCounts);
-
-    HBaseSink sink = new HBaseSink(testUtility.getConfiguration(), cb);
-    Configurables.configure(sink, ctx);
-    Channel channel = createAndConfigureMemoryChannel(sink);
-
-    List<Event> events = Lists.newLinkedList();
-    generateEvents(events, expectedCounts);
-    putEvents(channel, events);
-
-    sink.start();
-    sink.process(); // Calls CoalesceValidator instance.
-    sink.stop();
-  }
-
-  @Test(expected = AssertionError.class)
-  public void negativeTestCoalesce() throws EventDeliveryException {
-    initContextForIncrementHBaseSerializer();
-    ctx.put("batchSize", "10");
-
-    final Map<String, Long> expectedCounts = Maps.newHashMap();
-    expectedCounts.put("r1:c1", 10L);
-    HBaseSink.DebugIncrementsCallback cb = new 
CoalesceValidator(expectedCounts);
-
-    HBaseSink sink = new HBaseSink(testUtility.getConfiguration(), cb);
-    Configurables.configure(sink, ctx);
-    Channel channel = createAndConfigureMemoryChannel(sink);
-
-    List<Event> events = Lists.newLinkedList();
-    generateEvents(events, expectedCounts);
-    putEvents(channel, events);
-
-    sink.start();
-    sink.process(); // Calls CoalesceValidator instance.
-    sink.stop();
-  }
-
-  @Test
-  public void testBatchAware() throws EventDeliveryException {
-    logger.info("Running testBatchAware()");
-    initContextForIncrementHBaseSerializer();
-    HBaseSink sink = new HBaseSink(testUtility.getConfiguration());
-    Configurables.configure(sink, ctx);
-    Channel channel = createAndConfigureMemoryChannel(sink);
-
-    sink.start();
-    int batchCount = 3;
-    for (int i = 0; i < batchCount; i++) {
-      sink.process();
-    }
-    sink.stop();
-    Assert.assertEquals(batchCount,
-        ((IncrementHBaseSerializer) 
sink.getSerializer()).getNumBatchesStarted());
-  }
-
-  /**
-   * For testing that the rows coalesced, serialized by
-   * {@link IncrementHBaseSerializer}, are of the expected batch size.
-   */
-  private static class CoalesceValidator
-      implements HBaseSink.DebugIncrementsCallback {
-
-    private final Map<String,Long> expectedCounts;
-
-    public CoalesceValidator(Map<String, Long> expectedCounts) {
-      this.expectedCounts = expectedCounts;
-    }
-
-    @Override
-    public void onAfterCoalesce(Iterable<Increment> increments) {
-      for (Increment inc : increments) {
-        byte[] row = inc.getRow();
-        Map<byte[], NavigableMap<byte[], Long>> families = inc.getFamilyMap();
-        for (byte[] family : families.keySet()) {
-          NavigableMap<byte[], Long> qualifiers = families.get(family);
-          for (Map.Entry<byte[], Long> entry : qualifiers.entrySet()) {
-            byte[] qualifier = entry.getKey();
-            Long count = entry.getValue();
-            StringBuilder b = new StringBuilder(20);
-            b.append(new String(row, Charsets.UTF_8));
-            b.append(':');
-            b.append(new String(qualifier, Charsets.UTF_8));
-            String key = b.toString();
-            Assert.assertEquals("Expected counts don't match observed for " + 
key,
-                expectedCounts.get(key), count);
-          }
-        }
-      }
-    }
-  }
-
-  /**
-   * Add number of Events corresponding to counts to the events list.
-   * @param events Destination list.
-   * @param counts How many events to generate for each row:qualifier pair.
-   */
-  private void generateEvents(List<Event> events, Map<String, Long> counts) {
-    for (String key : counts.keySet()) {
-      long count = counts.get(key);
-      for (long i = 0; i < count; i++) {
-        events.add(EventBuilder.withBody(key, Charsets.UTF_8));
-      }
-    }
-  }
-
-  private Channel createAndConfigureMemoryChannel(HBaseSink sink) {
-    Channel channel = new MemoryChannel();
-    Context channelCtx = new Context();
-    channelCtx.put("capacity", String.valueOf(1000L));
-    channelCtx.put("transactionCapacity", String.valueOf(1000L));
-    Configurables.configure(channel, channelCtx);
-    sink.setChannel(channel);
-    channel.start();
-    return channel;
-  }
-
-  private void putEvents(Channel channel, Iterable<Event> events) {
-    Transaction tx = channel.getTransaction();
-    tx.begin();
-    for (Event event : events) {
-      channel.put(event);
-    }
-    tx.commit();
-    tx.close();
-  }
-
-}
+}
\ No newline at end of file

Reply via email to