Repository: flume
Updated Branches:
  refs/heads/trunk be25c5e11 -> a7d2a289a


FLUME-3154. Add HBase client version check to AsyncHBaseSink and HBaseSink

The current implementations of HBaseSink and AsyncHBaseSink are not
compatible with the 2.0 version of HBase, which will be released soon.
This change adds a version check and makes these sinks fail gracefully
if incompatible HBase jars are found on the classpath.

This closes #160

Reviewers: Bessenyei Balázs Donát, Ferenc Szabo, Denes Arvay

(Miklos Csanady via Denes Arvay)


Project: http://git-wip-us.apache.org/repos/asf/flume/repo
Commit: http://git-wip-us.apache.org/repos/asf/flume/commit/a7d2a289
Tree: http://git-wip-us.apache.org/repos/asf/flume/tree/a7d2a289
Diff: http://git-wip-us.apache.org/repos/asf/flume/diff/a7d2a289

Branch: refs/heads/trunk
Commit: a7d2a289a67f057a5cdb28dabbd3e651613dfa3e
Parents: be25c5e
Author: Miklos Csanady <[email protected]>
Authored: Mon Aug 28 11:15:12 2017 +0200
Committer: Denes Arvay <[email protected]>
Committed: Wed Aug 30 16:02:53 2017 +0200

----------------------------------------------------------------------
 .../apache/flume/sink/hbase/AsyncHBaseSink.java |  5 +++
 .../org/apache/flume/sink/hbase/HBaseSink.java  |  5 +++
 .../flume/sink/hbase/HBaseVersionCheck.java     | 43 ++++++++++++++++++++
 3 files changed, 53 insertions(+)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/flume/blob/a7d2a289/flume-ng-sinks/flume-ng-hbase-sink/src/main/java/org/apache/flume/sink/hbase/AsyncHBaseSink.java
----------------------------------------------------------------------
diff --git 
a/flume-ng-sinks/flume-ng-hbase-sink/src/main/java/org/apache/flume/sink/hbase/AsyncHBaseSink.java
 
b/flume-ng-sinks/flume-ng-hbase-sink/src/main/java/org/apache/flume/sink/hbase/AsyncHBaseSink.java
index f120f59..c202a57 100644
--- 
a/flume-ng-sinks/flume-ng-hbase-sink/src/main/java/org/apache/flume/sink/hbase/AsyncHBaseSink.java
+++ 
b/flume-ng-sinks/flume-ng-hbase-sink/src/main/java/org/apache/flume/sink/hbase/AsyncHBaseSink.java
@@ -34,6 +34,7 @@ import org.apache.flume.EventDeliveryException;
 import org.apache.flume.FlumeException;
 import org.apache.flume.Transaction;
 import org.apache.flume.conf.Configurable;
+import org.apache.flume.conf.ConfigurationException;
 import org.apache.flume.instrumentation.SinkCounter;
 import org.apache.flume.sink.AbstractSink;
 import org.apache.hadoop.conf.Configuration;
