Repository: flume Updated Branches: refs/heads/trunk be25c5e11 -> a7d2a289a
FLUME-3154. Add HBase client version check to AsyncHBaseSink and HBaseSink The current implementation of HBaseSink and AsyncHbaseSink is not compatible with the 2.0 version of HBase, which will be released soon. This change adds a check and makes these sinks fail gracefully if incompatible HBase jars can be found in the classpath. This closes #160 Reviewers: Bessenyei Balázs Donát, Ferenc Szabo, Denes Arvay (Miklos Csanady via Denes Arvay) Project: http://git-wip-us.apache.org/repos/asf/flume/repo Commit: http://git-wip-us.apache.org/repos/asf/flume/commit/a7d2a289 Tree: http://git-wip-us.apache.org/repos/asf/flume/tree/a7d2a289 Diff: http://git-wip-us.apache.org/repos/asf/flume/diff/a7d2a289 Branch: refs/heads/trunk Commit: a7d2a289a67f057a5cdb28dabbd3e651613dfa3e Parents: be25c5e Author: Miklos Csanady <[email protected]> Authored: Mon Aug 28 11:15:12 2017 +0200 Committer: Denes Arvay <[email protected]> Committed: Wed Aug 30 16:02:53 2017 +0200 ---------------------------------------------------------------------- .../apache/flume/sink/hbase/AsyncHBaseSink.java | 5 +++ .../org/apache/flume/sink/hbase/HBaseSink.java | 5 +++ .../flume/sink/hbase/HBaseVersionCheck.java | 43 ++++++++++++++++++++ 3 files changed, 53 insertions(+) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/flume/blob/a7d2a289/flume-ng-sinks/flume-ng-hbase-sink/src/main/java/org/apache/flume/sink/hbase/AsyncHBaseSink.java ---------------------------------------------------------------------- diff --git a/flume-ng-sinks/flume-ng-hbase-sink/src/main/java/org/apache/flume/sink/hbase/AsyncHBaseSink.java b/flume-ng-sinks/flume-ng-hbase-sink/src/main/java/org/apache/flume/sink/hbase/AsyncHBaseSink.java index f120f59..c202a57 100644 --- a/flume-ng-sinks/flume-ng-hbase-sink/src/main/java/org/apache/flume/sink/hbase/AsyncHBaseSink.java +++ b/flume-ng-sinks/flume-ng-hbase-sink/src/main/java/org/apache/flume/sink/hbase/AsyncHBaseSink.java @@ -34,6 +34,7 @@ import org.apache.flume.EventDeliveryException; import org.apache.flume.FlumeException; import org.apache.flume.Transaction; import org.apache.flume.conf.Configurable; +import org.apache.flume.conf.ConfigurationException; import org.apache.flume.instrumentation.SinkCounter; import org.apache.flume.sink.AbstractSink; import org.apache.hadoop.conf.Configuration; @@ -329,6 +330,10 @@ public class AsyncHBaseSink extends AbstractSink implements Configurable { @Override public void configure(Context context) { + if (!HBaseVersionCheck.hasVersionLessThan2(logger)) { + throw new ConfigurationException( + "HBase major version number must be less than 2 for asynchbase sink. "); + } tableName = context.getString(HBaseSinkConfigurationConstants.CONFIG_TABLE); String cf = context.getString( HBaseSinkConfigurationConstants.CONFIG_COLUMN_FAMILY); http://git-wip-us.apache.org/repos/asf/flume/blob/a7d2a289/flume-ng-sinks/flume-ng-hbase-sink/src/main/java/org/apache/flume/sink/hbase/HBaseSink.java ---------------------------------------------------------------------- diff --git a/flume-ng-sinks/flume-ng-hbase-sink/src/main/java/org/apache/flume/sink/hbase/HBaseSink.java b/flume-ng-sinks/flume-ng-hbase-sink/src/main/java/org/apache/flume/sink/hbase/HBaseSink.java index 4c8b52b..29969ad 100644 --- a/flume-ng-sinks/flume-ng-hbase-sink/src/main/java/org/apache/flume/sink/hbase/HBaseSink.java +++ b/flume-ng-sinks/flume-ng-hbase-sink/src/main/java/org/apache/flume/sink/hbase/HBaseSink.java @@ -34,6 +34,7 @@ import org.apache.flume.annotations.InterfaceAudience; import org.apache.flume.auth.FlumeAuthenticationUtil; import org.apache.flume.auth.PrivilegedExecutor; import org.apache.flume.conf.Configurable; +import org.apache.flume.conf.ConfigurationException; import org.apache.flume.instrumentation.SinkCounter; import org.apache.flume.sink.AbstractSink; import org.apache.hadoop.conf.Configuration; @@ -197,6 +198,10 @@ public class HBaseSink extends AbstractSink implements Configurable { @SuppressWarnings("unchecked") @Override public void configure(Context context) { + if (!HBaseVersionCheck.hasVersionLessThan2(logger)) { + throw new ConfigurationException( + "HBase major version number must be less than 2 for hbase-sink."); + } tableName = context.getString(HBaseSinkConfigurationConstants.CONFIG_TABLE); String cf = context.getString( HBaseSinkConfigurationConstants.CONFIG_COLUMN_FAMILY); http://git-wip-us.apache.org/repos/asf/flume/blob/a7d2a289/flume-ng-sinks/flume-ng-hbase-sink/src/main/java/org/apache/flume/sink/hbase/HBaseVersionCheck.java ---------------------------------------------------------------------- diff --git a/flume-ng-sinks/flume-ng-hbase-sink/src/main/java/org/apache/flume/sink/hbase/HBaseVersionCheck.java b/flume-ng-sinks/flume-ng-hbase-sink/src/main/java/org/apache/flume/sink/hbase/HBaseVersionCheck.java new file mode 100644 index 0000000..25d9faa --- /dev/null +++ b/flume-ng-sinks/flume-ng-hbase-sink/src/main/java/org/apache/flume/sink/hbase/HBaseVersionCheck.java @@ -0,0 +1,43 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package org.apache.flume.sink.hbase; + +import org.apache.hadoop.hbase.util.VersionInfo; +import org.slf4j.Logger; + +class HBaseVersionCheck { + + private static int getMajorVersion(String version) throws NumberFormatException { + return Integer.parseInt(version.split("\\.")[0]); + } + + static boolean hasVersionLessThan2(Logger logger) { + String version = VersionInfo.getVersion(); + try { + if (getMajorVersion(version) < 2) { + return true; + } + } catch (NumberFormatException ex) { + logger.error(ex.getMessage()); + } + logger.error("Invalid HBase version:" + version); + return false; + } +} \ No newline at end of file
