Repository: incubator-impala Updated Branches: refs/heads/master 7843b472f -> 9f678a742
IMPALA-4687: Get Impala working against HBase 2.0 This changes Impala code to tolerate the API differences between HBase 1.0 and HBase 2.0. It also drops compatibility code for older HBase versions. Specific changes: 1. Tolerate return value of Scan for Scan.setCaching() and Scan.setCacheBlocks(). This has no impact on our code. 2. HBase 2.0 eliminates the ScannerTimeoutException. The case that previously generated the exception will now recreate the scanner, so it is not necessary for our code to recreate the scanner. Short-circuit HandleResultScannerTimeout on HBase 2.0. 3. HBase 2.0 eliminates the Put.add(), which has been replaced with Put.addColumn(). This API exists in HBase 1.0, so it is safe to switch this completely. This was tested by verifying that an HBase 2.0 cluster starts up. Change-Id: I87610e25c01b3547ec332c6975b61284b6837d27 Reviewed-on: http://gerrit.cloudera.org:8080/7277 Reviewed-by: Dan Hecht <[email protected]> Tested-by: Impala Public Jenkins Project: http://git-wip-us.apache.org/repos/asf/incubator-impala/repo Commit: http://git-wip-us.apache.org/repos/asf/incubator-impala/commit/50071597 Tree: http://git-wip-us.apache.org/repos/asf/incubator-impala/tree/50071597 Diff: http://git-wip-us.apache.org/repos/asf/incubator-impala/diff/50071597 Branch: refs/heads/master Commit: 50071597a61eefc5afc7a5ce4db56a343172ea44 Parents: 7843b47 Author: Joe McDonnell <[email protected]> Authored: Thu Jun 22 17:06:49 2017 -0700 Committer: Impala Public Jenkins <[email protected]> Committed: Sat Jul 1 03:45:19 2017 +0000 ---------------------------------------------------------------------- be/src/exec/hbase-table-scanner.cc | 97 ++++++++++++++++----------------- be/src/exec/hbase-table-scanner.h | 25 ++++++--- be/src/exec/hbase-table-writer.cc | 8 +-- be/src/exec/hbase-table-writer.h | 4 +- be/src/util/jni-util.cc | 11 ++++ be/src/util/jni-util.h | 6 ++ 6 files changed, 86 insertions(+), 65 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/50071597/be/src/exec/hbase-table-scanner.cc ---------------------------------------------------------------------- diff --git a/be/src/exec/hbase-table-scanner.cc b/be/src/exec/hbase-table-scanner.cc index 79e45d0..6e0d1ac 100644 --- a/be/src/exec/hbase-table-scanner.cc +++ b/be/src/exec/hbase-table-scanner.cc @@ -153,26 +153,16 @@ Status HBaseTableScanner::Init() { JniUtil::GetGlobalClassRef(env, "org/apache/hadoop/hbase/filter/CompareFilter$CompareOp", &compare_op_cl_)); - RETURN_IF_ERROR( - JniUtil::GetGlobalClassRef(env, - "org/apache/hadoop/hbase/client/ScannerTimeoutException", - &scanner_timeout_ex_cl_)); - - // Distinguish HBase versions by checking for the existence of the Cell class. - // HBase 0.95.2: Use Cell class and corresponding methods. - // HBase prior to 0.95.2: Use the KeyValue class and Cell-equivalent methods. - bool has_cell_class = JniUtil::ClassExists(env, "org/apache/hadoop/hbase/Cell"); - if (has_cell_class) { - LOG(INFO) << "Detected HBase version >= 0.95.2"; - RETURN_IF_ERROR( - JniUtil::GetGlobalClassRef(env, "org/apache/hadoop/hbase/Cell", &cell_cl_)); - } else { - // Assume a HBase version prior to 0.95.2 because the Cell class wasn't found. - LOG(INFO) << "Detected HBase version < 0.95.2"; - RETURN_IF_ERROR( - JniUtil::GetGlobalClassRef(env, "org/apache/hadoop/hbase/KeyValue", - &cell_cl_)); + // ScannerTimeoutException is removed in HBase 2.0. Leave this as null if + // ScannerTimeoutException does not exist. + if (JniUtil::ClassExists(env, + "org/apache/hadoop/hbase/client/ScannerTimeoutException")) { + RETURN_IF_ERROR(JniUtil::GetGlobalClassRef(env, + "org/apache/hadoop/hbase/client/ScannerTimeoutException", + &scanner_timeout_ex_cl_)); } + RETURN_IF_ERROR( + JniUtil::GetGlobalClassRef(env, "org/apache/hadoop/hbase/Cell", &cell_cl_)); // Scan method ids. scan_ctor_ = env->GetMethodID(scan_cl_, "<init>", "()V"); @@ -180,16 +170,31 @@ Status HBaseTableScanner::Init() { scan_set_max_versions_id_ = env->GetMethodID(scan_cl_, "setMaxVersions", "(I)Lorg/apache/hadoop/hbase/client/Scan;"); RETURN_ERROR_IF_EXC(env); - scan_set_caching_id_ = env->GetMethodID(scan_cl_, "setCaching", "(I)V"); - RETURN_ERROR_IF_EXC(env); - scan_set_cache_blocks_id_ = env->GetMethodID(scan_cl_, "setCacheBlocks", "(Z)V"); - RETURN_ERROR_IF_EXC(env); + + // The signature of setCaching and setCacheBlocks returns either void (old behavior) + // or a Scan object (new behavior). Since the code doesn't use the return value, + // tolerate either. However, the two are expected to be consistent. + if (JniUtil::MethodExists(env, scan_cl_, "setCaching", "(I)V")) { + scan_set_caching_id_ = env->GetMethodID(scan_cl_, "setCaching", "(I)V"); + RETURN_ERROR_IF_EXC(env); + scan_set_cache_blocks_id_ = env->GetMethodID(scan_cl_, "setCacheBlocks", "(Z)V"); + RETURN_ERROR_IF_EXC(env); + } else { + scan_set_caching_id_ = env->GetMethodID(scan_cl_, "setCaching", + "(I)Lorg/apache/hadoop/hbase/client/Scan;"); + RETURN_ERROR_IF_EXC(env); + scan_set_cache_blocks_id_ = env->GetMethodID(scan_cl_, "setCacheBlocks", + "(Z)Lorg/apache/hadoop/hbase/client/Scan;"); + RETURN_ERROR_IF_EXC(env); + } + scan_add_column_id_ = env->GetMethodID(scan_cl_, "addColumn", "([B[B)Lorg/apache/hadoop/hbase/client/Scan;"); RETURN_ERROR_IF_EXC(env); scan_set_filter_id_ = env->GetMethodID(scan_cl_, "setFilter", "(Lorg/apache/hadoop/hbase/filter/Filter;)Lorg/apache/hadoop/hbase/client/Scan;"); RETURN_ERROR_IF_EXC(env); + // TODO: IMPALA-5584: In HBase 2.0, setStartRow() and setStopRow() are deprecated. scan_set_start_row_id_ = env->GetMethodID(scan_cl_, "setStartRow", "([B)Lorg/apache/hadoop/hbase/client/Scan;"); RETURN_ERROR_IF_EXC(env); @@ -205,38 +210,25 @@ Status HBaseTableScanner::Init() { RETURN_ERROR_IF_EXC(env); // Result method ids. - if (has_cell_class) { - result_raw_cells_id_ = env->GetMethodID(result_cl_, "rawCells", - "()[Lorg/apache/hadoop/hbase/Cell;"); - } else { - result_raw_cells_id_ = env->GetMethodID(result_cl_, "raw", - "()[Lorg/apache/hadoop/hbase/KeyValue;"); - } + result_raw_cells_id_ = env->GetMethodID(result_cl_, "rawCells", + "()[Lorg/apache/hadoop/hbase/Cell;"); RETURN_ERROR_IF_EXC(env); result_isempty_id_ = env->GetMethodID(result_cl_, "isEmpty", "()Z"); RETURN_ERROR_IF_EXC(env); - // Cell or equivalent KeyValue method ids. - // Method ids to retrieve buffers backing different portions of row data. - if (has_cell_class) { - cell_get_row_array_ = env->GetMethodID(cell_cl_, "getRowArray", "()[B"); - RETURN_ERROR_IF_EXC(env); - cell_get_family_array_ = env->GetMethodID(cell_cl_, "getFamilyArray", "()[B"); - RETURN_ERROR_IF_EXC(env); - cell_get_qualifier_array_ = env->GetMethodID(cell_cl_, "getQualifierArray", "()[B"); - RETURN_ERROR_IF_EXC(env); - cell_get_value_array_ = env->GetMethodID(cell_cl_, "getValueArray", "()[B"); - RETURN_ERROR_IF_EXC(env); - } else { - // In HBase versions prior to 0.95.2 all data from a row is backed by the same buffer - cell_get_row_array_ = cell_get_family_array_ = - cell_get_qualifier_array_ = cell_get_value_array_ = - env->GetMethodID(cell_cl_, "getBuffer", "()[B"); - RETURN_ERROR_IF_EXC(env); - } - // Method ids for retrieving lengths and offsets into buffers backing different - // portions of row data. Both the Cell and KeyValue classes support these methods. + // Cell method ids to retrieve buffers backing different portions of row data. + cell_get_row_array_ = env->GetMethodID(cell_cl_, "getRowArray", "()[B"); + RETURN_ERROR_IF_EXC(env); + cell_get_family_array_ = env->GetMethodID(cell_cl_, "getFamilyArray", "()[B"); + RETURN_ERROR_IF_EXC(env); + cell_get_qualifier_array_ = env->GetMethodID(cell_cl_, "getQualifierArray", "()[B"); + RETURN_ERROR_IF_EXC(env); + cell_get_value_array_ = env->GetMethodID(cell_cl_, "getValueArray", "()[B"); + RETURN_ERROR_IF_EXC(env); + + // Cell method ids for retrieving lengths and offsets into buffers backing different + // portions of row data. cell_get_family_offset_id_ = env->GetMethodID(cell_cl_, "getFamilyOffset", "()I"); RETURN_ERROR_IF_EXC(env); cell_get_family_length_id_ = env->GetMethodID(cell_cl_, "getFamilyLength", "()B"); @@ -416,8 +408,11 @@ Status HBaseTableScanner::HandleResultScannerTimeout(JNIEnv* env, bool* timeout) if (exc == NULL) return Status::OK(); // GetJniExceptionMsg gets the error message and clears the exception status (which is - // necessary). We return the error if the exception was not a ScannerTimeoutException. + // necessary). For HBase 2.0, ScannerTimeoutException does not exist, so simply + // return the error. Otherwise, we return the error if the exception was not a + // ScannerTimeoutException. Status status = JniUtil::GetJniExceptionMsg(env, false); + if (scanner_timeout_ex_cl_ == nullptr) return status; if (env->IsInstanceOf(exc, scanner_timeout_ex_cl_) != JNI_TRUE) return status; *timeout = true; http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/50071597/be/src/exec/hbase-table-scanner.h ---------------------------------------------------------------------- diff --git a/be/src/exec/hbase-table-scanner.h b/be/src/exec/hbase-table-scanner.h index dbf9a31..865dafc 100644 --- a/be/src/exec/hbase-table-scanner.h +++ b/be/src/exec/hbase-table-scanner.h @@ -55,11 +55,18 @@ class HBaseScanNode; /// be overridden by the query option hbase_caching. FE will also suggest a max value such /// that it won't put too much memory pressure on the region server. // -/// HBase version compatibility: Starting from HBase 0.95.2 result rows are represented by -/// Cells instead of KeyValues (prior HBase versions). To mitigate this API -/// incompatibility the Cell class and its methods are replaced with corresponding -/// KeyValue equivalents if the Cell is not found in the classpath. The HBase version -/// detection and KeyValue/Cell replacements are performed in Init(). +/// HBase version compatibility: This code supports HBase 1.0 and HBase 2.0 APIs. It +/// uses the Cell class for result rows rather than the older KeyValue class, which +/// limits support to HBase >= 0.95.2. The code handles some minor incompatibilities +/// across versions: +/// 1. Scan.setCaching() and Scan.setCacheBlocks() tolerate the older void return value +/// as well as the newer Scan return value (See HBASE-10841). +/// 2. ScannerTimeoutException has been removed in HBase 2.0, as HBase has a heartbeat +/// to prevent timeout. HBase 2.0 will reset the scanner and retry rather than +/// throwing an exception. See HBASE-16266 and HBASE-17809. +/// If ScannerTimeoutException does not exist, then HandleResultScannerTimeout() +/// simply checks for an exception and returns any error via status. +/// Both are detected and handled in Init(). // /// Note: When none of the requested family/qualifiers exist in a particular row, /// HBase will not return the row at all, leading to "missing" NULL values. @@ -153,14 +160,15 @@ class HBaseTableScanner { static jclass scan_cl_; static jclass resultscanner_cl_; static jclass result_cl_; - /// Cell or KeyValue class depending on HBase version (see class comment). static jclass cell_cl_; static jclass hconstants_cl_; static jclass filter_list_cl_; static jclass filter_list_op_cl_; static jclass single_column_value_filter_cl_; static jclass compare_op_cl_; - /// Exception thrown when a ResultScanner times out + /// Exception thrown when a ResultScanner times out. ScannerTimeoutException was + /// removed in HBase 2.0. In this case, scanner_timeout_ex_cl_ is null and HBase + /// will not throw this exception. static jclass scanner_timeout_ex_cl_; static jmethodID scan_ctor_; @@ -251,7 +259,8 @@ class HBaseTableScanner { /// ResultScanner times out. If a timeout occurs, the ResultScanner is re-created /// (with the scan range adjusted if some results have already been returned) and /// the exception is cleared. If any other exception is thrown, the error message - /// is returned in the status. + /// is returned in the status. In HBase 2.0, ScannerTimeoutException no longer + /// exists and the error message is returned in the status. /// 'timeout' is true if a ScannerTimeoutException was thrown, false otherwise. Status HandleResultScannerTimeout(JNIEnv* env, bool* timeout); http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/50071597/be/src/exec/hbase-table-writer.cc ---------------------------------------------------------------------- diff --git a/be/src/exec/hbase-table-writer.cc b/be/src/exec/hbase-table-writer.cc index e621ad8..14fe121 100644 --- a/be/src/exec/hbase-table-writer.cc +++ b/be/src/exec/hbase-table-writer.cc @@ -42,7 +42,7 @@ jmethodID HBaseTableWriter::put_ctor_ = NULL; jmethodID HBaseTableWriter::list_ctor_ = NULL; jmethodID HBaseTableWriter::list_add_id_ = NULL; -jmethodID HBaseTableWriter::put_add_id_ = NULL; +jmethodID HBaseTableWriter::put_addcolumn_id_ = NULL; HBaseTableWriter::HBaseTableWriter(HBaseTableDescriptor* table_desc, const vector<ScalarExprEvaluator*>& output_expr_evals, RuntimeProfile* profile) @@ -100,7 +100,7 @@ Status HBaseTableWriter::InitJNI() { RETURN_ERROR_IF_EXC(env); put_ctor_ = env->GetMethodID(put_cl_, "<init>", "([B)V"); RETURN_ERROR_IF_EXC(env); - put_add_id_ = env->GetMethodID(put_cl_, "add", + put_addcolumn_id_ = env->GetMethodID(put_cl_, "addColumn", "([B[B[B)Lorg/apache/hadoop/hbase/client/Put;"); RETURN_ERROR_IF_EXC(env); RETURN_IF_ERROR( @@ -171,8 +171,8 @@ Status HBaseTableWriter::AppendRows(RowBatch* batch) { DCHECK(put != NULL) << "Put shouldn't be NULL for non-key cols."; jbyteArray val_array; RETURN_IF_ERROR(CreateByteArray(env, data, data_len, &val_array)); - env->CallObjectMethod(put, put_add_id_, cf_arrays_[j-1], qual_arrays_[j-1], - val_array); + env->CallObjectMethod(put, put_addcolumn_id_, cf_arrays_[j-1], + qual_arrays_[j-1], val_array); RETURN_ERROR_IF_EXC(env); // Clean up the local references. http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/50071597/be/src/exec/hbase-table-writer.h ---------------------------------------------------------------------- diff --git a/be/src/exec/hbase-table-writer.h b/be/src/exec/hbase-table-writer.h index 8360cd9..43ddaee 100644 --- a/be/src/exec/hbase-table-writer.h +++ b/be/src/exec/hbase-table-writer.h @@ -105,8 +105,8 @@ class HBaseTableWriter { /// new Put(byte[]) static jmethodID put_ctor_; - /// Put#add(byte[], byte[], byte[]) - static jmethodID put_add_id_; + /// Put#addColumn(byte[], byte[], byte[]) + static jmethodID put_addcolumn_id_; /// java.util.ArrayList static jclass list_cl_; http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/50071597/be/src/util/jni-util.cc ---------------------------------------------------------------------- diff --git a/be/src/util/jni-util.cc b/be/src/util/jni-util.cc index c22460e..1631ea1 100644 --- a/be/src/util/jni-util.cc +++ b/be/src/util/jni-util.cc @@ -56,6 +56,17 @@ bool JniUtil::ClassExists(JNIEnv* env, const char* class_str) { return true; } +bool JniUtil::MethodExists(JNIEnv* env, jclass class_ref, const char* method_str, + const char* method_signature) { + env->GetMethodID(class_ref, method_str, method_signature); + jthrowable exc = env->ExceptionOccurred(); + if (exc != nullptr) { + env->ExceptionClear(); + return false; + } + return true; +} + Status JniUtil::GetGlobalClassRef(JNIEnv* env, const char* class_str, jclass* class_ref) { *class_ref = NULL; jclass local_cl = env->FindClass(class_str); http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/50071597/be/src/util/jni-util.h ---------------------------------------------------------------------- diff --git a/be/src/util/jni-util.h b/be/src/util/jni-util.h index 4ed8abc..cef10bc 100644 --- a/be/src/util/jni-util.h +++ b/be/src/util/jni-util.h @@ -168,6 +168,12 @@ class JniUtil { /// This function does not log any errors or exceptions. static bool ClassExists(JNIEnv* env, const char* class_str); + /// Return true if the given class has a non-static method with a specific name and + /// signature. Returns false otherwise, or if any other error occurred + /// (e.g. a JNI exception). This function does not log any errors or exceptions. + static bool MethodExists(JNIEnv* env, jclass class_ref, + const char* method_str, const char* method_signature); + /// Returns a global JNI reference to the class specified by class_str into class_ref. /// The returned reference must eventually be freed by calling FreeGlobalRef() (or have /// the lifetime of the impalad process).