@@ -329,6 +330,10 @@ public class AsyncHBaseSink extends AbstractSink 
implements Configurable {
 
   @Override
   public void configure(Context context) {
+    if (!HBaseVersionCheck.hasVersionLessThan2(logger)) {
+      throw new ConfigurationException(
+              "HBase major version number must be less than 2 for asynchbase 
sink. ");
+    }
     tableName = 
context.getString(HBaseSinkConfigurationConstants.CONFIG_TABLE);
     String cf = context.getString(
         HBaseSinkConfigurationConstants.CONFIG_COLUMN_FAMILY);

http://git-wip-us.apache.org/repos/asf/flume/blob/a7d2a289/flume-ng-sinks/flume-ng-hbase-sink/src/main/java/org/apache/flume/sink/hbase/HBaseSink.java
----------------------------------------------------------------------
diff --git 
a/flume-ng-sinks/flume-ng-hbase-sink/src/main/java/org/apache/flume/sink/hbase/HBaseSink.java
 
b/flume-ng-sinks/flume-ng-hbase-sink/src/main/java/org/apache/flume/sink/hbase/HBaseSink.java
index 4c8b52b..29969ad 100644
--- 
a/flume-ng-sinks/flume-ng-hbase-sink/src/main/java/org/apache/flume/sink/hbase/HBaseSink.java
+++ 
b/flume-ng-sinks/flume-ng-hbase-sink/src/main/java/org/apache/flume/sink/hbase/HBaseSink.java
@@ -34,6 +34,7 @@ import org.apache.flume.annotations.InterfaceAudience;
 import org.apache.flume.auth.FlumeAuthenticationUtil;
 import org.apache.flume.auth.PrivilegedExecutor;
 import org.apache.flume.conf.Configurable;
+import org.apache.flume.conf.ConfigurationException;
 import org.apache.flume.instrumentation.SinkCounter;
 import org.apache.flume.sink.AbstractSink;
 import org.apache.hadoop.conf.Configuration;
@@ -197,6 +198,10 @@ public class HBaseSink extends AbstractSink implements 
Configurable {
   @SuppressWarnings("unchecked")
   @Override
   public void configure(Context context) {
+    if (!HBaseVersionCheck.hasVersionLessThan2(logger)) {
+      throw new ConfigurationException(
+          "HBase major version number must be less than 2 for hbase-sink.");
+    }
     tableName = 
context.getString(HBaseSinkConfigurationConstants.CONFIG_TABLE);
     String cf = context.getString(
         HBaseSinkConfigurationConstants.CONFIG_COLUMN_FAMILY);

http://git-wip-us.apache.org/repos/asf/flume/blob/a7d2a289/flume-ng-sinks/flume-ng-hbase-sink/src/main/java/org/apache/flume/sink/hbase/HBaseVersionCheck.java
----------------------------------------------------------------------
diff --git 
a/flume-ng-sinks/flume-ng-hbase-sink/src/main/java/org/apache/flume/sink/hbase/HBaseVersionCheck.java
 
b/flume-ng-sinks/flume-ng-hbase-sink/src/main/java/org/apache/flume/sink/hbase/HBaseVersionCheck.java
new file mode 100644
index 0000000..25d9faa
--- /dev/null
+++ 
b/flume-ng-sinks/flume-ng-hbase-sink/src/main/java/org/apache/flume/sink/hbase/HBaseVersionCheck.java
@@ -0,0 +1,43 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.flume.sink.hbase;
+
+import org.apache.hadoop.hbase.util.VersionInfo;
+import org.slf4j.Logger;
+
+/**
+ * Helper used by the HBase sinks to reject HBase client libraries whose
+ * major version is 2 or greater.
+ */
+class HBaseVersionCheck {
+
+  /**
+   * Extracts the leading numeric component of a dotted version string.
+   *
+   * @param version dotted version string, e.g. {@code "1.2.6"}
+   * @return the major version number
+   * @throws NumberFormatException if the text before the first dot is not an integer
+   */
+  private static int getMajorVersion(String version) throws NumberFormatException {
+    // A limit of 2 keeps the leading element even for degenerate input such as ".",
+    // so a malformed version surfaces as NumberFormatException rather than
+    // ArrayIndexOutOfBoundsException.
+    return Integer.parseInt(version.split("\\.", 2)[0]);
+  }
+
+  /**
+   * Checks whether the version returned by {@link VersionInfo#getVersion()}
+   * has a major version number below 2.
+   *
+   * @param logger logger that receives an error entry when the check fails
+   * @return {@code true} if the major version is less than 2; {@code false} if it is
+   *     2 or greater, or if the version is missing or cannot be parsed
+   */
+  static boolean hasVersionLessThan2(Logger logger) {
+    String version = VersionInfo.getVersion();
+    // A missing version is treated like an unparseable one instead of raising an NPE.
+    if (version != null) {
+      try {
+        if (getMajorVersion(version) < 2) {
+          return true;
+        }
+      } catch (NumberFormatException ex) {
+        // Hand the exception to the logger so the stack trace is preserved.
+        logger.error(ex.getMessage(), ex);
+      }
+    }
+    logger.error("Invalid HBase version:" + version);
+    return false;
+  }
+}
\ No newline at end of file

Reply via email to