Updated Branches: refs/heads/trunk d898d7efd -> 510f63ba3
FLUME-1906 Ability to disable WAL for put operation in HBaseSink (Hari Shreedharan via Mubarak Seyed) Project: http://git-wip-us.apache.org/repos/asf/flume/repo Commit: http://git-wip-us.apache.org/repos/asf/flume/commit/510f63ba Tree: http://git-wip-us.apache.org/repos/asf/flume/tree/510f63ba Diff: http://git-wip-us.apache.org/repos/asf/flume/diff/510f63ba Branch: refs/heads/trunk Commit: 510f63ba39592e7912a85c35effb8be52699057a Parents: d898d7e Author: Mubarak Seyed <[email protected]> Authored: Thu Feb 14 23:48:41 2013 -0800 Committer: Mubarak Seyed <[email protected]> Committed: Thu Feb 14 23:48:41 2013 -0800 ---------------------------------------------------------------------- .../apache/flume/sink/hbase/AsyncHBaseSink.java | 11 ++++++++ .../org/apache/flume/sink/hbase/HBaseSink.java | 21 +++++++++++++++ .../hbase/HBaseSinkConfigurationConstants.java | 4 +++ 3 files changed, 36 insertions(+), 0 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/flume/blob/510f63ba/flume-ng-sinks/flume-ng-hbase-sink/src/main/java/org/apache/flume/sink/hbase/AsyncHBaseSink.java ---------------------------------------------------------------------- diff --git a/flume-ng-sinks/flume-ng-hbase-sink/src/main/java/org/apache/flume/sink/hbase/AsyncHBaseSink.java b/flume-ng-sinks/flume-ng-hbase-sink/src/main/java/org/apache/flume/sink/hbase/AsyncHBaseSink.java index 0b6f885..7020fcd 100644 --- a/flume-ng-sinks/flume-ng-hbase-sink/src/main/java/org/apache/flume/sink/hbase/AsyncHBaseSink.java +++ b/flume-ng-sinks/flume-ng-hbase-sink/src/main/java/org/apache/flume/sink/hbase/AsyncHBaseSink.java @@ -113,6 +113,7 @@ public class AsyncHBaseSink extends AbstractSink implements Configurable { private String zkBaseDir; private ExecutorService sinkCallbackPool; private boolean isTest; + private boolean enableWal = true; public AsyncHBaseSink(){ this(null); @@ -186,6 +187,7 @@ public class AsyncHBaseSink extends AbstractSink implements Configurable { callbacksExpected.addAndGet(actions.size() + increments.size()); for (PutRequest action : actions) { + action.setDurable(enableWal); client.put(action).addCallbacks(putSuccessCallback, putFailureCallback); } for (AtomicIncrementRequest increment : increments) { @@ -322,6 +324,15 @@ public class AsyncHBaseSink extends AbstractSink implements Configurable { } Preconditions.checkState(zkQuorum != null && !zkQuorum.isEmpty(), "The Zookeeper quorum cannot be null and should be specified."); + + enableWal = context.getBoolean(HBaseSinkConfigurationConstants + .CONFIG_ENABLE_WAL, HBaseSinkConfigurationConstants.DEFAULT_ENABLE_WAL); + logger.info("The write to WAL option is set to: " + String.valueOf(enableWal)); + if(!enableWal) { + logger.warn("AsyncHBaseSink's enableWal configuration is set to false. " + + "All writes to HBase will have WAL disabled, and any data in the " + + "memstore of this region in the Region Server could be lost!"); + } } @VisibleForTesting http://git-wip-us.apache.org/repos/asf/flume/blob/510f63ba/flume-ng-sinks/flume-ng-hbase-sink/src/main/java/org/apache/flume/sink/hbase/HBaseSink.java ---------------------------------------------------------------------- diff --git a/flume-ng-sinks/flume-ng-hbase-sink/src/main/java/org/apache/flume/sink/hbase/HBaseSink.java b/flume-ng-sinks/flume-ng-hbase-sink/src/main/java/org/apache/flume/sink/hbase/HBaseSink.java index 835a69e..31fb7ff 100644 --- a/flume-ng-sinks/flume-ng-hbase-sink/src/main/java/org/apache/flume/sink/hbase/HBaseSink.java +++ b/flume-ng-sinks/flume-ng-hbase-sink/src/main/java/org/apache/flume/sink/hbase/HBaseSink.java @@ -35,6 +35,7 @@ import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.hbase.HBaseConfiguration; import org.apache.hadoop.hbase.client.HTable; import org.apache.hadoop.hbase.client.Increment; +import org.apache.hadoop.hbase.client.Put; import org.apache.hadoop.hbase.client.Row; import org.apache.hadoop.hbase.util.Bytes; import org.slf4j.Logger; @@ -95,6 +96,7 @@ public class HBaseSink extends AbstractSink implements Configurable { private String kerberosPrincipal; private String kerberosKeytab; private User hbaseUser; + private boolean enableWal = true; public HBaseSink(){ this(HBaseConfiguration.create()); @@ -197,6 +199,15 @@ public class HBaseSink extends AbstractSink implements Configurable { } kerberosKeytab = context.getString(HBaseSinkConfigurationConstants.CONFIG_KEYTAB, ""); kerberosPrincipal = context.getString(HBaseSinkConfigurationConstants.CONFIG_PRINCIPAL, ""); + + enableWal = context.getBoolean(HBaseSinkConfigurationConstants + .CONFIG_ENABLE_WAL, HBaseSinkConfigurationConstants.DEFAULT_ENABLE_WAL); + logger.info("The write to WAL option is set to: " + String.valueOf(enableWal)); + if(!enableWal) { + logger.warn("HBase Sink's enableWal configuration is set to false. All " + + "writes to HBase will have WAL disabled, and any data in the " + + "memstore of this region in the Region Server could be lost!"); + } } @Override @@ -229,6 +240,15 @@ public class HBaseSink extends AbstractSink implements Configurable { runPrivileged(new PrivilegedExceptionAction<Void>() { @Override public Void run() throws Exception { + for(Row r : actions) { + if(r instanceof Put) { + ((Put)r).setWriteToWAL(enableWal); + } + // Newer versions of HBase - Increment implements Row. + if(r instanceof Increment) { + ((Increment)r).setWriteToWAL(enableWal); + } + } table.batch(actions); return null; } @@ -238,6 +258,7 @@ public class HBaseSink extends AbstractSink implements Configurable { @Override public Void run() throws Exception { for (final Increment i : incs) { + i.setWriteToWAL(enableWal); table.increment(i); } return null; http://git-wip-us.apache.org/repos/asf/flume/blob/510f63ba/flume-ng-sinks/flume-ng-hbase-sink/src/main/java/org/apache/flume/sink/hbase/HBaseSinkConfigurationConstants.java ---------------------------------------------------------------------- diff --git a/flume-ng-sinks/flume-ng-hbase-sink/src/main/java/org/apache/flume/sink/hbase/HBaseSinkConfigurationConstants.java b/flume-ng-sinks/flume-ng-hbase-sink/src/main/java/org/apache/flume/sink/hbase/HBaseSinkConfigurationConstants.java index fb6bd4e..7fdc75b 100644 --- a/flume-ng-sinks/flume-ng-hbase-sink/src/main/java/org/apache/flume/sink/hbase/HBaseSinkConfigurationConstants.java +++ b/flume-ng-sinks/flume-ng-hbase-sink/src/main/java/org/apache/flume/sink/hbase/HBaseSinkConfigurationConstants.java @@ -49,6 +49,10 @@ public class HBaseSinkConfigurationConstants { public static final String CONFIG_TIMEOUT = "timeout"; + public static final String CONFIG_ENABLE_WAL = "enableWal"; + + public static final boolean DEFAULT_ENABLE_WAL = true; + public static final long DEFAULT_TIMEOUT = 60000; public static final String CONFIG_KEYTAB = "kerberosKeytab";
