IMPALA-2615: support [[nodiscard]] on Status This is the set of changes required to get Impala to compile on GCC 7 using the [[nodiscard]] attribute, which generates a warning whenever a status is dropped. It is not enabled on the current default compiler GCC 4.9.2 or Clang 3.8 so I added WARN_UNUSED_RESULT in various classes so that we can catch the dropped statuses with our current toolchain.
The changes are: * Use the new [[nodiscard]] attribute and fix all the dropped statuses. Many were innocuous or very improbably but some appear to be actual bugs. Adds a discard_result() function that explicitly ignores the result of a function. * Removes the bad JNI pattern of checking for exceptions after DeleteGlobalRef(), which doesn't throw. * Fix miscellaneous compile errors and warnings. * Remove use of ptr_vector, which pulls in headers with deprecated things. * Fix a memory lifetime bug with default_fs_ (it was masked by the old refcounted std::string implementation). Change-Id: I972543af2e9f98b12dcbb5479b4c1a7d53952197 Reviewed-on: http://gerrit.cloudera.org:8080/7253 Reviewed-by: Tim Armstrong <[email protected]> Tested-by: Impala Public Jenkins Project: http://git-wip-us.apache.org/repos/asf/incubator-impala/repo Commit: http://git-wip-us.apache.org/repos/asf/incubator-impala/commit/86e88cad Tree: http://git-wip-us.apache.org/repos/asf/incubator-impala/tree/86e88cad Diff: http://git-wip-us.apache.org/repos/asf/incubator-impala/diff/86e88cad Branch: refs/heads/master Commit: 86e88cad5d7ee41264bbb66742b267ef67dec47e Parents: da60a9a Author: Tim Armstrong <[email protected]> Authored: Wed Jun 14 17:35:08 2017 -0700 Committer: Impala Public Jenkins <[email protected]> Committed: Thu Aug 17 09:04:56 2017 +0000 ---------------------------------------------------------------------- CMakeLists.txt | 3 +- be/CMakeLists.txt | 13 +++++- be/src/benchmarks/bit-packing-benchmark.cc | 1 + be/src/benchmarks/expr-benchmark.cc | 6 +-- be/src/benchmarks/hash-benchmark.cc | 2 +- be/src/benchmarks/network-perf-benchmark.cc | 2 +- .../benchmarks/row-batch-serialize-benchmark.cc | 8 ++-- be/src/catalog/catalog-server.cc | 4 +- be/src/catalog/catalog-util.cc | 4 +- be/src/catalog/catalogd-main.cc | 4 +- be/src/codegen/codegen-symbol-emitter.cc | 2 +- be/src/codegen/llvm-codegen-test.cc | 2 +- be/src/common/compiler-util.h | 20 +++++++++ be/src/common/init.cc | 4 +- be/src/common/status.h | 6 +-- be/src/exec/external-data-source-executor.cc | 2 +- be/src/exec/hbase-scan-node.cc | 2 +- be/src/exec/hbase-table-scanner.cc | 27 ++++++----- be/src/exec/hbase-table-scanner.h | 44 ++++++++++-------- be/src/exec/hbase-table-writer.cc | 13 ++---- be/src/exec/hdfs-scan-node.cc | 2 +- be/src/exec/kudu-scan-node.cc | 2 +- be/src/exec/kudu-table-sink.h | 2 +- be/src/exec/kudu-util.h | 4 +- be/src/experiments/compression-test.cc | 7 +-- be/src/exprs/expr-codegen-test.cc | 10 ++--- be/src/exprs/expr-test.cc | 4 +- be/src/exprs/hive-udf-call.cc | 3 +- be/src/rpc/auth-provider.h | 14 +++--- be/src/rpc/thrift-client.cc | 10 ++--- be/src/rpc/thrift-client.h | 11 ++--- be/src/runtime/buffered-tuple-stream-test.cc | 6 +-- be/src/runtime/bufferpool/buffer-pool-test.cc | 27 ++++++----- be/src/runtime/client-cache.cc | 4 +- be/src/runtime/client-cache.h | 23 +++++----- be/src/runtime/collection-value-builder.h | 2 +- be/src/runtime/coordinator-backend-state.cc | 5 ++- be/src/runtime/data-stream-recvr.cc | 6 ++- be/src/runtime/data-stream-test.cc | 13 +++--- be/src/runtime/disk-io-mgr.cc | 2 +- be/src/runtime/exec-env.cc | 4 +- be/src/runtime/exec-env.h | 10 ++--- be/src/runtime/fragment-instance-state.cc | 4 +- be/src/runtime/hbase-table-factory.cc | 2 +- be/src/runtime/hbase-table.cc | 3 +- be/src/runtime/parallel-executor.cc | 2 +- be/src/runtime/tmp-file-mgr-test.cc | 9 ++-- be/src/runtime/tmp-file-mgr.cc | 3 +- be/src/scheduling/scheduler-test-util.cc | 3 +- be/src/scheduling/scheduler-test.cc | 38 ++++++++-------- be/src/service/client-request-state.cc | 8 ++-- be/src/service/fe-support.cc | 20 ++++++--- be/src/service/impala-beeswax-server.cc | 8 ++-- be/src/service/impala-hs2-server.cc | 14 +++--- be/src/service/impala-http-handler.cc | 9 ++-- be/src/service/impala-server.cc | 47 +++++++++++++------- be/src/service/impalad-main.cc | 3 +- be/src/service/query-options-test.cc | 3 +- be/src/statestore/statestore.cc | 10 +++-- be/src/statestore/statestore.h | 19 ++++---- be/src/statestore/statestored-main.cc | 6 ++- be/src/testutil/death-test-util.h | 8 ++-- be/src/testutil/fault-injection-util.cc | 2 + be/src/testutil/impalad-query-executor.cc | 4 +- be/src/testutil/in-process-servers.cc | 10 +++-- be/src/testutil/in-process-servers.h | 2 +- be/src/util/benchmark.cc | 1 + be/src/util/bit-util-test.cc | 6 ++- be/src/util/codec.h | 9 ++-- be/src/util/filesystem-util.h | 13 +++--- be/src/util/hdfs-util-test.cc | 3 +- be/src/util/jni-util.cc | 6 --- be/src/util/jni-util.h | 31 +++++++------ be/src/util/memory-metrics.h | 4 +- be/src/util/metrics-test.cc | 4 +- be/src/util/network-util.h | 4 +- be/src/util/parquet-reader.cc | 5 ++- be/src/util/runtime-profile.cc | 27 ++++++----- be/src/util/runtime-profile.h | 4 +- be/src/util/thread-pool.h | 2 +- be/src/util/thread.cc | 8 ++-- be/src/util/thread.h | 8 ++-- 82 files changed, 403 insertions(+), 309 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/86e88cad/CMakeLists.txt ---------------------------------------------------------------------- diff --git a/CMakeLists.txt b/CMakeLists.txt index 63e5d43..31a3fc4 100644 --- a/CMakeLists.txt +++ b/CMakeLists.txt @@ -144,7 +144,8 @@ endfunction() find_package(Boost REQUIRED COMPONENTS thread regex filesystem system date_time) -include_directories(${Boost_INCLUDE_DIRS}) +# Mark Boost as a system header to avoid compile warnings. +include_directories(SYSTEM ${Boost_INCLUDE_DIRS}) message(STATUS "Boost include dir: " ${Boost_INCLUDE_DIRS}) message(STATUS "Boost libraries: " ${Boost_LIBRARIES}) http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/86e88cad/be/CMakeLists.txt ---------------------------------------------------------------------- diff --git a/be/CMakeLists.txt b/be/CMakeLists.txt index 22c18b6..3ac4193 100644 --- a/be/CMakeLists.txt +++ b/be/CMakeLists.txt @@ -70,6 +70,16 @@ SET(CXX_CLANG_FLAGS "${CXX_CLANG_FLAGS} -Wno-mismatched-tags") # -g: Enable symbols for profiler tools # -Wno-unused-local-typedefs: Do not warn for local typedefs that are unused. SET(CXX_GCC_FLAGS "-g -Wno-unused-local-typedefs") +if (CMAKE_CXX_COMPILER_VERSION VERSION_GREATER 7.0) + # We need to add additional arguments for GCC 7+. We go down this branch if building + # with a non-GCC compiler of version 7+, but in that case CXX_GCC_FLAGS is not used, + # so it is inconsequential. TODO: IMPALA-5490: make this non-conditional when we + # upgrade GCC. + # -faligned-new: new will automatically align types. Otherwise "new Counter()" in the + # Kudu util code produces a warning (see KUDU-2094). + # TODO: -faligned-new is part of C++17, remove flag when we bump language version. + SET(CXX_GCC_FLAGS "${CXX_GCC_FLAGS} -faligned-new") +endif() # compiler flags for different build types (run 'cmake -DCMAKE_BUILD_TYPE=<type> .') # For CMAKE_BUILD_TYPE=Debug @@ -264,7 +274,8 @@ set(CLANG_INCLUDE_FLAGS "-I${GFLAGS_INCLUDE_DIR}" "-I${RAPIDJSON_INCLUDE_DIR}" "-I${AVRO_INCLUDE_DIR}" - "-I${BOOST_INCLUDEDIR}" + # Include Boost as a system directory to suppress warnings from headers. + "-isystem${BOOST_INCLUDEDIR}" # Required so that jni.h can be found during Clang compilation "-I${JAVA_INCLUDE_PATH}" "-I${JAVA_INCLUDE_PATH2}" http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/86e88cad/be/src/benchmarks/bit-packing-benchmark.cc ---------------------------------------------------------------------- diff --git a/be/src/benchmarks/bit-packing-benchmark.cc b/be/src/benchmarks/bit-packing-benchmark.cc index 6e80d83..955c0fb 100644 --- a/be/src/benchmarks/bit-packing-benchmark.cc +++ b/be/src/benchmarks/bit-packing-benchmark.cc @@ -261,6 +261,7 @@ #include <algorithm> #include <iostream> +#include <numeric> #include <vector> #include "gutil/strings/substitute.h" http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/86e88cad/be/src/benchmarks/expr-benchmark.cc ---------------------------------------------------------------------- diff --git a/be/src/benchmarks/expr-benchmark.cc b/be/src/benchmarks/expr-benchmark.cc index c55ee3d..093d2b1 100644 --- a/be/src/benchmarks/expr-benchmark.cc +++ b/be/src/benchmarks/expr-benchmark.cc @@ -65,8 +65,8 @@ using namespace impala; class Planner { public: Planner() { - frontend_.SetCatalogInitialized(); - exec_env_.InitForFeTests(); + ABORT_IF_ERROR(frontend_.SetCatalogInitialized()); + ABORT_IF_ERROR(exec_env_.InitForFeTests()); } Status GeneratePlan(const string& stmt, TExecRequest* result) { @@ -561,7 +561,7 @@ Benchmark* BenchmarkTimestampFunctions() { int main(int argc, char** argv) { impala::InitCommonRuntime(argc, argv, true, impala::TestInfo::BE_TEST); impala::InitFeSupport(false); - impala::LlvmCodeGen::InitializeLlvm(); + ABORT_IF_ERROR(impala::LlvmCodeGen::InitializeLlvm()); // Dynamically construct at runtime as the planner initialization depends on // static objects being initialized in other compilation modules. http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/86e88cad/be/src/benchmarks/hash-benchmark.cc ---------------------------------------------------------------------- diff --git a/be/src/benchmarks/hash-benchmark.cc b/be/src/benchmarks/hash-benchmark.cc index 1b56b7b..dadad25 100644 --- a/be/src/benchmarks/hash-benchmark.cc +++ b/be/src/benchmarks/hash-benchmark.cc @@ -420,7 +420,7 @@ int main(int argc, char **argv) { cout << Benchmark::GetMachineInfo() << endl; impala::InitCommonRuntime(argc, argv, true, impala::TestInfo::BE_TEST); impala::InitFeSupport(); - LlvmCodeGen::InitializeLlvm(); + ABORT_IF_ERROR(LlvmCodeGen::InitializeLlvm()); const int NUM_ROWS = 1024; http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/86e88cad/be/src/benchmarks/network-perf-benchmark.cc ---------------------------------------------------------------------- diff --git a/be/src/benchmarks/network-perf-benchmark.cc b/be/src/benchmarks/network-perf-benchmark.cc index 6cebaaf..1a0de24 100644 --- a/be/src/benchmarks/network-perf-benchmark.cc +++ b/be/src/benchmarks/network-perf-benchmark.cc @@ -81,7 +81,7 @@ class TestServer : public NetworkTestServiceIf { } void Server(ThriftServer* server) { - server->Start(); + ABORT_IF_ERROR(server->Start()); server->Join(); } }; http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/86e88cad/be/src/benchmarks/row-batch-serialize-benchmark.cc ---------------------------------------------------------------------- diff --git a/be/src/benchmarks/row-batch-serialize-benchmark.cc b/be/src/benchmarks/row-batch-serialize-benchmark.cc index ee531e6..699b91b 100644 --- a/be/src/benchmarks/row-batch-serialize-benchmark.cc +++ b/be/src/benchmarks/row-batch-serialize-benchmark.cc @@ -289,7 +289,7 @@ class RowBatchSerializeBenchmark { SerializeArgs* args = reinterpret_cast<SerializeArgs*>(data); for (int iter = 0; iter < batch_size; ++iter) { TRowBatch trow_batch; - args->batch->Serialize(&trow_batch, args->full_dedup); + ABORT_IF_ERROR(args->batch->Serialize(&trow_batch, args->full_dedup)); } } @@ -338,19 +338,19 @@ class RowBatchSerializeBenchmark { RowBatch* no_dup_batch = obj_pool.Add(new RowBatch(&row_desc, NUM_ROWS, &tracker)); FillBatch(no_dup_batch, 12345, 1, -1); TRowBatch no_dup_tbatch; - no_dup_batch->Serialize(&no_dup_tbatch); + ABORT_IF_ERROR(no_dup_batch->Serialize(&no_dup_tbatch)); RowBatch* adjacent_dup_batch = obj_pool.Add(new RowBatch(&row_desc, NUM_ROWS, &tracker)); FillBatch(adjacent_dup_batch, 12345, 5, -1); TRowBatch adjacent_dup_tbatch; - adjacent_dup_batch->Serialize(&adjacent_dup_tbatch, false); + ABORT_IF_ERROR(adjacent_dup_batch->Serialize(&adjacent_dup_tbatch, false)); RowBatch* dup_batch = obj_pool.Add(new RowBatch(&row_desc, NUM_ROWS, &tracker)); // Non-adjacent duplicates. FillBatch(dup_batch, 12345, 1, NUM_ROWS / 5); TRowBatch dup_tbatch; - dup_batch->Serialize(&dup_tbatch, true); + ABORT_IF_ERROR(dup_batch->Serialize(&dup_tbatch, true)); int baseline; Benchmark ser_suite("serialize"); http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/86e88cad/be/src/catalog/catalog-server.cc ---------------------------------------------------------------------- diff --git a/be/src/catalog/catalog-server.cc b/be/src/catalog/catalog-server.cc index bf2892f..0e0cca0 100644 --- a/be/src/catalog/catalog-server.cc +++ b/be/src/catalog/catalog-server.cc @@ -401,11 +401,11 @@ void CatalogServer::CatalogObjectsUrlCallback(const Webserver::ArgumentMap& args // Get the object type and name from the topic entry key TCatalogObject request; - TCatalogObjectFromObjectName(object_type, object_name_arg->second, &request); + Status status = TCatalogObjectFromObjectName(object_type, object_name_arg->second, &request); // Get the object and dump its contents. TCatalogObject result; - Status status = catalog_->GetCatalogObject(request, &result); + if (status.ok()) status = catalog_->GetCatalogObject(request, &result); if (status.ok()) { Value debug_string(ThriftDebugString(result).c_str(), document->GetAllocator()); document->AddMember("thrift_string", debug_string, document->GetAllocator()); http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/86e88cad/be/src/catalog/catalog-util.cc ---------------------------------------------------------------------- diff --git a/be/src/catalog/catalog-util.cc b/be/src/catalog/catalog-util.cc index 57d21b3..de4f2fd 100644 --- a/be/src/catalog/catalog-util.cc +++ b/be/src/catalog/catalog-util.cc @@ -201,9 +201,9 @@ Status CompressCatalogObject(string* catalog_object) { const_cast<uint8_t*>(reinterpret_cast<const uint8_t*>(output_buffer.data())); ReadWriteUtil::PutInt(output_buffer_ptr, static_cast<uint32_t>(catalog_object->size())); output_buffer_ptr += sizeof(uint32_t); - compressor->ProcessBlock(true, catalog_object->size(), + RETURN_IF_ERROR(compressor->ProcessBlock(true, catalog_object->size(), reinterpret_cast<const uint8_t*>(catalog_object->data()), &compressed_data_len, - &output_buffer_ptr); + &output_buffer_ptr)); output_buffer.resize(compressed_data_len + sizeof(uint32_t)); *catalog_object = move(output_buffer); return Status::OK(); http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/86e88cad/be/src/catalog/catalogd-main.cc ---------------------------------------------------------------------- diff --git a/be/src/catalog/catalogd-main.cc b/be/src/catalog/catalogd-main.cc index fe681d0..a3a0edb 100644 --- a/be/src/catalog/catalogd-main.cc +++ b/be/src/catalog/catalogd-main.cc @@ -73,10 +73,10 @@ int CatalogdMain(int argc, char** argv) { LOG(INFO) << "Not starting webserver"; } - metrics->Init(FLAGS_enable_webserver ? webserver.get() : nullptr); + ABORT_IF_ERROR(metrics->Init(FLAGS_enable_webserver ? webserver.get() : nullptr)); ABORT_IF_ERROR(RegisterMemoryMetrics(metrics.get(), true, nullptr, nullptr)); StartMemoryMaintenanceThread(); - StartThreadInstrumentation(metrics.get(), webserver.get(), true); + ABORT_IF_ERROR(StartThreadInstrumentation(metrics.get(), webserver.get(), true)); InitRpcEventTracing(webserver.get()); metrics->AddProperty<string>("catalog.version", GetVersionString(true)); http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/86e88cad/be/src/codegen/codegen-symbol-emitter.cc ---------------------------------------------------------------------- diff --git a/be/src/codegen/codegen-symbol-emitter.cc b/be/src/codegen/codegen-symbol-emitter.cc index b2c64e2..02008c7 100644 --- a/be/src/codegen/codegen-symbol-emitter.cc +++ b/be/src/codegen/codegen-symbol-emitter.cc @@ -61,7 +61,7 @@ void CodegenSymbolEmitter::NotifyObjectEmitted(const ObjectFile &obj, if (asm_file.fail()) { // Log error and continue if we can't write the disassembly to a file. Note that // fstream operations don't throw exceptions by default unless configured to do so. - LOG(ERROR) << "Could not save disassembly to: " << asm_file; + LOG(ERROR) << "Could not save disassembly to: " << asm_path_; } } http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/86e88cad/be/src/codegen/llvm-codegen-test.cc ---------------------------------------------------------------------- diff --git a/be/src/codegen/llvm-codegen-test.cc b/be/src/codegen/llvm-codegen-test.cc index 61b942f..f6e1a57 100644 --- a/be/src/codegen/llvm-codegen-test.cc +++ b/be/src/codegen/llvm-codegen-test.cc @@ -462,6 +462,6 @@ int main(int argc, char **argv) { ::testing::InitGoogleTest(&argc, argv); impala::InitCommonRuntime(argc, argv, true, impala::TestInfo::BE_TEST); impala::InitFeSupport(false); - impala::LlvmCodeGen::InitializeLlvm(); + ABORT_IF_ERROR(impala::LlvmCodeGen::InitializeLlvm()); return RUN_ALL_TESTS(); } http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/86e88cad/be/src/common/compiler-util.h ---------------------------------------------------------------------- diff --git a/be/src/common/compiler-util.h b/be/src/common/compiler-util.h index 5132eb0..592c08c 100644 --- a/be/src/common/compiler-util.h +++ b/be/src/common/compiler-util.h @@ -53,6 +53,26 @@ #define RESTRICT __restrict__ #endif +/// GCC 5+ and Clang 3.6+ support __has_cpp_attribute(). Always return false on compilers +/// that don't know about __has_cpp_attribute(). +#if !defined(__GNUC__) || __GNUC__ >= 5 +#define HAS_CPP_ATTRIBUTE(attr) __has_cpp_attribute(attr) +#else +#define HAS_CPP_ATTRIBUTE(attr) 0 +#endif + +// Use [[nodiscard]] specifier if supported by our compiler. +#if HAS_CPP_ATTRIBUTE(nodiscard) +#define NODISCARD [[nodiscard]] +#else +#define NODISCARD +#endif + +// Suppress warnings when ignoring the return value from a function annotated with +// WARN_UNUSED_RESULT. Based on ignore_result() in gutil/basictypes.h. +template<typename T> +inline void discard_result(const T&) {} + namespace impala { /// The size of an L1 cache line in bytes on x86-64. http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/86e88cad/be/src/common/init.cc ---------------------------------------------------------------------- diff --git a/be/src/common/init.cc b/be/src/common/init.cc index d778523..9734610 100644 --- a/be/src/common/init.cc +++ b/be/src/common/init.cc @@ -184,7 +184,7 @@ void impala::InitCommonRuntime(int argc, char** argv, bool init_jvm, CpuInfo::VerifyCpuRequirements(); // Set the default hostname. The user can override this with the hostname flag. - GetHostname(&FLAGS_hostname); + ABORT_IF_ERROR(GetHostname(&FLAGS_hostname)); google::SetVersionString(impala::GetBuildVersion()); google::ParseCommandLineFlags(&argc, &argv, true); @@ -225,7 +225,7 @@ void impala::InitCommonRuntime(int argc, char** argv, bool init_jvm, LOG(INFO) << "Process ID: " << getpid(); // Required for the FE's Catalog - impala::LibCache::Init(); + ABORT_IF_ERROR(impala::LibCache::Init()); Status fs_cache_init_status = impala::HdfsFsCache::Init(); if (!fs_cache_init_status.ok()) CLEAN_EXIT_WITH_ERROR(fs_cache_init_status.GetDetail()); http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/86e88cad/be/src/common/status.h ---------------------------------------------------------------------- diff --git a/be/src/common/status.h b/be/src/common/status.h index 91ad907..cf7481b 100644 --- a/be/src/common/status.h +++ b/be/src/common/status.h @@ -80,7 +80,7 @@ namespace impala { /// TODO: macros: /// RETURN_IF_ERROR(status) << "msg" /// MAKE_ERROR() << "msg" -class Status { +class NODISCARD Status { public: typedef strings::internal::SubstituteArg ArgType; @@ -265,7 +265,7 @@ std::ostream& operator<<(std::ostream& os, const Status& status); /// some generally useful macros #define RETURN_IF_ERROR(stmt) \ do { \ - Status __status__ = (stmt); \ + ::impala::Status __status__ = (stmt); \ if (UNLIKELY(!__status__.ok())) return __status__; \ } while (false) @@ -276,7 +276,7 @@ std::ostream& operator<<(std::ostream& os, const Status& status); #define ABORT_IF_ERROR(stmt) \ do { \ - Status __status__ = (stmt); \ + ::impala::Status __status__ = (stmt); \ if (UNLIKELY(!__status__.ok())) { \ ABORT_WITH_ERROR(__status__.GetDetail()); \ } \ http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/86e88cad/be/src/exec/external-data-source-executor.cc ---------------------------------------------------------------------- diff --git a/be/src/exec/external-data-source-executor.cc b/be/src/exec/external-data-source-executor.cc index df93893..7c810c6 100644 --- a/be/src/exec/external-data-source-executor.cc +++ b/be/src/exec/external-data-source-executor.cc @@ -204,7 +204,7 @@ Status ExternalDataSourceExecutor::Close(const TCloseParams& params, Status status = CallJniMethod(executor_, s.close_id_, params, result); JNIEnv* env = getJNIEnv(); - if (executor_ != NULL) status.MergeStatus(JniUtil::FreeGlobalRef(env, executor_)); + if (executor_ != NULL) env->DeleteGlobalRef(executor_); is_initialized_ = false; return status; } http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/86e88cad/be/src/exec/hbase-scan-node.cc ---------------------------------------------------------------------- diff --git a/be/src/exec/hbase-scan-node.cc b/be/src/exec/hbase-scan-node.cc index 3d74d81..e783731 100644 --- a/be/src/exec/hbase-scan-node.cc +++ b/be/src/exec/hbase-scan-node.cc @@ -239,7 +239,7 @@ Status HBaseScanNode::GetNext(RuntimeState* state, RowBatch* row_batch, bool* eo ss << "hbase table: " << table_name_ << endl; void* key; int key_length; - hbase_scanner_->GetRowKey(env, &key, &key_length); + RETURN_IF_ERROR(hbase_scanner_->GetRowKey(env, &key, &key_length)); ss << "row key: " << string(reinterpret_cast<const char*>(key), key_length); state->LogError(ErrorMsg(TErrorCode::GENERAL, ss.str())); } http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/86e88cad/be/src/exec/hbase-table-scanner.cc ---------------------------------------------------------------------- diff --git a/be/src/exec/hbase-table-scanner.cc b/be/src/exec/hbase-table-scanner.cc index 6e0d1ac..cf13e75 100644 --- a/be/src/exec/hbase-table-scanner.cc +++ b/be/src/exec/hbase-table-scanner.cc @@ -430,7 +430,7 @@ Status HBaseTableScanner::HandleResultScannerTimeout(JNIEnv* env, bool* timeout) jbyteArray start_bytes = (jbyteArray) env->CallObjectMethod(cell, cell_get_row_array_); jbyteArray end_bytes; - CreateByteArray(env, scan_range.stop_key(), &end_bytes); + RETURN_IF_ERROR(CreateByteArray(env, scan_range.stop_key(), &end_bytes)); return InitScanRange(env, start_bytes, end_bytes); } @@ -438,9 +438,9 @@ Status HBaseTableScanner::InitScanRange(JNIEnv* env, const ScanRange& scan_range JniLocalFrame jni_frame; RETURN_IF_ERROR(jni_frame.push(env)); jbyteArray start_bytes; - CreateByteArray(env, scan_range.start_key(), &start_bytes); + RETURN_IF_ERROR(CreateByteArray(env, scan_range.start_key(), &start_bytes)); jbyteArray end_bytes; - CreateByteArray(env, scan_range.stop_key(), &end_bytes); + RETURN_IF_ERROR(CreateByteArray(env, scan_range.stop_key(), &end_bytes)); return InitScanRange(env, start_bytes, end_bytes); } @@ -457,7 +457,8 @@ Status HBaseTableScanner::InitScanRange(JNIEnv* env, jbyteArray start_bytes, if (resultscanner_ != NULL) { // resultscanner_.close(); env->CallObjectMethod(resultscanner_, resultscanner_close_id_); - RETURN_IF_ERROR(JniUtil::FreeGlobalRef(env, resultscanner_)); + RETURN_ERROR_IF_EXC(env); + env->DeleteGlobalRef(resultscanner_); resultscanner_ = NULL; } // resultscanner_ = htable_.getScanner(scan_); @@ -542,7 +543,7 @@ Status HBaseTableScanner::Next(JNIEnv* env, bool* has_next) { return Status::OK(); } - if (cells_ != NULL) RETURN_IF_ERROR(JniUtil::FreeGlobalRef(env, cells_)); + if (cells_ != NULL) env->DeleteGlobalRef(cells_); // cells_ = result.raw(); jobject local_cells = reinterpret_cast<jobjectArray>( env->CallObjectMethod(result, result_raw_cells_id_)); @@ -656,7 +657,7 @@ Status HBaseTableScanner::GetRowKey(JNIEnv* env, const SlotDescriptor* slot_desc void* key; int key_length; jobject cell = env->GetObjectArrayElement(cells_, 0); - GetRowKey(env, cell, &key, &key_length); + RETURN_IF_ERROR(GetRowKey(env, cell, &key, &key_length)); DCHECK_EQ(key_length, slot_desc->type().GetByteSize()); WriteTupleSlot(slot_desc, tuple, reinterpret_cast<char*>(key)); RETURN_ERROR_IF_EXC(env); @@ -700,7 +701,7 @@ Status HBaseTableScanner::GetCurrentValue(JNIEnv* env, const string& family, Status HBaseTableScanner::GetValue(JNIEnv* env, const string& family, const string& qualifier, void** value, int* value_length) { bool is_null; - GetCurrentValue(env, family, qualifier, value, value_length, &is_null); + RETURN_IF_ERROR(GetCurrentValue(env, family, qualifier, value, value_length, &is_null)); RETURN_ERROR_IF_EXC(env); if (is_null) { *value = NULL; @@ -716,7 +717,8 @@ Status HBaseTableScanner::GetValue(JNIEnv* env, const string& family, void* value; int value_length; bool is_null; - GetCurrentValue(env, family, qualifier, &value, &value_length, &is_null); + RETURN_IF_ERROR( + GetCurrentValue(env, family, qualifier, &value, &value_length, &is_null)); RETURN_ERROR_IF_EXC(env); if (is_null) { tuple->SetNull(slot_desc->null_indicator_offset()); @@ -755,15 +757,16 @@ void HBaseTableScanner::Close(JNIEnv* env) { << "(this does not necessarily indicate a problem)"; } else { // GetJniExceptionMsg will clear the exception status and log - JniUtil::GetJniExceptionMsg(env, true, + Status status = JniUtil::GetJniExceptionMsg(env, true, "Unknown error occurred while closing ResultScanner: "); + if (!status.ok()) LOG(WARNING) << "Error closing ResultScanner()"; } } - JniUtil::FreeGlobalRef(env, resultscanner_); + env->DeleteGlobalRef(resultscanner_); resultscanner_ = NULL; } - if (scan_ != NULL) JniUtil::FreeGlobalRef(env, scan_); - if (cells_ != NULL) JniUtil::FreeGlobalRef(env, cells_); + if (scan_ != NULL) env->DeleteGlobalRef(scan_); + if (cells_ != NULL) env->DeleteGlobalRef(cells_); // Close the HTable so that the connections are not kept around. if (htable_.get() != NULL) htable_->Close(state_); http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/86e88cad/be/src/exec/hbase-table-scanner.h ---------------------------------------------------------------------- diff --git a/be/src/exec/hbase-table-scanner.h b/be/src/exec/hbase-table-scanner.h index 865dafc..0d9abe2 100644 --- a/be/src/exec/hbase-table-scanner.h +++ b/be/src/exec/hbase-table-scanner.h @@ -85,7 +85,7 @@ class HBaseTableScanner { /// JNI setup. Create global references to classes, /// and find method ids. - static Status Init(); + static Status Init() WARN_UNUSED_RESULT; /// HBase scan range; "" means unbounded class ScanRange { @@ -114,33 +114,34 @@ class HBaseTableScanner { /// If start_/stop_key is not empty, is used for the corresponding role in the scan. /// Note: scan_range_vector cannot be modified for the duration of the scan. Status StartScan(JNIEnv* env, const TupleDescriptor* tuple_desc, - const ScanRangeVector& scan_range_vector, - const std::vector<THBaseFilter>& filters); + const ScanRangeVector& scan_range_vector, + const std::vector<THBaseFilter>& filters) WARN_UNUSED_RESULT; /// Position cursor to next row. Sets has_next to true if more rows exist, false /// otherwise. /// Returns non-ok status if an error occurred. - Status Next(JNIEnv* env, bool* has_next); + Status Next(JNIEnv* env, bool* has_next) WARN_UNUSED_RESULT; /// Get the current HBase row key. - Status GetRowKey(JNIEnv* env, void** key, int* key_length); + Status GetRowKey(JNIEnv* env, void** key, int* key_length) WARN_UNUSED_RESULT; /// Write the current HBase row key into the tuple slot. /// This is used for retrieving binary encoded data directly into the tuple. - Status GetRowKey(JNIEnv* env, const SlotDescriptor* slot_desc, Tuple* tuple); + Status GetRowKey( + JNIEnv* env, const SlotDescriptor* slot_desc, Tuple* tuple) WARN_UNUSED_RESULT; /// Used to fetch HBase values in order of family/qualifier. /// Fetch the next value matching family and qualifier into value/value_length. /// If there is no match, value is set to NULL and value_length to 0. Status GetValue(JNIEnv* env, const std::string& family, const std::string& qualifier, - void** value, int* value_length); + void** value, int* value_length) WARN_UNUSED_RESULT; /// Used to fetch HBase values in order of family/qualifier. /// Fetch the next value matching family and qualifier into the tuple slot. /// If there is no match, the tuple slot is set to null. /// This is used for retrieving binary encoded data directly into the tuple. Status GetValue(JNIEnv* env, const std::string& family, const std::string& qualifier, - const SlotDescriptor* slot_desc, Tuple* tuple); + const SlotDescriptor* slot_desc, Tuple* tuple) WARN_UNUSED_RESULT; /// Close HTable and ResultScanner. void Close(JNIEnv* env); @@ -262,7 +263,7 @@ class HBaseTableScanner { /// is returned in the status. In HBase 2.0, ScannerTimeoutException no longer /// exists and the error message is returned in the status. /// 'timeout' is true if a ScannerTimeoutException was thrown, false otherwise. - Status HandleResultScannerTimeout(JNIEnv* env, bool* timeout); + Status HandleResultScannerTimeout(JNIEnv* env, bool* timeout) WARN_UNUSED_RESULT; /// Lexicographically compares s with the string in data having given length. /// Returns a value > 0 if s is greater, a value < 0 if s is smaller, @@ -270,39 +271,46 @@ class HBaseTableScanner { int CompareStrings(const std::string& s, void* data, int length); /// Turn strings into Java byte array. - Status CreateByteArray(JNIEnv* env, const std::string& s, jbyteArray* bytes); + Status CreateByteArray( + JNIEnv* env, const std::string& s, jbyteArray* bytes) WARN_UNUSED_RESULT; /// First time scanning the table, do some setup Status ScanSetup(JNIEnv* env, const TupleDescriptor* tuple_desc, - const std::vector<THBaseFilter>& filters); + const std::vector<THBaseFilter>& filters) WARN_UNUSED_RESULT; /// Initialize the scan to the given range - Status InitScanRange(JNIEnv* env, const ScanRange& scan_range); + Status InitScanRange(JNIEnv* env, const ScanRange& scan_range) WARN_UNUSED_RESULT; /// Initialize the scan range to the scan range specified by the start and end byte /// arrays - Status InitScanRange(JNIEnv* env, jbyteArray start_bytes, jbyteArray end_bytes); + Status InitScanRange( + JNIEnv* env, jbyteArray start_bytes, jbyteArray end_bytes) WARN_UNUSED_RESULT; /// Copies the row key of cell into value_pool_ and returns it via *data and *length. /// Returns error status if memory limit is exceeded. - inline Status GetRowKey(JNIEnv* env, jobject cell, void** data, int* length); + inline Status GetRowKey( + JNIEnv* env, jobject cell, void** data, int* length) WARN_UNUSED_RESULT; /// Copies the column family of cell into value_pool_ and returns it /// via *data and *length. Returns error status if memory limit is exceeded. - inline Status GetFamily(JNIEnv* env, jobject cell, void** data, int* length); + inline Status GetFamily( + JNIEnv* env, jobject cell, void** data, int* length) WARN_UNUSED_RESULT; /// Copies the column qualifier of cell into value_pool_ and returns it /// via *data and *length. Returns error status if memory limit is exceeded. - inline Status GetQualifier(JNIEnv* env, jobject cell, void** data, int* length); + inline Status GetQualifier( + JNIEnv* env, jobject cell, void** data, int* length) WARN_UNUSED_RESULT; /// Copies the value of cell into value_pool_ and returns it via *data and *length. /// Returns error status if memory limit is exceeded. - inline Status GetValue(JNIEnv* env, jobject cell, void** data, int* length); + inline Status GetValue( + JNIEnv* env, jobject cell, void** data, int* length) WARN_UNUSED_RESULT; /// Returns the current value of cells_[cell_index_] in *data and *length /// if its family/qualifier match the given family/qualifier. /// Otherwise, sets *is_null to true indicating a mismatch in family or qualifier. inline Status GetCurrentValue(JNIEnv* env, const std::string& family, - const std::string& qualifier, void** data, int* length, bool* is_null); + const std::string& qualifier, void** data, int* length, + bool* is_null) WARN_UNUSED_RESULT; /// Write to a tuple slot with the given hbase binary formatted data, which is in /// big endian. http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/86e88cad/be/src/exec/hbase-table-writer.cc ---------------------------------------------------------------------- diff --git a/be/src/exec/hbase-table-writer.cc b/be/src/exec/hbase-table-writer.cc index 14fe121..2723eb7 100644 --- a/be/src/exec/hbase-table-writer.cc +++ b/be/src/exec/hbase-table-writer.cc @@ -194,7 +194,7 @@ Status HBaseTableWriter::AppendRows(RowBatch* batch) { RETURN_IF_ERROR(table_->Put(put_list_)); } // Now clean put_list_. - RETURN_IF_ERROR(JniUtil::FreeGlobalRef(env, put_list_)); + env->DeleteGlobalRef(put_list_); put_list_ = NULL; return Status::OK(); } @@ -204,17 +204,12 @@ Status HBaseTableWriter::CleanUpJni() { if (env == NULL) return Status("Error getting JNIEnv."); if (put_list_ != NULL) { - RETURN_IF_ERROR(JniUtil::FreeGlobalRef(env, put_list_)); + env->DeleteGlobalRef(put_list_); put_list_ = NULL; } - for (jbyteArray ref: cf_arrays_) { - RETURN_IF_ERROR(JniUtil::FreeGlobalRef(env, ref)); - } - for (jbyteArray ref: qual_arrays_) { - RETURN_IF_ERROR(JniUtil::FreeGlobalRef(env, ref)); - } - + for (jbyteArray ref: cf_arrays_) env->DeleteGlobalRef(ref); + for (jbyteArray ref: qual_arrays_) env->DeleteGlobalRef(ref); return Status::OK(); } http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/86e88cad/be/src/exec/hdfs-scan-node.cc ---------------------------------------------------------------------- diff --git a/be/src/exec/hdfs-scan-node.cc b/be/src/exec/hdfs-scan-node.cc index 1ab87ac..557d346 100644 --- a/be/src/exec/hdfs-scan-node.cc +++ b/be/src/exec/hdfs-scan-node.cc @@ -354,7 +354,7 @@ void HdfsScanNode::ThreadTokenAvailableCb(ThreadResourceMgr::ResourcePool* pool) auto fn = [this]() { this->ScannerThread(); }; scanner_threads_.AddThread( - new Thread(FragmentInstanceState::FINST_THREAD_GROUP_NAME, name, fn)); + make_unique<Thread>(FragmentInstanceState::FINST_THREAD_GROUP_NAME, name, fn)); } } http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/86e88cad/be/src/exec/kudu-scan-node.cc ---------------------------------------------------------------------- diff --git a/be/src/exec/kudu-scan-node.cc b/be/src/exec/kudu-scan-node.cc index 0845d9a..e192a86 100644 --- a/be/src/exec/kudu-scan-node.cc +++ b/be/src/exec/kudu-scan-node.cc @@ -164,7 +164,7 @@ void KuduScanNode::ThreadAvailableCb(ThreadResourceMgr::ResourcePool* pool) { auto fn = [this, token, name]() { this->RunScannerThread(name, token); }; VLOG_RPC << "Thread started: " << name; scanner_threads_.AddThread( - new Thread(FragmentInstanceState::FINST_THREAD_GROUP_NAME, name, fn)); + make_unique<Thread>(FragmentInstanceState::FINST_THREAD_GROUP_NAME, name, fn)); } } http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/86e88cad/be/src/exec/kudu-table-sink.h ---------------------------------------------------------------------- diff --git a/be/src/exec/kudu-table-sink.h b/be/src/exec/kudu-table-sink.h index 23e7033..06f5f96 100644 --- a/be/src/exec/kudu-table-sink.h +++ b/be/src/exec/kudu-table-sink.h @@ -82,7 +82,7 @@ class KuduTableSink : public DataSink { /// appropriate counters for ignored errors. // /// Returns a bad Status if there are non-ignorable errors. - Status CheckForErrors(RuntimeState* state); + Status CheckForErrors(RuntimeState* state) WARN_UNUSED_RESULT; /// Used to get the KuduTableDescriptor from the RuntimeState TableId table_id_; http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/86e88cad/be/src/exec/kudu-util.h ---------------------------------------------------------------------- diff --git a/be/src/exec/kudu-util.h b/be/src/exec/kudu-util.h index eb32bcd..4d9b77b 100644 --- a/be/src/exec/kudu-util.h +++ b/be/src/exec/kudu-util.h @@ -48,7 +48,7 @@ bool KuduClientIsSupported(); /// Returns OK if Kudu is available or an error status containing the reason Kudu is not /// available. Kudu may not be available if no Kudu client is available for the platform /// or if Kudu was disabled by the startup flag --disable_kudu. -Status CheckKuduAvailability(); +Status CheckKuduAvailability() WARN_UNUSED_RESULT; /// Convenience function for the bool equivalent of CheckKuduAvailability(). bool KuduIsAvailable(); @@ -56,7 +56,7 @@ bool KuduIsAvailable(); /// Creates a new KuduClient using the specified master adresses. If any error occurs, /// 'client' is not set and an error status is returned. Status CreateKuduClient(const std::vector<std::string>& master_addrs, - kudu::client::sp::shared_ptr<kudu::client::KuduClient>* client); + kudu::client::sp::shared_ptr<kudu::client::KuduClient>* client) WARN_UNUSED_RESULT; /// Returns a debug string for the KuduSchema. std::string KuduSchemaDebugString(const kudu::client::KuduSchema& schema); http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/86e88cad/be/src/experiments/compression-test.cc ---------------------------------------------------------------------- diff --git a/be/src/experiments/compression-test.cc b/be/src/experiments/compression-test.cc index f6b87bb..48ed666 100644 --- a/be/src/experiments/compression-test.cc +++ b/be/src/experiments/compression-test.cc @@ -74,12 +74,13 @@ void TestCompression(int num, int min_len, int max_len, THdfsCompression::type c int64_t compressed_len = compressor->MaxOutputLen(offset); uint8_t* compressed_buffer = (uint8_t*)malloc(compressed_len); - compressor->ProcessBlock(true, offset, buffer, &compressed_len, &compressed_buffer); + ABORT_IF_ERROR( + compressor->ProcessBlock(true, offset, buffer, &compressed_len, &compressed_buffer)); int64_t sorted_compressed_len = compressor->MaxOutputLen(offset); uint8_t* sorted_compressed_buffer = (uint8_t*)malloc(sorted_compressed_len); - compressor->ProcessBlock(true, offset, sorted_buffer, &sorted_compressed_len, - &sorted_compressed_buffer); + ABORT_IF_ERROR(compressor->ProcessBlock(true, offset, sorted_buffer, + &sorted_compressed_len, &sorted_compressed_buffer)); cout << "NumStrings=" << num << " MinLen=" << min_len << " MaxLen=" << max_len << " Codec=" << codec << endl; http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/86e88cad/be/src/exprs/expr-codegen-test.cc ---------------------------------------------------------------------- diff --git a/be/src/exprs/expr-codegen-test.cc b/be/src/exprs/expr-codegen-test.cc index 24b897e..9ea865a 100644 --- a/be/src/exprs/expr-codegen-test.cc +++ b/be/src/exprs/expr-codegen-test.cc @@ -20,6 +20,10 @@ #include "exprs/scalar-expr.h" #include "udf/udf.h" +#ifdef IR_COMPILE +#include "exprs/decimal-operators-ir.cc" +#endif + using namespace impala; using namespace impala_udf; @@ -31,10 +35,6 @@ struct FnAttr { int arg2_type_size; }; -#ifdef IR_COMPILE -#include "exprs/decimal-operators-ir.cc" -#endif - DecimalVal TestGetFnAttrs( FunctionContext* ctx, const DecimalVal& arg0, BooleanVal& arg1, StringVal& arg2) { FnAttr* state = reinterpret_cast<FnAttr*>( @@ -359,7 +359,7 @@ int main(int argc, char **argv) { ::testing::InitGoogleTest(&argc, argv); InitCommonRuntime(argc, argv, true, TestInfo::BE_TEST); InitFeSupport(); - LlvmCodeGen::InitializeLlvm(); + ABORT_IF_ERROR(LlvmCodeGen::InitializeLlvm()); return RUN_ALL_TESTS(); } http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/86e88cad/be/src/exprs/expr-test.cc ---------------------------------------------------------------------- diff --git a/be/src/exprs/expr-test.cc b/be/src/exprs/expr-test.cc index 1e3366f..e04abc5 100644 --- a/be/src/exprs/expr-test.cc +++ b/be/src/exprs/expr-test.cc @@ -3927,7 +3927,7 @@ TEST_F(ExprTest, SessionFunctions) { map<Session, map<Query, string>> results; for (Session session: {S1, S2}) { - executor_->Setup(); // Starts new session + ASSERT_OK(executor_->Setup()); // Starts new session results[session][Q1] = GetValue("current_session()", TYPE_STRING); results[session][Q2] = GetValue("current_sid()", TYPE_STRING); } @@ -7307,7 +7307,7 @@ int main(int argc, char **argv) { InitCommonRuntime(argc, argv, true, TestInfo::BE_TEST); ABORT_IF_ERROR(TimezoneDatabase::Initialize()); InitFeSupport(false); - impala::LlvmCodeGen::InitializeLlvm(); + ABORT_IF_ERROR(impala::LlvmCodeGen::InitializeLlvm()); // Disable llvm optimization passes if the env var is no set to true. Running without // the optimizations makes the tests run much faster. http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/86e88cad/be/src/exprs/hive-udf-call.cc ---------------------------------------------------------------------- diff --git a/be/src/exprs/hive-udf-call.cc b/be/src/exprs/hive-udf-call.cc index 3e071bf..a3e0fe6 100644 --- a/be/src/exprs/hive-udf-call.cc +++ b/be/src/exprs/hive-udf-call.cc @@ -246,8 +246,7 @@ void HiveUdfCall::CloseEvaluator(FunctionContext::FunctionStateScope scope, if (jni_ctx->executor != NULL) { env->CallNonvirtualVoidMethodA( jni_ctx->executor, executor_cl_, executor_close_id_, NULL); - Status status = JniUtil::FreeGlobalRef(env, jni_ctx->executor); - if (!status.ok()) LOG(ERROR) << status.GetDetail(); + env->DeleteGlobalRef(jni_ctx->executor); } if (jni_ctx->input_values_buffer != NULL) { delete[] jni_ctx->input_values_buffer; http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/86e88cad/be/src/rpc/auth-provider.h ---------------------------------------------------------------------- diff --git a/be/src/rpc/auth-provider.h b/be/src/rpc/auth-provider.h index 2b5b747..0021dc7 100644 --- a/be/src/rpc/auth-provider.h +++ b/be/src/rpc/auth-provider.h @@ -39,19 +39,21 @@ class Thread; class AuthProvider { public: /// Initialises any state required to perform authentication using this provider. - virtual Status Start() = 0; + virtual Status Start() WARN_UNUSED_RESULT = 0; /// Creates a new Thrift transport factory in the out parameter that performs /// authorisation per this provider's protocol. virtual Status GetServerTransportFactory( - boost::shared_ptr<apache::thrift::transport::TTransportFactory>* factory) = 0; + boost::shared_ptr<apache::thrift::transport::TTransportFactory>* factory) + WARN_UNUSED_RESULT = 0; - /// Called by Thrift clients to wrap a raw transport with any intermediate transport that - /// an auth protocol requires. + /// Called by Thrift clients to wrap a raw transport with any intermediate transport + /// that an auth protocol requires. virtual Status WrapClientTransport(const std::string& hostname, boost::shared_ptr<apache::thrift::transport::TTransport> raw_transport, const std::string& service_name, - boost::shared_ptr<apache::thrift::transport::TTransport>* wrapped_transport) = 0; + boost::shared_ptr<apache::thrift::transport::TTransport>* wrapped_transport) + WARN_UNUSED_RESULT = 0; /// Returns true if this provider uses Sasl at the transport layer. virtual bool is_sasl() = 0; @@ -151,7 +153,7 @@ class SaslAuthProvider : public AuthProvider { void RunKinit(Promise<Status>* first_kinit); /// One-time kerberos-specific environment variable setup. Called by InitKerberos(). - Status InitKerberosEnv(); + Status InitKerberosEnv() WARN_UNUSED_RESULT; }; /// This provider implements no authentication, so any connection is immediately http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/86e88cad/be/src/rpc/thrift-client.cc ---------------------------------------------------------------------- diff --git a/be/src/rpc/thrift-client.cc b/be/src/rpc/thrift-client.cc index a64cdd3..f8da136 100644 --- a/be/src/rpc/thrift-client.cc +++ b/be/src/rpc/thrift-client.cc @@ -42,16 +42,16 @@ ThriftClientImpl::ThriftClientImpl(const std::string& ipaddress, int port, bool : address_(MakeNetworkAddress(ipaddress, port)), ssl_(ssl) { if (ssl_) { SSLProtocol version; - socket_create_status_ = + init_status_ = SSLProtoVersions::StringToProtocol(FLAGS_ssl_minimum_version, &version); - if (!socket_create_status_.ok()) return; + if (!init_status_.ok()) return; ssl_factory_.reset(new TSSLSocketFactory(version)); } - socket_create_status_ = CreateSocket(); + init_status_ = CreateSocket(); } Status ThriftClientImpl::Open() { - if (!socket_create_status_.ok()) return socket_create_status_; + RETURN_IF_ERROR(init_status_); try { if (!transport_->isOpen()) { transport_->open(); @@ -71,7 +71,7 @@ Status ThriftClientImpl::Open() { Status ThriftClientImpl::OpenWithRetry(uint32_t num_tries, uint64_t wait_ms) { // Socket creation failures are not recoverable. - if (!socket_create_status_.ok()) return socket_create_status_; + RETURN_IF_ERROR(init_status_); uint32_t try_count = 0L; while (true) { http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/86e88cad/be/src/rpc/thrift-client.h ---------------------------------------------------------------------- diff --git a/be/src/rpc/thrift-client.h b/be/src/rpc/thrift-client.h index 778075c..8194016 100644 --- a/be/src/rpc/thrift-client.h +++ b/be/src/rpc/thrift-client.h @@ -68,7 +68,7 @@ class ThriftClientImpl { /// Set send timeout on the underlying TSocket. void setSendTimeout(int32_t ms) { socket_->setSendTimeout(ms); } - Status socket_create_status() { return socket_create_status_; } + Status init_status() { return init_status_; } protected: ThriftClientImpl(const std::string& ipaddress, int port, bool ssl); @@ -83,7 +83,7 @@ class ThriftClientImpl { /// True if ssl encryption is enabled on this connection. bool ssl_; - Status socket_create_status_; + Status init_status_; /// Sasl Client object. Contains client kerberos identification data. /// Will be NULL if kerberos is not being used. @@ -145,15 +145,16 @@ ThriftClient<InterfaceType>::ThriftClient(const std::string& ipaddress, int port // not use the client after that. // TODO: Move initialization code that can fail into a separate Init() method. if (socket_ == NULL) { - DCHECK(!socket_create_status_.ok()); + DCHECK(!init_status_.ok()); return; } // transport_ is created by wrapping the socket_ in the TTransport provided by the // auth_provider_ and then a TBufferedTransport (IMPALA-1928). transport_ = socket_; - auth_provider_->WrapClientTransport(address_.hostname, transport_, service_name, - &transport_); + init_status_ = auth_provider_->WrapClientTransport(address_.hostname, transport_, + service_name, &transport_); + if (!init_status_.ok()) return; // The caller will decide what to do with the Status. ThriftServer::BufferedTransportFactory factory; transport_ = factory.getTransport(transport_); http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/86e88cad/be/src/runtime/buffered-tuple-stream-test.cc ---------------------------------------------------------------------- diff --git a/be/src/runtime/buffered-tuple-stream-test.cc b/be/src/runtime/buffered-tuple-stream-test.cc index 9fe0618..0b89498 100644 --- a/be/src/runtime/buffered-tuple-stream-test.cc +++ b/be/src/runtime/buffered-tuple-stream-test.cc @@ -1317,7 +1317,7 @@ TEST_F(ArrayTupleStreamTest, TestArrayDeepCopy) { cv, *item_desc, mem_pool_.get(), runtime_state_, array_len); Tuple* array_data; int num_rows; - builder.GetFreeMemory(&array_data, &num_rows); + ASSERT_OK(builder.GetFreeMemory(&array_data, &num_rows)); expected_row_size += item_desc->byte_size() * array_len; // Fill the array with pointers to our constant strings. @@ -1423,7 +1423,7 @@ TEST_F(ArrayTupleStreamTest, TestComputeRowSize) { cv, *item_desc, mem_pool_.get(), runtime_state_, array_len); Tuple* array_data; int num_rows; - builder.GetFreeMemory(&array_data, &num_rows); + ASSERT_OK(builder.GetFreeMemory(&array_data, &num_rows)); expected_row_size += item_desc->byte_size() * array_len; // Fill the array with pointers to our constant strings. @@ -1457,6 +1457,6 @@ int main(int argc, char** argv) { ::testing::InitGoogleTest(&argc, argv); impala::InitCommonRuntime(argc, argv, true, impala::TestInfo::BE_TEST); impala::InitFeSupport(); - impala::LlvmCodeGen::InitializeLlvm(); + ABORT_IF_ERROR(impala::LlvmCodeGen::InitializeLlvm()); return RUN_ALL_TESTS(); } http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/86e88cad/be/src/runtime/bufferpool/buffer-pool-test.cc ---------------------------------------------------------------------- diff --git a/be/src/runtime/bufferpool/buffer-pool-test.cc b/be/src/runtime/bufferpool/buffer-pool-test.cc index 14446e7..2eff955 100644 --- a/be/src/runtime/bufferpool/buffer-pool-test.cc +++ b/be/src/runtime/bufferpool/buffer-pool-test.cc @@ -89,7 +89,7 @@ class BufferPoolTest : public ::testing::Test { for (string created_tmp_dir : created_tmp_dirs_) { chmod((created_tmp_dir + SCRATCH_SUFFIX).c_str(), S_IRWXU); } - FileSystemUtil::RemovePaths(created_tmp_dirs_); + ASSERT_OK(FileSystemUtil::RemovePaths(created_tmp_dirs_)); created_tmp_dirs_.clear(); CpuTestUtil::ResetAffinity(); // Some tests modify affinity. } @@ -806,14 +806,15 @@ TEST_F(BufferPoolTest, PinWithoutReservation) { TEST_BUFFER_LEN, NewProfile(), &client)); BufferPool::PageHandle handle; - IMPALA_ASSERT_DEBUG_DEATH(pool.CreatePage(&client, TEST_BUFFER_LEN, &handle), ""); + IMPALA_ASSERT_DEBUG_DEATH( + discard_result(pool.CreatePage(&client, TEST_BUFFER_LEN, &handle)), ""); // Should succeed after increasing reservation. ASSERT_TRUE(client.IncreaseReservationToFit(TEST_BUFFER_LEN)); ASSERT_OK(pool.CreatePage(&client, TEST_BUFFER_LEN, &handle)); // But we can't pin again. - IMPALA_ASSERT_DEBUG_DEATH(pool.Pin(&client, &handle), ""); + IMPALA_ASSERT_DEBUG_DEATH(discard_result(pool.Pin(&client, &handle)), ""); pool.DestroyPage(&client, &handle); pool.DeregisterClient(&client); @@ -866,7 +867,8 @@ TEST_F(BufferPoolTest, ExtractBuffer) { // Test that ExtractBuffer() DCHECKs for unpinned pages. ASSERT_OK(pool.CreatePage(&client, TEST_BUFFER_LEN, &page)); pool.Unpin(&client, &page); - IMPALA_ASSERT_DEBUG_DEATH((void)pool.ExtractBuffer(&client, &page, &buffer), ""); + IMPALA_ASSERT_DEBUG_DEATH( + discard_result(pool.ExtractBuffer(&client, &page, &buffer)), ""); pool.DestroyPage(&client, &page); pool.DeregisterClient(&client); @@ -955,7 +957,8 @@ TEST_F(BufferPoolTest, EvictPageSameClient) { ASSERT_OK(pool.CreatePage(&client, TEST_BUFFER_LEN, &handle1)); // Do not have enough reservations because we pinned the page. - IMPALA_ASSERT_DEBUG_DEATH(pool.CreatePage(&client, TEST_BUFFER_LEN, &handle2), ""); + IMPALA_ASSERT_DEBUG_DEATH( + discard_result(pool.CreatePage(&client, TEST_BUFFER_LEN, &handle2)), ""); // We should be able to create a new page after unpinned and evicting the first one. pool.Unpin(&client, &handle1); @@ -1319,7 +1322,7 @@ void BufferPoolTest::TestQueryTeardown(bool write_error) { string tmp_file_path = TmpFilePath(pages.data()); FreeBuffers(pool, &client, &tmp_buffers); - PinAll(pool, &client, &pages); + ASSERT_OK(PinAll(pool, &client, &pages)); // Remove temporary file to force future writes to that file to fail. DisableBackingFile(tmp_file_path); } @@ -1367,7 +1370,7 @@ void BufferPoolTest::TestWriteError(int write_delay_ms) { UnpinAll(&pool, &client, &pages); WaitForAllWrites(&client); // Repin the pages - PinAll(&pool, &client, &pages); + ASSERT_OK(PinAll(&pool, &client, &pages)); // Remove permissions to the backing storage so that future writes will fail ASSERT_GT(RemoveScratchPerms(), 0); // Give the first write a chance to fail before the second write starts. @@ -1480,7 +1483,9 @@ TEST_F(BufferPoolTest, WriteErrorBlacklist) { PageHandle* error_page = FindPageInDir(pages[ERROR_QUERY], error_dir); ASSERT_TRUE(error_page != NULL) << "Expected a tmp file in dir " << error_dir; const string& error_file_path = TmpFilePath(error_page); - for (int i = 0; i < INITIAL_QUERIES; ++i) PinAll(&pool, &clients[i], &pages[i]); + for (int i = 0; i < INITIAL_QUERIES; ++i) { + ASSERT_OK(PinAll(&pool, &clients[i], &pages[i])); + } DisableBackingFile(error_file_path); for (int i = 0; i < INITIAL_QUERIES; ++i) UnpinAll(&pool, &clients[i], &pages[i]); @@ -1489,7 +1494,7 @@ TEST_F(BufferPoolTest, WriteErrorBlacklist) { // Both clients should still be usable - test the API. for (int i = 0; i < INITIAL_QUERIES; ++i) { - PinAll(&pool, &clients[i], &pages[i]); + ASSERT_OK(PinAll(&pool, &clients[i], &pages[i])); VerifyData(pages[i], 0); UnpinAll(&pool, &clients[i], &pages[i]); ASSERT_OK(AllocateAndFree(&pool, &clients[i], TEST_BUFFER_LEN)); @@ -1521,7 +1526,7 @@ TEST_F(BufferPoolTest, WriteErrorBlacklist) { } DestroyAll(&pool, &clients[ERROR_QUERY], &error_new_pages); - PinAll(&pool, &clients[NO_ERROR_QUERY], &pages[NO_ERROR_QUERY]); + ASSERT_OK(PinAll(&pool, &clients[NO_ERROR_QUERY], &pages[NO_ERROR_QUERY])); UnpinAll(&pool, &clients[NO_ERROR_QUERY], &pages[NO_ERROR_QUERY]); WaitForAllWrites(&clients[NO_ERROR_QUERY]); EXPECT_TRUE(FindPageInDir(pages[NO_ERROR_QUERY], good_dir) != NULL); @@ -1929,7 +1934,7 @@ int main(int argc, char** argv) { ::testing::InitGoogleTest(&argc, argv); impala::InitCommonRuntime(argc, argv, true, impala::TestInfo::BE_TEST); impala::InitFeSupport(); - impala::LlvmCodeGen::InitializeLlvm(); + ABORT_IF_ERROR(impala::LlvmCodeGen::InitializeLlvm()); int result = 0; for (bool encryption : {false, true}) { for (bool numa : {false, true}) { http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/86e88cad/be/src/runtime/client-cache.cc ---------------------------------------------------------------------- diff --git a/be/src/runtime/client-cache.cc b/be/src/runtime/client-cache.cc index 4f403ad..8c0b6aa 100644 --- a/be/src/runtime/client-cache.cc +++ b/be/src/runtime/client-cache.cc @@ -111,9 +111,9 @@ Status ClientCacheHelper::CreateClient(const TNetworkAddress& address, shared_ptr<ThriftClientImpl> client_impl(factory_method(address, client_key)); VLOG(2) << "CreateClient(): creating new client for " << client_impl->address(); - if (!client_impl->socket_create_status().ok()) { + if (!client_impl->init_status().ok()) { *client_key = nullptr; - return client_impl->socket_create_status(); + return client_impl->init_status(); } // Set the TSocket's send and receive timeouts. http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/86e88cad/be/src/runtime/client-cache.h ---------------------------------------------------------------------- diff --git a/be/src/runtime/client-cache.h b/be/src/runtime/client-cache.h index 53bbabe..d0fd30e 100644 --- a/be/src/runtime/client-cache.h +++ b/be/src/runtime/client-cache.h @@ -80,14 +80,15 @@ class ClientCacheHelper { // /// If there is an error creating the new client, *client_key will be NULL. Status GetClient(const TNetworkAddress& address, ClientFactory factory_method, - ClientKey* client_key); + ClientKey* client_key) WARN_UNUSED_RESULT; /// Returns a newly-opened client in client_key. May reopen the existing client, or may /// replace it with a new one (created using 'factory_method'). // /// Returns an error status and sets 'client_key' to NULL if a new client cannot /// created. - Status ReopenClient(ClientFactory factory_method, ClientKey* client_key); + Status ReopenClient( + ClientFactory factory_method, ClientKey* client_key) WARN_UNUSED_RESULT; /// Returns a client to the cache. Upon return, *client_key will be NULL, and the /// associated client will be available in the per-host cache. @@ -190,7 +191,7 @@ class ClientCacheHelper { /// Create a new client for specific address in 'client' and put it in client_map_ Status CreateClient(const TNetworkAddress& address, ClientFactory factory_method, - ClientKey* client_key); + ClientKey* client_key) WARN_UNUSED_RESULT; }; /// A scoped client connection to help manage clients from a client cache. Clients of this @@ -216,9 +217,7 @@ class ClientConnection { } } - Status Reopen() { - return client_cache_->ReopenClient(&client_); - } + Status Reopen() WARN_UNUSED_RESULT { return client_cache_->ReopenClient(&client_); } T* operator->() const { return client_; } @@ -393,18 +392,18 @@ class ClientCache { /// Obtains a pointer to a Thrift interface object (of type T), /// backed by a live transport which is already open. Returns /// Status::OK unless there was an error opening the transport. - Status GetClient(const TNetworkAddress& address, T** iface) { - return client_cache_helper_.GetClient(address, client_factory_, - reinterpret_cast<ClientKey*>(iface)); + Status GetClient(const TNetworkAddress& address, T** iface) WARN_UNUSED_RESULT { + return client_cache_helper_.GetClient( + address, client_factory_, reinterpret_cast<ClientKey*>(iface)); } /// Close and delete the underlying transport. Return a new client connecting to the /// same host/port. /// Returns an error status if a new connection cannot be established and *client will /// be unaffected in that case. - Status ReopenClient(T** client) { - return client_cache_helper_.ReopenClient(client_factory_, - reinterpret_cast<ClientKey*>(client)); + Status ReopenClient(T** client) WARN_UNUSED_RESULT { + return client_cache_helper_.ReopenClient( + client_factory_, reinterpret_cast<ClientKey*>(client)); } /// Return the client to the cache and set *client to NULL. http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/86e88cad/be/src/runtime/collection-value-builder.h ---------------------------------------------------------------------- diff --git a/be/src/runtime/collection-value-builder.h b/be/src/runtime/collection-value-builder.h index 1811a9a..1d20ef4 100644 --- a/be/src/runtime/collection-value-builder.h +++ b/be/src/runtime/collection-value-builder.h @@ -49,7 +49,7 @@ class CollectionValueBuilder { /// of tuples that may be written before calling CommitTuples() in 'num_tuples'. After /// calling CommitTuples(), GetMemory() can be called again. Allocates if there is no /// free tuple memory left. Returns error status if memory limit is exceeded. - Status GetFreeMemory(Tuple** tuple_mem, int* num_tuples) { + Status GetFreeMemory(Tuple** tuple_mem, int* num_tuples) WARN_UNUSED_RESULT { if (tuple_desc_.byte_size() == 0) { // No tuple memory necessary, so caller can write as many tuples as 'num_tuples' // field can count. http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/86e88cad/be/src/runtime/coordinator-backend-state.cc ---------------------------------------------------------------------- diff --git a/be/src/runtime/coordinator-backend-state.cc b/be/src/runtime/coordinator-backend-state.cc index 195880f..515b0b3 100644 --- a/be/src/runtime/coordinator-backend-state.cc +++ b/be/src/runtime/coordinator-backend-state.cc @@ -388,7 +388,10 @@ void Coordinator::BackendState::PublishFilter( TPublishFilterParams local_params(*rpc_params); local_params.__set_bloom_filter(rpc_params->bloom_filter); TPublishFilterResult res; - backend_client.DoRpc(&ImpalaBackendClient::PublishFilter, local_params, &res); + status = backend_client.DoRpc(&ImpalaBackendClient::PublishFilter, local_params, &res); + if (!status.ok()) { + LOG(WARNING) << "Error publishing filter, continuing..." << status.GetDetail(); + } // TODO: switch back to the following once we fix the lifecycle // problems of Coordinator //std::cref(fragment_inst->impalad_address()), http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/86e88cad/be/src/runtime/data-stream-recvr.cc ---------------------------------------------------------------------- diff --git a/be/src/runtime/data-stream-recvr.cc b/be/src/runtime/data-stream-recvr.cc index 0c6d98e..35076f5 100644 --- a/be/src/runtime/data-stream-recvr.cc +++ b/be/src/runtime/data-stream-recvr.cc @@ -344,8 +344,10 @@ void DataStreamRecvr::CancelStream() { void DataStreamRecvr::Close() { // Remove this receiver from the DataStreamMgr that created it. - // TODO: log error msg - mgr_->DeregisterRecvr(fragment_instance_id(), dest_node_id()); + const Status status = mgr_->DeregisterRecvr(fragment_instance_id(), dest_node_id()); + if (!status.ok()) { + LOG(WARNING) << "Error deregistering receiver: " << status.GetDetail(); + } mgr_ = NULL; for (int i = 0; i < sender_queues_.size(); ++i) { sender_queues_[i]->Close(); http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/86e88cad/be/src/runtime/data-stream-test.cc ---------------------------------------------------------------------- diff --git a/be/src/runtime/data-stream-test.cc b/be/src/runtime/data-stream-test.cc index 93862fd..5ea6756 100644 --- a/be/src/runtime/data-stream-test.cc +++ b/be/src/runtime/data-stream-test.cc @@ -113,7 +113,7 @@ class DataStreamTest : public testing::Test { protected: DataStreamTest() : next_val_(0) { // Initialize MemTrackers and RuntimeState for use by the data stream receiver. - exec_env_.InitForFeTests(); + ABORT_IF_ERROR(exec_env_.InitForFeTests()); runtime_state_.reset(new RuntimeState(TQueryCtx(), &exec_env_)); mem_pool_.reset(new MemPool(&tracker_)); @@ -307,7 +307,7 @@ class DataStreamTest : public testing::Test { ordering_exprs_.push_back(lhs_slot); less_than_ = obj_pool_.Add(new TupleRowComparator(ordering_exprs_, is_asc_, nulls_first_)); - less_than_->Open(&obj_pool_, runtime_state_.get(), mem_pool_.get()); + ASSERT_OK(less_than_->Open(&obj_pool_, runtime_state_.get(), mem_pool_.get())); } // Create batch_, but don't fill it with data yet. Assumes we created row_desc_. @@ -459,8 +459,9 @@ class DataStreamTest : public testing::Test { boost::shared_ptr<ImpalaTestBackend> handler( new ImpalaTestBackend(dynamic_cast<DataStreamMgr*>(stream_mgr_))); boost::shared_ptr<TProcessor> processor(new ImpalaInternalServiceProcessor(handler)); - ThriftServerBuilder("DataStreamTest backend", processor, FLAGS_port).Build(&server_); - server_->Start(); + ThriftServerBuilder builder("DataStreamTest backend", processor, FLAGS_port); + ASSERT_OK(builder.Build(&server_)); + ASSERT_OK(server_->Start()); } void StopBackend() { @@ -514,7 +515,7 @@ class DataStreamTest : public testing::Test { if (!info.status.ok()) break; } VLOG_QUERY << "closing sender" << sender_num; - sender.FlushFinal(&state); + info.status.MergeStatus(sender.FlushFinal(&state)); sender.Close(&state); info.num_bytes_sent = sender.GetNumDataBytesSent(); @@ -656,6 +657,6 @@ int main(int argc, char **argv) { ::testing::InitGoogleTest(&argc, argv); InitCommonRuntime(argc, argv, true, TestInfo::BE_TEST); InitFeSupport(); - impala::LlvmCodeGen::InitializeLlvm(); + ABORT_IF_ERROR(impala::LlvmCodeGen::InitializeLlvm()); return RUN_ALL_TESTS(); } http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/86e88cad/be/src/runtime/disk-io-mgr.cc ---------------------------------------------------------------------- diff --git a/be/src/runtime/disk-io-mgr.cc b/be/src/runtime/disk-io-mgr.cc index 21edbb2..0fdfb77 100644 --- a/be/src/runtime/disk-io-mgr.cc +++ b/be/src/runtime/disk-io-mgr.cc @@ -391,7 +391,7 @@ Status DiskIoMgr::Init(MemTracker* process_mem_tracker) { for (int j = 0; j < num_threads_per_disk; ++j) { stringstream ss; ss << "work-loop(Disk: " << i << ", Thread: " << j << ")"; - disk_thread_group_.AddThread(new Thread("disk-io-mgr", ss.str(), + disk_thread_group_.AddThread(make_unique<Thread>("disk-io-mgr", ss.str(), &DiskIoMgr::WorkLoop, this, disk_queues_[i])); } } http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/86e88cad/be/src/runtime/exec-env.cc ---------------------------------------------------------------------- diff --git a/be/src/runtime/exec-env.cc b/be/src/runtime/exec-env.cc index 4551232..7f62c96 100644 --- a/be/src/runtime/exec-env.cc +++ b/be/src/runtime/exec-env.cc @@ -265,7 +265,7 @@ Status ExecEnv::StartServices() { buffer_pool_limit = BitUtil::RoundDown(buffer_pool_limit, FLAGS_min_buffer_size); InitBufferPool(FLAGS_min_buffer_size, buffer_pool_limit); - metrics_->Init(enable_webserver_ ? webserver_.get() : nullptr); + RETURN_IF_ERROR(metrics_->Init(enable_webserver_ ? webserver_.get() : nullptr)); impalad_client_cache_->InitMetrics(metrics_.get(), "impala-server.backends"); catalogd_client_cache_->InitMetrics(metrics_.get(), "catalog.server"); RETURN_IF_ERROR(RegisterMemoryMetrics( @@ -326,7 +326,7 @@ Status ExecEnv::StartServices() { TGetHadoopConfigRequest config_request; config_request.__set_name(DEFAULT_FS); TGetHadoopConfigResponse config_response; - frontend_->GetHadoopConfig(config_request, &config_response); + RETURN_IF_ERROR(frontend_->GetHadoopConfig(config_request, &config_response)); if (config_response.__isset.value) { default_fs_ = config_response.value; } else { http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/86e88cad/be/src/runtime/exec-env.h ---------------------------------------------------------------------- diff --git a/be/src/runtime/exec-env.h b/be/src/runtime/exec-env.h index 6293988..5d7a3d0 100644 --- a/be/src/runtime/exec-env.h +++ b/be/src/runtime/exec-env.h @@ -81,7 +81,7 @@ class ExecEnv { ~ExecEnv(); /// Starts any dependent services in their correct order - Status StartServices(); + Status StartServices() WARN_UNUSED_RESULT; /// TODO: Should ExecEnv own the ImpalaServer as well? void SetImpalaServer(ImpalaServer* server) { impala_server_ = server; } @@ -127,7 +127,7 @@ class ExecEnv { const TNetworkAddress& backend_address() const { return backend_address_; } /// Initializes the exec env for running FE tests. - Status InitForFeTests(); + Status InitForFeTests() WARN_UNUSED_RESULT; /// Returns true if this environment was created from the FE tests. This makes the /// environment special since the JVM is started first and libraries are loaded @@ -135,14 +135,14 @@ class ExecEnv { bool is_fe_tests() { return is_fe_tests_; } /// Returns the configured defaultFs set in core-site.xml - string default_fs() { return default_fs_; } + const string& default_fs() { return default_fs_; } /// Gets a KuduClient for this list of master addresses. It will look up and share /// an existing KuduClient if possible. Otherwise, it will create a new KuduClient /// internally and return a pointer to it. All KuduClients accessed through this /// interface are owned by the ExecEnv. Thread safe. - Status GetKuduClient( - const std::vector<std::string>& master_addrs, kudu::client::KuduClient** client); + Status GetKuduClient(const std::vector<std::string>& master_addrs, + kudu::client::KuduClient** client) WARN_UNUSED_RESULT; private: boost::scoped_ptr<ObjectPool> obj_pool_; http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/86e88cad/be/src/runtime/fragment-instance-state.cc ---------------------------------------------------------------------- diff --git a/be/src/runtime/fragment-instance-state.cc b/be/src/runtime/fragment-instance-state.cc index 99de62d..3f5a72f 100644 --- a/be/src/runtime/fragment-instance-state.cc +++ b/be/src/runtime/fragment-instance-state.cc @@ -97,7 +97,9 @@ done: } void FragmentInstanceState::Cancel() { - WaitForPrepare(); // make sure Prepare() finished + // Make sure Prepare() finished. We don't care about the status since the query is + // being cancelled. + discard_result(WaitForPrepare()); // Ensure that the sink is closed from both sides. Although in ordinary executions we // rely on the consumer to do this, in error cases the consumer may not be able to send http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/86e88cad/be/src/runtime/hbase-table-factory.cc ---------------------------------------------------------------------- diff --git a/be/src/runtime/hbase-table-factory.cc b/be/src/runtime/hbase-table-factory.cc index d0c97d8..7340473 100644 --- a/be/src/runtime/hbase-table-factory.cc +++ b/be/src/runtime/hbase-table-factory.cc @@ -92,7 +92,7 @@ HBaseTableFactory::~HBaseTableFactory() { lock_guard<mutex> lock(connection_lock_); if (connection_ != NULL) { env->CallObjectMethod(connection_, connection_close_id_); - JniUtil::FreeGlobalRef(env, connection_); + env->DeleteGlobalRef(connection_); connection_ = NULL; } } http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/86e88cad/be/src/runtime/hbase-table.cc ---------------------------------------------------------------------- diff --git a/be/src/runtime/hbase-table.cc b/be/src/runtime/hbase-table.cc index 14f402f..84d03eb 100644 --- a/be/src/runtime/hbase-table.cc +++ b/be/src/runtime/hbase-table.cc @@ -59,8 +59,7 @@ void HBaseTable::Close(RuntimeState* state) { env->CallObjectMethod(table_, table_close_id_); Status s = JniUtil::GetJniExceptionMsg(env, true, "HBaseTable::Close(): "); if (!s.ok()) state->LogError(s.msg()); - s = JniUtil::FreeGlobalRef(env, table_); - if (!s.ok()) state->LogError(s.msg()); + env->DeleteGlobalRef(table_); } table_ = NULL; http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/86e88cad/be/src/runtime/parallel-executor.cc ---------------------------------------------------------------------- diff --git a/be/src/runtime/parallel-executor.cc b/be/src/runtime/parallel-executor.cc index 5b5d0e0..b7b3cc2 100644 --- a/be/src/runtime/parallel-executor.cc +++ b/be/src/runtime/parallel-executor.cc @@ -35,7 +35,7 @@ Status ParallelExecutor::Exec(Function function, void** args, int num_args, for (int i = 0; i < num_args; ++i) { stringstream ss; ss << "worker-thread(" << i << ")"; - worker_threads.AddThread(new Thread("parallel-executor", ss.str(), + worker_threads.AddThread(make_unique<Thread>("parallel-executor", ss.str(), &ParallelExecutor::Worker, function, args[i], &lock, &status, latencies)); } worker_threads.JoinAll(); http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/86e88cad/be/src/runtime/tmp-file-mgr-test.cc ---------------------------------------------------------------------- diff --git a/be/src/runtime/tmp-file-mgr-test.cc b/be/src/runtime/tmp-file-mgr-test.cc index 343ec93..c610529 100644 --- a/be/src/runtime/tmp-file-mgr-test.cc +++ b/be/src/runtime/tmp-file-mgr-test.cc @@ -17,6 +17,7 @@ #include <cstdio> #include <cstdlib> +#include <numeric> #include <boost/filesystem.hpp> #include <boost/scoped_ptr.hpp> @@ -239,7 +240,7 @@ TEST_F(TmpFileMgrTest, TestOneDirPerDevice) { TmpFileMgr::File* file = files[0]; // Check the prefix is the expected temporary directory. EXPECT_EQ(0, file->path().find(tmp_dirs[0])); - FileSystemUtil::RemovePaths(tmp_dirs); + ASSERT_OK(FileSystemUtil::RemovePaths(tmp_dirs)); file_group.Close(); CheckMetrics(&tmp_file_mgr); } @@ -266,7 +267,7 @@ TEST_F(TmpFileMgrTest, TestMultiDirsPerDevice) { // Check the prefix is the expected temporary directory. EXPECT_EQ(0, files[i]->path().find(tmp_dirs[i])); } - FileSystemUtil::RemovePaths(tmp_dirs); + ASSERT_OK(FileSystemUtil::RemovePaths(tmp_dirs)); file_group.Close(); CheckMetrics(&tmp_file_mgr); } @@ -312,7 +313,7 @@ TEST_F(TmpFileMgrTest, TestReportError) { // Attempts to allocate new files on bad device should succeed. unique_ptr<TmpFileMgr::File> bad_file2; ASSERT_OK(NewFile(&tmp_file_mgr, &file_group, bad_device, &bad_file2)); - FileSystemUtil::RemovePaths(tmp_dirs); + ASSERT_OK(FileSystemUtil::RemovePaths(tmp_dirs)); file_group.Close(); CheckMetrics(&tmp_file_mgr); } @@ -343,7 +344,7 @@ TEST_F(TmpFileMgrTest, TestAllocateNonWritable) { ASSERT_OK(FileAllocateSpace(allocated_files[1], 1, &offset)); chmod(scratch_subdirs[0].c_str(), S_IRWXU); - FileSystemUtil::RemovePaths(tmp_dirs); + ASSERT_OK(FileSystemUtil::RemovePaths(tmp_dirs)); file_group.Close(); } http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/86e88cad/be/src/runtime/tmp-file-mgr.cc ---------------------------------------------------------------------- diff --git a/be/src/runtime/tmp-file-mgr.cc b/be/src/runtime/tmp-file-mgr.cc index c99077f..1cf86b4 100644 --- a/be/src/runtime/tmp-file-mgr.cc +++ b/be/src/runtime/tmp-file-mgr.cc @@ -212,8 +212,7 @@ void TmpFileMgr::File::Blacklist(const ErrorMsg& msg) { Status TmpFileMgr::File::Remove() { // Remove the file if present (it may not be present if no writes completed). - FileSystemUtil::RemovePaths({path_}); - return Status::OK(); + return FileSystemUtil::RemovePaths({path_}); } string TmpFileMgr::File::DebugString() { http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/86e88cad/be/src/scheduling/scheduler-test-util.cc ---------------------------------------------------------------------- diff --git a/be/src/scheduling/scheduler-test-util.cc b/be/src/scheduling/scheduler-test-util.cc index 1f233d2..b9b2c6e 100644 --- a/be/src/scheduling/scheduler-test-util.cc +++ b/be/src/scheduling/scheduler-test-util.cc @@ -509,7 +509,8 @@ void SchedulerWrapper::InitializeScheduler() { scheduler_.reset(new Scheduler(nullptr, scheduler_backend_id, scheduler_backend_address, &metrics_, nullptr, nullptr)); - scheduler_->Init(); + const Status status = scheduler_->Init(); + DCHECK(status.ok()) << "Scheduler init failed in test"; // Initialize the scheduler backend maps. SendFullMembershipMap(); } http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/86e88cad/be/src/scheduling/scheduler-test.cc ---------------------------------------------------------------------- diff --git a/be/src/scheduling/scheduler-test.cc b/be/src/scheduling/scheduler-test.cc index c7b284b..c70e54f 100644 --- a/be/src/scheduling/scheduler-test.cc +++ b/be/src/scheduling/scheduler-test.cc @@ -43,7 +43,7 @@ TEST_F(SchedulerTest, SingleHostSingleFile) { Result result(plan); SchedulerWrapper scheduler(plan); - scheduler.Compute(&result); + ASSERT_OK(scheduler.Compute(&result)); EXPECT_EQ(1, result.NumTotalAssignments()); EXPECT_EQ(1 * Block::DEFAULT_BLOCK_SIZE, result.NumTotalAssignedBytes()); @@ -67,7 +67,7 @@ TEST_F(SchedulerTest, SingleCoordinatorNoExecutor) { Result result(plan); SchedulerWrapper scheduler(plan); - scheduler.Compute(&result); + ASSERT_OK(scheduler.Compute(&result)); EXPECT_EQ(2, result.NumDistinctBackends()); EXPECT_EQ(0, result.NumDiskAssignments(0)); @@ -87,7 +87,7 @@ TEST_F(SchedulerTest, ExecAtCoord) { Result result(plan); SchedulerWrapper scheduler(plan); bool exec_at_coord = true; - scheduler.Compute(exec_at_coord, &result); + ASSERT_OK(scheduler.Compute(exec_at_coord, &result)); EXPECT_EQ(3 * Block::DEFAULT_BLOCK_SIZE, result.NumTotalAssignedBytes(0)); EXPECT_EQ(0, result.NumTotalAssignedBytes(1)); @@ -108,7 +108,7 @@ TEST_F(SchedulerTest, ScanTableTwice) { Result result(plan); SchedulerWrapper scheduler(plan); - scheduler.Compute(&result); + ASSERT_OK(scheduler.Compute(&result)); EXPECT_EQ(4 * Block::DEFAULT_BLOCK_SIZE, result.NumTotalAssignedBytes()); EXPECT_EQ(4 * Block::DEFAULT_BLOCK_SIZE, result.NumDiskAssignedBytes()); @@ -130,7 +130,7 @@ TEST_F(SchedulerTest, RandomReads) { Result result(plan); SchedulerWrapper scheduler(plan); - for (int i = 0; i < 100; ++i) scheduler.Compute(&result); + for (int i = 0; i < 100; ++i) ASSERT_OK(scheduler.Compute(&result)); ASSERT_EQ(100, result.NumAssignments()); EXPECT_EQ(100, result.NumTotalAssignments()); @@ -154,7 +154,7 @@ TEST_F(SchedulerTest, LocalReadsPickFirstReplica) { Result result(plan); SchedulerWrapper scheduler(plan); - for (int i = 0; i < 3; ++i) scheduler.Compute(&result); + for (int i = 0; i < 3; ++i) ASSERT_OK(scheduler.Compute(&result)); EXPECT_EQ(3, result.NumTotalAssignments()); EXPECT_EQ(3, result.NumDiskAssignments(0)); @@ -179,7 +179,7 @@ TEST_F(SchedulerTest, TestMediumSizedCluster) { Result result(plan); SchedulerWrapper scheduler(plan); - scheduler.Compute(&result); + ASSERT_OK(scheduler.Compute(&result)); EXPECT_EQ(16, result.NumTotalAssignments()); EXPECT_EQ(16, result.NumDiskAssignments()); @@ -198,7 +198,7 @@ TEST_F(SchedulerTest, RemoteOnlyPlacement) { Result result(plan); SchedulerWrapper scheduler(plan); - scheduler.Compute(&result); + ASSERT_OK(scheduler.Compute(&result)); EXPECT_EQ(10, result.NumTotalAssignments()); EXPECT_EQ(10, result.NumRemoteAssignments()); @@ -219,7 +219,7 @@ TEST_F(SchedulerTest, ManyScanRanges) { Result result(plan); SchedulerWrapper scheduler(plan); - scheduler.Compute(&result); + ASSERT_OK(scheduler.Compute(&result)); EXPECT_EQ(1000, result.NumTotalAssignments()); EXPECT_EQ(1000, result.NumDiskAssignments()); @@ -245,7 +245,7 @@ TEST_F(SchedulerTest, DisjointClusterWithRemoteReads) { Result result(plan); SchedulerWrapper scheduler(plan); - scheduler.Compute(&result); + ASSERT_OK(scheduler.Compute(&result)); EXPECT_EQ(10, result.NumTotalAssignments()); EXPECT_EQ(10, result.NumRemoteAssignments()); @@ -267,14 +267,14 @@ TEST_F(SchedulerTest, TestCachedReadPreferred) { Result result(plan); SchedulerWrapper scheduler(plan); - scheduler.Compute(&result); + ASSERT_OK(scheduler.Compute(&result)); EXPECT_EQ(1 * Block::DEFAULT_BLOCK_SIZE, result.NumCachedAssignedBytes()); EXPECT_EQ(1 * Block::DEFAULT_BLOCK_SIZE, result.NumCachedAssignedBytes(1)); EXPECT_EQ(0, result.NumDiskAssignedBytes()); EXPECT_EQ(0, result.NumRemoteAssignedBytes()); // Compute additional assignments. - for (int i = 0; i < 8; ++i) scheduler.Compute(&result); + for (int i = 0; i < 8; ++i) ASSERT_OK(scheduler.Compute(&result)); EXPECT_EQ(9 * Block::DEFAULT_BLOCK_SIZE, result.NumCachedAssignedBytes()); EXPECT_EQ(9 * Block::DEFAULT_BLOCK_SIZE, result.NumCachedAssignedBytes(1)); EXPECT_EQ(0, result.NumDiskAssignedBytes()); @@ -296,13 +296,13 @@ TEST_F(SchedulerTest, TestDisableCachedReads) { Result result(plan); SchedulerWrapper scheduler(plan); - scheduler.Compute(&result); + ASSERT_OK(scheduler.Compute(&result)); EXPECT_EQ(0, result.NumCachedAssignedBytes()); EXPECT_EQ(1 * Block::DEFAULT_BLOCK_SIZE, result.NumDiskAssignedBytes()); EXPECT_EQ(0, result.NumRemoteAssignedBytes()); // Compute additional assignments. - for (int i = 0; i < 8; ++i) scheduler.Compute(&result); + for (int i = 0; i < 8; ++i) ASSERT_OK(scheduler.Compute(&result)); EXPECT_EQ(0, result.NumCachedAssignedBytes()); EXPECT_EQ(9 * Block::DEFAULT_BLOCK_SIZE, result.NumDiskAssignedBytes()); EXPECT_EQ(0, result.NumRemoteAssignedBytes()); @@ -326,7 +326,7 @@ TEST_F(SchedulerTest, EmptyStatestoreMessage) { Result result(plan); SchedulerWrapper scheduler(plan); - scheduler.Compute(&result); + ASSERT_OK(scheduler.Compute(&result)); EXPECT_EQ(0, result.NumTotalAssignedBytes(0)); EXPECT_EQ(1 * Block::DEFAULT_BLOCK_SIZE, result.NumTotalAssignedBytes(1)); EXPECT_EQ(0, result.NumTotalAssignedBytes(2)); @@ -335,7 +335,7 @@ TEST_F(SchedulerTest, EmptyStatestoreMessage) { result.Reset(); scheduler.SendEmptyUpdate(); - scheduler.Compute(&result); + ASSERT_OK(scheduler.Compute(&result)); EXPECT_EQ(1 * Block::DEFAULT_BLOCK_SIZE, result.NumTotalAssignedBytes(0)); EXPECT_EQ(0, result.NumTotalAssignedBytes(1)); EXPECT_EQ(0, result.NumTotalAssignedBytes(2)); @@ -359,7 +359,7 @@ TEST_F(SchedulerTest, TestSendUpdates) { Result result(plan); SchedulerWrapper scheduler(plan); - scheduler.Compute(&result); + ASSERT_OK(scheduler.Compute(&result)); // Two backends are registered, so the scheduler will pick a random one. EXPECT_EQ(0, result.NumTotalAssignedBytes(0)); EXPECT_EQ(1 * Block::DEFAULT_BLOCK_SIZE, result.NumTotalAssignedBytes(1)); @@ -368,7 +368,7 @@ TEST_F(SchedulerTest, TestSendUpdates) { scheduler.RemoveBackend(cluster.hosts()[1]); result.Reset(); - scheduler.Compute(&result); + ASSERT_OK(scheduler.Compute(&result)); EXPECT_EQ(1 * Block::DEFAULT_BLOCK_SIZE, result.NumTotalAssignedBytes(0)); EXPECT_EQ(0, result.NumTotalAssignedBytes(1)); @@ -376,7 +376,7 @@ TEST_F(SchedulerTest, TestSendUpdates) { scheduler.AddBackend(cluster.hosts()[1]); result.Reset(); - scheduler.Compute(&result); + ASSERT_OK(scheduler.Compute(&result)); // Two backends are registered, so the scheduler will pick a random one. EXPECT_EQ(1 * Block::DEFAULT_BLOCK_SIZE, result.NumTotalAssignedBytes(0)); EXPECT_EQ(0, result.NumTotalAssignedBytes(1)); http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/86e88cad/be/src/service/client-request-state.cc ---------------------------------------------------------------------- diff --git a/be/src/service/client-request-state.cc b/be/src/service/client-request-state.cc index d75e639..013e552 100644 --- a/be/src/service/client-request-state.cc +++ b/be/src/service/client-request-state.cc @@ -627,7 +627,7 @@ void ClientRequestState::Wait() { } else { query_events()->MarkEvent("Request finished"); } - (void) UpdateQueryStatus(status); + discard_result(UpdateQueryStatus(status)); } if (status.ok()) { UpdateNonErrorQueryState(beeswax::QueryState::FINISHED); @@ -681,7 +681,7 @@ Status ClientRequestState::FetchRows(const int32_t max_rows, MarkActive(); // ImpalaServer::FetchInternal has already taken our lock_ - (void) UpdateQueryStatus(FetchRowsInternal(max_rows, fetched_rows)); + discard_result(UpdateQueryStatus(FetchRowsInternal(max_rows, fetched_rows))); MarkInactive(); return query_status_; @@ -737,7 +737,7 @@ Status ClientRequestState::FetchRowsInternal(const int32_t max_rows, // max_rows <= 0 means no limit while ((num_rows < max_rows || max_rows <= 0) && num_rows_fetched_ < all_rows.size()) { - fetched_rows->AddOneRow(all_rows[num_rows_fetched_]); + RETURN_IF_ERROR(fetched_rows->AddOneRow(all_rows[num_rows_fetched_])); ++num_rows_fetched_; ++num_rows; } @@ -867,7 +867,7 @@ Status ClientRequestState::Cancel(bool check_inflight, const Status* cause) { bool already_done = eos_ || query_state_ == beeswax::QueryState::EXCEPTION; if (!already_done && cause != NULL) { DCHECK(!cause->ok()); - (void) UpdateQueryStatus(*cause); + discard_result(UpdateQueryStatus(*cause)); query_events_->MarkEvent("Cancelled"); DCHECK_EQ(query_state_, beeswax::QueryState::EXCEPTION); } http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/86e88cad/be/src/service/fe-support.cc ---------------------------------------------------------------------- diff --git a/be/src/service/fe-support.cc b/be/src/service/fe-support.cc index 379829d..2c38b30 100644 --- a/be/src/service/fe-support.cc +++ b/be/src/service/fe-support.cc @@ -69,9 +69,9 @@ Java_org_apache_impala_service_FeSupport_NativeFeTestInit( // Init the JVM to load the classes in JniUtil that are needed for returning // exceptions to the FE. InitCommonRuntime(1, &name, true, TestInfo::FE_TEST); - LlvmCodeGen::InitializeLlvm(true); + THROW_IF_ERROR(LlvmCodeGen::InitializeLlvm(true), env, JniUtil::internal_exc_class()); ExecEnv* exec_env = new ExecEnv(); // This also caches it from the process. - exec_env->InitForFeTests(); + THROW_IF_ERROR(exec_env->InitForFeTests(), env, JniUtil::internal_exc_class()); } // Serializes expression value 'value' to thrift structure TColumnValue 'col_val'. @@ -168,9 +168,12 @@ Java_org_apache_impala_service_FeSupport_NativeEvalExprsWithoutRow( vector<TColumnValue> results; ObjectPool obj_pool; - DeserializeThriftMsg(env, thrift_expr_batch, &expr_batch); - DeserializeThriftMsg(env, thrift_query_ctx_bytes, &query_ctx); + THROW_IF_ERROR_RET(DeserializeThriftMsg(env, thrift_expr_batch, &expr_batch), env, + JniUtil::internal_exc_class(), nullptr); + THROW_IF_ERROR_RET(DeserializeThriftMsg(env, thrift_query_ctx_bytes, &query_ctx), env, + JniUtil::internal_exc_class(), nullptr); vector<TExpr>& texprs = expr_batch.exprs; + // Disable codegen advisorily to avoid unnecessary latency. For testing purposes // (expr-test.cc), fe_support_disable_codegen may be set to false. query_ctx.disable_codegen_hint = fe_support_disable_codegen; @@ -377,7 +380,8 @@ JNIEXPORT jbyteArray JNICALL Java_org_apache_impala_service_FeSupport_NativeCacheJar( JNIEnv* env, jclass caller_class, jbyteArray thrift_struct) { TCacheJarParams params; - DeserializeThriftMsg(env, thrift_struct, ¶ms); + THROW_IF_ERROR_RET(DeserializeThriftMsg(env, thrift_struct, ¶ms), env, + JniUtil::internal_exc_class(), nullptr); TCacheJarResult result; string local_path; @@ -397,7 +401,8 @@ JNIEXPORT jbyteArray JNICALL Java_org_apache_impala_service_FeSupport_NativeLookupSymbol( JNIEnv* env, jclass caller_class, jbyteArray thrift_struct) { TSymbolLookupParams lookup; - DeserializeThriftMsg(env, thrift_struct, &lookup); + THROW_IF_ERROR_RET(DeserializeThriftMsg(env, thrift_struct, &lookup), env, + JniUtil::internal_exc_class(), nullptr); vector<ColumnType> arg_types; for (int i = 0; i < lookup.arg_types.size(); ++i) { @@ -420,7 +425,8 @@ JNIEXPORT jbyteArray JNICALL Java_org_apache_impala_service_FeSupport_NativePrioritizeLoad( JNIEnv* env, jclass caller_class, jbyteArray thrift_struct) { TPrioritizeLoadRequest request; - DeserializeThriftMsg(env, thrift_struct, &request); + THROW_IF_ERROR_RET(DeserializeThriftMsg(env, thrift_struct, &request), env, + JniUtil::internal_exc_class(), nullptr); CatalogOpExecutor catalog_op_executor(ExecEnv::GetInstance(), NULL, NULL); TPrioritizeLoadResponse result;
