Repository: hadoop Updated Branches: refs/heads/HDFS-8707 775150723 -> 71023fd27
HDFS-9118: libhdfs++ Add logging system. Contributed by James Clampffer Project: http://git-wip-us.apache.org/repos/asf/hadoop/repo Commit: http://git-wip-us.apache.org/repos/asf/hadoop/commit/71023fd2 Tree: http://git-wip-us.apache.org/repos/asf/hadoop/tree/71023fd2 Diff: http://git-wip-us.apache.org/repos/asf/hadoop/diff/71023fd2 Branch: refs/heads/HDFS-8707 Commit: 71023fd2718a777b63351b7fb94ccc62a00f970a Parents: 7751507 Author: James <[email protected]> Authored: Wed Mar 23 18:00:16 2016 -0400 Committer: James <[email protected]> Committed: Wed Mar 23 18:00:16 2016 -0400 ---------------------------------------------------------------------- .../native/libhdfspp/include/hdfspp/hdfs_ext.h | 73 +++- .../main/native/libhdfspp/include/hdfspp/log.h | 60 +++ .../native/libhdfspp/lib/bindings/c/hdfs.cc | 135 +++++++ .../native/libhdfspp/lib/common/CMakeLists.txt | 2 +- .../libhdfspp/lib/common/hdfs_public_api.cc | 4 +- .../main/native/libhdfspp/lib/common/logging.cc | 216 +++++++++++ .../main/native/libhdfspp/lib/common/logging.h | 189 ++++++++-- .../main/native/libhdfspp/lib/fs/filehandle.cc | 38 ++ .../main/native/libhdfspp/lib/fs/filesystem.cc | 36 +- .../native/libhdfspp/lib/reader/block_reader.cc | 40 ++ .../native/libhdfspp/lib/rpc/rpc_connection.cc | 2 +- .../native/libhdfspp/lib/rpc/rpc_connection.h | 24 +- .../main/native/libhdfspp/lib/rpc/rpc_engine.cc | 23 +- .../main/native/libhdfspp/tests/CMakeLists.txt | 5 + .../main/native/libhdfspp/tests/logging_test.cc | 374 +++++++++++++++++++ 15 files changed, 1177 insertions(+), 44 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/hadoop/blob/71023fd2/hadoop-hdfs-project/hadoop-hdfs-native-client/src/main/native/libhdfspp/include/hdfspp/hdfs_ext.h ---------------------------------------------------------------------- diff --git a/hadoop-hdfs-project/hadoop-hdfs-native-client/src/main/native/libhdfspp/include/hdfspp/hdfs_ext.h 
b/hadoop-hdfs-project/hadoop-hdfs-native-client/src/main/native/libhdfspp/include/hdfspp/hdfs_ext.h index 1ddeba1..1e73bc5 100644 --- a/hadoop-hdfs-project/hadoop-hdfs-native-client/src/main/native/libhdfspp/include/hdfspp/hdfs_ext.h +++ b/hadoop-hdfs-project/hadoop-hdfs-native-client/src/main/native/libhdfspp/include/hdfspp/hdfs_ext.h @@ -18,6 +18,8 @@ #ifndef LIBHDFSPP_HDFS_HDFSEXT #define LIBHDFSPP_HDFS_HDFSEXT +#include <hdfspp/log.h> + /* get typedefs and #defines from libhdfs' hdfs.h to stay consistent */ #include <hdfs/hdfs.h> @@ -95,21 +97,72 @@ struct hdfsBuilder *hdfsNewBuilderFromDirectory(const char * configDirectory); * @return 0 on success; nonzero error code otherwise. * Failure to find the key is not an error. */ +LIBHDFS_EXTERNAL int hdfsBuilderConfGetStr(struct hdfsBuilder *bld, const char *key, char **val); - /** - * Get a configuration integer from the settings currently read into the builder. - * - * @param key The key to find - * @param val (out param) The value. This will NOT be changed if the - * key isn't found. - * - * @return 0 on success; nonzero error code otherwise. - * Failure to find the key is not an error. - */ +/** + * Get a configuration integer from the settings currently read into the builder. + * + * @param key The key to find + * @param val (out param) The value. This will NOT be changed if the + * key isn't found. + * + * @return 0 on success; nonzero error code otherwise. + * Failure to find the key is not an error. + */ +LIBHDFS_EXTERNAL int hdfsBuilderConfGetInt(struct hdfsBuilder *bld, const char *key, int32_t *val); + +/** + * Client can supply a C style function pointer to be invoked any time something + * is logged. Unlike the C++ logger this will not filter by level or component, + * it is up to the consumer to throw away messages they don't want. + * + * Note: The callback provided must be reentrant, the library does not guarantee + * that there won't be concurrent calls. 
+ * Note: Callback does not own the LogData struct. If the client would like to + * keep one around use hdfsCopyLogData/hdfsFreeLogData. + **/ +LIBHDFS_EXTERNAL +void hdfsSetLogFunction(void (*hook)(LogData*)); + +/** + * Create a copy of the LogData object passed in and return a pointer to it. + * Returns null if it was unable to copy. + **/ +LIBHDFS_EXTERNAL +LogData *hdfsCopyLogData(const LogData*); + +/** + * Client must call this to dispose of the LogData created by hdfsCopyLogData. + **/ +LIBHDFS_EXTERNAL +void hdfsFreeLogData(LogData*); + +/** + * Enable logging functionality for a component. + * Return 1 on failure, 0 otherwise. + **/ +LIBHDFS_EXTERNAL +int hdfsEnableLoggingForComponent(int component); + +/** + * Disable logging functionality for a component. + * Return 1 on failure, 0 otherwise. + **/ +LIBHDFS_EXTERNAL +int hdfsDisableLoggingForComponent(int component); + +/** + * Set level between trace and error. + * Return 1 on failure, 0 otherwise. + **/ +LIBHDFS_EXTERNAL +int hdfsSetLoggingLevel(int component); + + #ifdef __cplusplus } /* end extern "C" */ #endif http://git-wip-us.apache.org/repos/asf/hadoop/blob/71023fd2/hadoop-hdfs-project/hadoop-hdfs-native-client/src/main/native/libhdfspp/include/hdfspp/log.h ---------------------------------------------------------------------- diff --git a/hadoop-hdfs-project/hadoop-hdfs-native-client/src/main/native/libhdfspp/include/hdfspp/log.h b/hadoop-hdfs-project/hadoop-hdfs-native-client/src/main/native/libhdfspp/include/hdfspp/log.h new file mode 100644 index 0000000..0371951 --- /dev/null +++ b/hadoop-hdfs-project/hadoop-hdfs-native-client/src/main/native/libhdfspp/include/hdfspp/log.h @@ -0,0 +1,60 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. 
The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +#ifndef LIBHDFSPP_HDFS_LOG +#define LIBHDFSPP_HDFS_LOG + +#ifdef __cplusplus +extern "C" { +#endif + +/** + * Things that are part of the public API but are specific to logging live here. + * Added to avoid including the whole public API into the implementation of the logger. + **/ + +/* logging levels, compatible with enum in lib/common/logging.cc */ +#define HDFSPP_LOG_LEVEL_TRACE 0 +#define HDFSPP_LOG_LEVEL_DEBUG 1 +#define HDFSPP_LOG_LEVEL_INFO 2 +#define HDFSPP_LOG_LEVEL_WARN 3 +#define HDFSPP_LOG_LEVEL_ERROR 4 + +/* components emitting messages, compatible with enum lib/common/logging.cc */ +#define HDFSPP_LOG_COMPONENT_UNKNOWN 1 << 0 +#define HDFSPP_LOG_COMPONENT_RPC 1 << 1 +#define HDFSPP_LOG_COMPONENT_BLOCKREADER 1 << 2 +#define HDFSPP_LOG_COMPONENT_FILEHANDLE 1 << 3 +#define HDFSPP_LOG_COMPONENT_FILESYSTEM 1 << 4 + +/** + * POD struct for C to consume (C++ interface gets to take advantage of RAII) + **/ +typedef struct { + const char *msg; + int level; + int component; + const char *file_name; + int file_line; +} LogData; + +#ifdef __cplusplus +} // end extern C +#endif + +#endif http://git-wip-us.apache.org/repos/asf/hadoop/blob/71023fd2/hadoop-hdfs-project/hadoop-hdfs-native-client/src/main/native/libhdfspp/lib/bindings/c/hdfs.cc ---------------------------------------------------------------------- diff --git 
a/hadoop-hdfs-project/hadoop-hdfs-native-client/src/main/native/libhdfspp/lib/bindings/c/hdfs.cc b/hadoop-hdfs-project/hadoop-hdfs-native-client/src/main/native/libhdfspp/lib/bindings/c/hdfs.cc index fd46f3f..9ce5c86 100644 --- a/hadoop-hdfs-project/hadoop-hdfs-native-client/src/main/native/libhdfspp/lib/bindings/c/hdfs.cc +++ b/hadoop-hdfs-project/hadoop-hdfs-native-client/src/main/native/libhdfspp/lib/bindings/c/hdfs.cc @@ -21,6 +21,7 @@ #include "fs/filesystem.h" #include "common/hdfs_configuration.h" #include "common/configuration_loader.h" +#include "common/logging.h" #include <hdfs/hdfs.h> #include <hdfspp/hdfs_ext.h> @@ -601,3 +602,137 @@ int hdfsBuilderConfGetInt(struct hdfsBuilder *bld, const char *key, int32_t *val return ReportCaughtNonException(); } } + +/** + * Logging functions + **/ +class CForwardingLogger : public LoggerInterface { + public: + CForwardingLogger() : callback_(nullptr) {}; + + // Converts LogMessage into LogData, a POD type, + // and invokes callback_ if it's not null. + void Write(const LogMessage& msg); + + // pass in NULL to clear the hook + void SetCallback(void (*callback)(LogData*)); + + //return a copy, or null on failure. 
+ static LogData *CopyLogData(const LogData*); + //free LogData allocated with CopyLogData + static void FreeLogData(LogData*); + private: + void (*callback_)(LogData*); +}; + +/** + * Plugin to forward message to a C function pointer + **/ +void CForwardingLogger::Write(const LogMessage& msg) { + if(!callback_) + return; + + const std::string text = msg.MsgString(); + + LogData data; + data.level = msg.level(); + data.component = msg.component(); + data.msg = text.c_str(); + data.file_name = msg.file_name(); + data.file_line = msg.file_line(); + callback_(&data); +} + +void CForwardingLogger::SetCallback(void (*callback)(LogData*)) { + callback_ = callback; +} + +LogData *CForwardingLogger::CopyLogData(const LogData *orig) { + if(!orig) + return nullptr; + + LogData *copy = (LogData*)malloc(sizeof(LogData)); + if(!copy) + return nullptr; + + copy->level = orig->level; + copy->component = orig->component; + if(orig->msg) + copy->msg = strdup(orig->msg); + copy->file_name = orig->file_name; + copy->file_line = orig->file_line; + return copy; +} + +void CForwardingLogger::FreeLogData(LogData *data) { + if(!data) + return; + if(data->msg) + free((void*)data->msg); + + // Inexpensive way to help catch use-after-free + memset(data, 0, sizeof(LogData)); + free(data); +} + + +LogData *hdfsCopyLogData(LogData *data) { + return CForwardingLogger::CopyLogData(data); +} + +void hdfsFreeLogData(LogData *data) { + CForwardingLogger::FreeLogData(data); +} + +void hdfsSetLogFunction(void (*callback)(LogData*)) { + CForwardingLogger *logger = new CForwardingLogger(); + logger->SetCallback(callback); + LogManager::SetLoggerImplementation(std::unique_ptr<LoggerInterface>(logger)); +} + +static bool IsLevelValid(int component) { + if(component < HDFSPP_LOG_LEVEL_TRACE || component > HDFSPP_LOG_LEVEL_ERROR) + return false; + return true; +} + + +// should use __builtin_popcnt as optimization on some platforms +static int popcnt(int val) { + int bits = sizeof(val) * 8; + int count = 0; 
+ for(int i=0; i<bits; i++) { + if((val >> i) & 0x1) + count++; + } + return count; +} + +static bool IsComponentValid(int component) { + if(component < HDFSPP_LOG_COMPONENT_UNKNOWN || component > HDFSPP_LOG_COMPONENT_FILESYSTEM) + return false; + if(popcnt(component) != 1) + return false; + return true; +} + +int hdfsEnableLoggingForComponent(int component) { + if(!IsComponentValid(component)) + return 1; + LogManager::EnableLogForComponent(static_cast<LogSourceComponent>(component)); + return 0; +} + +int hdfsDisableLoggingForComponent(int component) { + if(!IsComponentValid(component)) + return 1; + LogManager::DisableLogForComponent(static_cast<LogSourceComponent>(component)); + return 0; +} + +int hdfsSetLoggingLevel(int level) { + if(!IsLevelValid(level)) + return 1; + LogManager::SetLogLevel(static_cast<LogLevel>(level)); + return 0; +} http://git-wip-us.apache.org/repos/asf/hadoop/blob/71023fd2/hadoop-hdfs-project/hadoop-hdfs-native-client/src/main/native/libhdfspp/lib/common/CMakeLists.txt ---------------------------------------------------------------------- diff --git a/hadoop-hdfs-project/hadoop-hdfs-native-client/src/main/native/libhdfspp/lib/common/CMakeLists.txt b/hadoop-hdfs-project/hadoop-hdfs-native-client/src/main/native/libhdfspp/lib/common/CMakeLists.txt index 2e78643..77860b0 100644 --- a/hadoop-hdfs-project/hadoop-hdfs-native-client/src/main/native/libhdfspp/lib/common/CMakeLists.txt +++ b/hadoop-hdfs-project/hadoop-hdfs-native-client/src/main/native/libhdfspp/lib/common/CMakeLists.txt @@ -19,6 +19,6 @@ if(NEED_LINK_DL) set(LIB_DL dl) endif() -add_library(common_obj OBJECT base64.cc status.cc sasl_digest_md5.cc hdfs_public_api.cc options.cc configuration.cc configuration_loader.cc hdfs_configuration.cc uri.cc util.cc retry_policy.cc cancel_tracker.cc) +add_library(common_obj OBJECT base64.cc status.cc sasl_digest_md5.cc hdfs_public_api.cc options.cc configuration.cc configuration_loader.cc hdfs_configuration.cc uri.cc util.cc retry_policy.cc 
cancel_tracker.cc logging.cc) add_library(common $<TARGET_OBJECTS:common_obj> $<TARGET_OBJECTS:uriparser2_obj>) target_link_libraries(common ${LIB_DL}) http://git-wip-us.apache.org/repos/asf/hadoop/blob/71023fd2/hadoop-hdfs-project/hadoop-hdfs-native-client/src/main/native/libhdfspp/lib/common/hdfs_public_api.cc ---------------------------------------------------------------------- diff --git a/hadoop-hdfs-project/hadoop-hdfs-native-client/src/main/native/libhdfspp/lib/common/hdfs_public_api.cc b/hadoop-hdfs-project/hadoop-hdfs-native-client/src/main/native/libhdfspp/lib/common/hdfs_public_api.cc index a7f7b49..188071d 100644 --- a/hadoop-hdfs-project/hadoop-hdfs-native-client/src/main/native/libhdfspp/lib/common/hdfs_public_api.cc +++ b/hadoop-hdfs-project/hadoop-hdfs-native-client/src/main/native/libhdfspp/lib/common/hdfs_public_api.cc @@ -36,9 +36,9 @@ void IoServiceImpl::Run() { io_service_.run(); break; } catch (const std::exception & e) { - LOG_WARN() << "Unexpected exception in libhdfspp worker thread: " << e.what(); + LOG_WARN(kFileSystem, << "Unexpected exception in libhdfspp worker thread: " << e.what()); } catch (...) 
{ - LOG_WARN() << "Unexpected value not derived from std::exception in libhdfspp worker thread"; + LOG_WARN(kFileSystem, << "Unexpected value not derived from std::exception in libhdfspp worker thread"); } } } http://git-wip-us.apache.org/repos/asf/hadoop/blob/71023fd2/hadoop-hdfs-project/hadoop-hdfs-native-client/src/main/native/libhdfspp/lib/common/logging.cc ---------------------------------------------------------------------- diff --git a/hadoop-hdfs-project/hadoop-hdfs-native-client/src/main/native/libhdfspp/lib/common/logging.cc b/hadoop-hdfs-project/hadoop-hdfs-native-client/src/main/native/libhdfspp/lib/common/logging.cc new file mode 100644 index 0000000..c299761 --- /dev/null +++ b/hadoop-hdfs-project/hadoop-hdfs-native-client/src/main/native/libhdfspp/lib/common/logging.cc @@ -0,0 +1,216 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +#include "logging.h" + +#include <ctime> +#include <cstring> +#include <thread> +#include <iostream> +#include <sstream> + +namespace hdfs +{ + +LogManager::LogManager() {} +std::unique_ptr<LoggerInterface> LogManager::logger_impl_(new StderrLogger()); +std::mutex LogManager::impl_lock_; +uint32_t LogManager::component_mask_ = 0xFFFFFFFF; +uint32_t LogManager::level_threshold_ = kWarning; + +void LogManager::DisableLogForComponent(LogSourceComponent c) { + // AND with all bits other than one we want to unset + std::lock_guard<std::mutex> impl_lock(impl_lock_); + component_mask_ &= ~c; +} + +void LogManager::EnableLogForComponent(LogSourceComponent c) { + // OR with bit to set + std::lock_guard<std::mutex> impl_lock(impl_lock_); + component_mask_ |= c; +} + +void LogManager::SetLogLevel(LogLevel level) { + std::lock_guard<std::mutex> impl_lock(impl_lock_); + level_threshold_ = level; +} + +void LogManager::Write(const LogMessage& msg) { + std::lock_guard<std::mutex> impl_lock(impl_lock_); + if(logger_impl_) + logger_impl_->Write(msg); +} + +void LogManager::SetLoggerImplementation(std::unique_ptr<LoggerInterface> impl) { + std::lock_guard<std::mutex> impl_lock(impl_lock_); + logger_impl_.reset(impl.release()); +} + + +/** + * Simple plugin to dump logs to stderr + **/ +void StderrLogger::Write(const LogMessage& msg) { + std::stringstream formatted; + + if(show_level_) + formatted << msg.level_string(); + + if(show_component_) + formatted << msg.component_string(); + + if(show_timestamp_) { + time_t current_time = std::time(nullptr); + char timestr[128]; + memset(timestr, 0, 128); + int res = std::strftime(timestr, 128, "%a %b %e %H:%M:%S %Y", std::localtime(&current_time)); + if(res > 0) { + formatted << '[' << (const char*)timestr << ']'; + } else { + formatted << "[Error formatting timestamp]"; + } + } + + if(show_component_) { + formatted << "[Thread id = " << std::this_thread::get_id() << ']'; + } + + if(show_file_) { + // __FILE__ contains absolute path, 
which is giant if doing a build inside the + // Hadoop tree. Trim down to relative to libhdfspp/ + std::string abs_path(msg.file_name()); + size_t rel_path_idx = abs_path.find("libhdfspp/"); + // Default to whole string if library is being built in an odd way + if(rel_path_idx == std::string::npos) + rel_path_idx = 0; + + formatted << '[' << (const char*)&abs_path[rel_path_idx] << ":" << msg.file_line() << ']'; + } + + std::cerr << formatted.str() << " " << msg.MsgString() << std::endl; +} + +void StderrLogger::set_show_timestamp(bool show) { + show_timestamp_ = show; +} +void StderrLogger::set_show_level(bool show) { + show_level_ = show; +} +void StderrLogger::set_show_thread(bool show) { + show_thread_ = show; +} +void StderrLogger::set_show_component(bool show) { + show_component_ = show; +} + + +LogMessage::~LogMessage() { + LogManager::Write(*this); +} + +LogMessage& LogMessage::operator<<(const std::string *str) { + if(str) + msg_buffer_ << str; + else + msg_buffer_ << "<nullptr>"; + return *this; +} + +LogMessage& LogMessage::operator<<(const std::string& str) { + msg_buffer_ << str; + return *this; +} + +LogMessage& LogMessage::operator<<(const char *str) { + if(str) + msg_buffer_ << str; + else + msg_buffer_ << "<nullptr>"; + return *this; +} + +LogMessage& LogMessage::operator<<(bool val) { + if(val) + msg_buffer_ << "true"; + else + msg_buffer_ << "false"; + return *this; +} + +LogMessage& LogMessage::operator<<(int32_t val) { + msg_buffer_ << val; + return *this; +} + +LogMessage& LogMessage::operator<<(uint32_t val) { + msg_buffer_ << val; + return *this; +} + +LogMessage& LogMessage::operator<<(int64_t val) { + msg_buffer_ << val; + return *this; +} + +LogMessage& LogMessage::operator<<(uint64_t val) { + msg_buffer_ << val; + return *this; +} + + +LogMessage& LogMessage::operator<<(void *ptr) { + msg_buffer_ << ptr; + return *this; +} + +std::string LogMessage::MsgString() const { + return msg_buffer_.str(); +} + +const char * kLevelStrings[5] = { + 
"[TRACE ]", + "[DEBUG ]", + "[INFO ]", + "[WARN ]", + "[ERROR ]" +}; + +const char * LogMessage::level_string() const { + return kLevelStrings[level_]; +} + +const char * kComponentStrings[5] = { + "[Unknown ]", + "[RPC ]", + "[BlockReader ]", + "[FileHandle ]", + "[FileSystem ]" +}; + +const char * LogMessage::component_string() const { + switch(component_) { + case kRPC: return kComponentStrings[1]; + case kBlockReader: return kComponentStrings[2]; + case kFileHandle: return kComponentStrings[3]; + case kFileSystem: return kComponentStrings[4]; + default: return kComponentStrings[0]; + } +} + +} + http://git-wip-us.apache.org/repos/asf/hadoop/blob/71023fd2/hadoop-hdfs-project/hadoop-hdfs-native-client/src/main/native/libhdfspp/lib/common/logging.h ---------------------------------------------------------------------- diff --git a/hadoop-hdfs-project/hadoop-hdfs-native-client/src/main/native/libhdfspp/lib/common/logging.h b/hadoop-hdfs-project/hadoop-hdfs-native-client/src/main/native/libhdfspp/lib/common/logging.h index 82bdae0..3403646 100644 --- a/hadoop-hdfs-project/hadoop-hdfs-native-client/src/main/native/libhdfspp/lib/common/logging.h +++ b/hadoop-hdfs-project/hadoop-hdfs-native-client/src/main/native/libhdfspp/lib/common/logging.h @@ -19,41 +19,184 @@ #ifndef LIB_COMMON_LOGGING_H_ #define LIB_COMMON_LOGGING_H_ +#include "hdfspp/log.h" + #include <iostream> +#include <sstream> +#include <mutex> +#include <memory> namespace hdfs { +/** + * Logging mechanism to provide lightweight logging to stderr as well as + * as a callback mechanism to allow C clients and larger third party libs + * to be used to handle logging. When adding a new log message to the + * library use the macros defined below (LOG_TRACE..LOG_ERROR) rather than + * using the LogMessage and LogManager objects directly. 
+ **/ + enum LogLevel { - kDebug, - kInfo, - kWarning, - kError, + kTrace = 0, + kDebug = 1, + kInfo = 2, + kWarning = 3, + kError = 4, }; -#define LOG_DEBUG() LogMessage(kDebug) -#define LOG_INFO() LogMessage(kInfo) -#define LOG_WARN() LogMessage(kWarning) -#define LOG_ERROR() LogMessage(kError) +enum LogSourceComponent { + kUnknown = 1 << 0, + kRPC = 1 << 1, + kBlockReader = 1 << 2, + kFileHandle = 1 << 3, + kFileSystem = 1 << 4, +}; -class LogMessage { +#define LOG_TRACE(C, MSG) do { \ +if(LogManager::ShouldLog(kTrace,C)) { \ + LogMessage(kTrace, __FILE__, __LINE__, C) MSG; \ +}} while (0); + + +#define LOG_DEBUG(C, MSG) do { \ +if(LogManager::ShouldLog(kDebug,C)) { \ + LogMessage(kDebug, __FILE__, __LINE__, C) MSG; \ +}} while (0); + +#define LOG_INFO(C, MSG) do { \ +if(LogManager::ShouldLog(kInfo,C)) { \ + LogMessage(kInfo, __FILE__, __LINE__, C) MSG; \ +}} while (0); + +#define LOG_WARN(C, MSG) do { \ +if(LogManager::ShouldLog(kWarning,C)) { \ + LogMessage(kWarning, __FILE__, __LINE__, C) MSG; \ +}} while (0); + +#define LOG_ERROR(C, MSG) do { \ +if(LogManager::ShouldLog(kError,C)) { \ + LogMessage(kError, __FILE__, __LINE__, C) MSG; \ +}} while (0); + + +class LogMessage; + +class LoggerInterface { public: - LogMessage(const LogLevel &l) { - static constexpr const char * kLogLevelMessage[] = {"DEBUG", "INFO", "WARN", "ERROR"}; - ::std::cerr << "[" << kLogLevelMessage[(size_t)l] << "] "; - } + LoggerInterface() {}; + virtual ~LoggerInterface() {}; - ~LogMessage() { - ::std::cerr << std::endl; - } + /** + * User defined handling messages, common case would be printing somewhere. + **/ + virtual void Write(const LogMessage& msg) = 0; +}; - LogMessage& operator<<(const std::string& msg) { - ::std::cerr << msg; - return *this; - } - LogMessage& operator<<(int x) { - ::std::cerr << x; - return *this; +/** + * StderrLogger unsurprisingly dumps messages to stderr. + * This is the default logger if nothing else is explicitly set. 
+ **/ +class StderrLogger : public LoggerInterface { + public: + StderrLogger() : show_timestamp_(true), show_level_(true), + show_thread_(true), show_component_(true), + show_file_(true) {} + void Write(const LogMessage& msg); + void set_show_timestamp(bool show); + void set_show_level(bool show); + void set_show_thread(bool show); + void set_show_component(bool show); + private: + bool show_timestamp_; + bool show_level_; + bool show_thread_; + bool show_component_; + bool show_file_; +}; + + +/** + * LogManager provides a thread safe static interface to the underlying + * logger implementation. + **/ +class LogManager { + friend class LogMessage; + public: + // allow easy inlining + static bool ShouldLog(LogLevel level, LogSourceComponent source) { + std::lock_guard<std::mutex> impl_lock(impl_lock_); + if(level < level_threshold_) + return false; + if(!(source & component_mask_)) + return false; + return true; } + static void Write(const LogMessage & msg); + static void EnableLogForComponent(LogSourceComponent c); + static void DisableLogForComponent(LogSourceComponent c); + static void SetLogLevel(LogLevel level); + static void SetLoggerImplementation(std::unique_ptr<LoggerInterface> impl); + + private: + // don't create instances of this + LogManager(); + // synchronize all unsafe plugin calls + static std::mutex impl_lock_; + static std::unique_ptr<LoggerInterface> logger_impl_; + // component and level masking + static uint32_t component_mask_; + static uint32_t level_threshold_; +}; + +/** + * LogMessage contains message text, along with other metadata about the message. + * Note: For performance reasons a set of macros (see top of file) is used to + * create these inside of an if block. Do not instantiate these directly, doing + * so will cause the message to be unconditionally logged. This minor inconvenience + * gives us a ~20% performance increase in the (common) case where few messages + * are worth logging; std::stringstream is expensive to construct. 
+ **/ +class LogMessage { + friend class LogManager; + public: + LogMessage(const LogLevel &l, const char *file, int line, + LogSourceComponent component = kUnknown) : + level_(l), component_(component), origin_file_(file), origin_line_(line){} + + ~LogMessage(); + + const char *level_string() const; + const char *component_string() const; + LogLevel level() const {return level_; } + LogSourceComponent component() const {return component_; } + int file_line() const {return origin_line_; } + const char * file_name() const {return origin_file_; } + + //print as-is, indicates when a nullptr was passed in + LogMessage& operator<<(const char *); + LogMessage& operator<<(const std::string*); + LogMessage& operator<<(const std::string&); + + + //convert to a string "true"/"false" + LogMessage& operator<<(bool); + + LogMessage& operator<<(int32_t); + LogMessage& operator<<(uint32_t); + LogMessage& operator<<(int64_t); + LogMessage& operator<<(uint64_t); + + //print address as hex + LogMessage& operator<<(void *); + + std::string MsgString() const; + + private: + LogLevel level_; + LogSourceComponent component_; + const char *origin_file_; + const int origin_line_; + std::stringstream msg_buffer_; }; } http://git-wip-us.apache.org/repos/asf/hadoop/blob/71023fd2/hadoop-hdfs-project/hadoop-hdfs-native-client/src/main/native/libhdfspp/lib/fs/filehandle.cc ---------------------------------------------------------------------- diff --git a/hadoop-hdfs-project/hadoop-hdfs-native-client/src/main/native/libhdfspp/lib/fs/filehandle.cc b/hadoop-hdfs-project/hadoop-hdfs-native-client/src/main/native/libhdfspp/lib/fs/filehandle.cc index ad630c0..b3954e1 100644 --- a/hadoop-hdfs-project/hadoop-hdfs-native-client/src/main/native/libhdfspp/lib/fs/filehandle.cc +++ b/hadoop-hdfs-project/hadoop-hdfs-native-client/src/main/native/libhdfspp/lib/fs/filehandle.cc @@ -18,12 +18,15 @@ #include "filehandle.h" #include "common/continuation/continuation.h" +#include "common/logging.h" #include 
"connection/datanodeconnection.h" #include "reader/block_reader.h" #include <future> #include <tuple> +#define FMT_THIS_ADDR "this=" << (void*)this + namespace hdfs { using ::hadoop::hdfs::LocatedBlocksProto; @@ -35,11 +38,17 @@ FileHandleImpl::FileHandleImpl(::asio::io_service *io_service, const std::string std::shared_ptr<BadDataNodeTracker> bad_data_nodes) : io_service_(io_service), client_name_(client_name), file_info_(file_info), bad_node_tracker_(bad_data_nodes), offset_(0), cancel_state_(CancelTracker::New()) { + LOG_TRACE(kFileHandle, << "FileHandleImpl::FileHandleImpl(" + << FMT_THIS_ADDR << ", ...) called"); } void FileHandleImpl::PositionRead( void *buf, size_t nbyte, uint64_t offset, const std::function<void(const Status &, size_t)> &handler) { + LOG_TRACE(kFileHandle, << "FileHandleImpl::PositionRead(" + << FMT_THIS_ADDR << ", buf=" << buf + << ", nbyte=" << nbyte << ") called"); + /* prevent usage after cancelation */ if(cancel_state_->is_canceled()) { handler(Status::Canceled(), 0); @@ -61,6 +70,10 @@ void FileHandleImpl::PositionRead( } Status FileHandleImpl::PositionRead(void *buf, size_t *nbyte, off_t offset) { + LOG_TRACE(kFileHandle, << "FileHandleImpl::[sync]PositionRead(" + << FMT_THIS_ADDR << ", buf=" << buf + << ", nbyte=" << *nbyte << ") called"); + auto callstate = std::make_shared<std::promise<std::tuple<Status, size_t>>>(); std::future<std::tuple<Status, size_t>> future(callstate->get_future()); @@ -84,6 +97,10 @@ Status FileHandleImpl::PositionRead(void *buf, size_t *nbyte, off_t offset) { } Status FileHandleImpl::Read(void *buf, size_t *nbyte) { + LOG_TRACE(kFileHandle, << "FileHandleImpl::Read(" + << FMT_THIS_ADDR << ", buf=" << buf + << ", nbyte=" << *nbyte << ") called"); + Status stat = PositionRead(buf, nbyte, offset_); if(!stat.ok()) { return stat; @@ -94,6 +111,9 @@ Status FileHandleImpl::Read(void *buf, size_t *nbyte) { } Status FileHandleImpl::Seek(off_t *offset, std::ios_base::seekdir whence) { + LOG_TRACE(kFileHandle, << 
"FileHandleImpl::Seek(" + << ", offset=" << *offset << ", ...) called"); + if(cancel_state_->is_canceled()) { return Status::Canceled(); } @@ -146,6 +166,9 @@ void FileHandleImpl::AsyncPreadSome( using ::hadoop::hdfs::DatanodeInfoProto; using ::hadoop::hdfs::LocatedBlockProto; + LOG_TRACE(kFileHandle, << "FileHandleImpl::AsyncPreadSome(" + << FMT_THIS_ADDR << ", ...) called"); + if(cancel_state_->is_canceled()) { handler(Status::Canceled(), "", 0); return; @@ -161,6 +184,8 @@ void FileHandleImpl::AsyncPreadSome( }); if (block == file_info_->blocks_.end()) { + LOG_WARN(kFileHandle, << "FileHandleImpl::AsyncPreadSome(" << FMT_THIS_ADDR + << ", ...) Cannot find corresponding blocks"); handler(Status::InvalidArgument("Cannot find corresponding blocks"), "", 0); return; } @@ -179,6 +204,9 @@ void FileHandleImpl::AsyncPreadSome( }); if (it == datanodes.end()) { + LOG_WARN(kFileHandle, << "FileHandleImpl::AsyncPreadSome(" + << FMT_THIS_ADDR << ", ...) No datanodes available"); + handler(Status::ResourceUnavailable("No datanodes available"), "", 0); return; } @@ -224,6 +252,11 @@ std::shared_ptr<BlockReader> FileHandleImpl::CreateBlockReader(const BlockReader std::shared_ptr<DataNodeConnection> dn) { std::shared_ptr<BlockReader> reader = std::make_shared<BlockReaderImpl>(options, dn, cancel_state_); + + LOG_TRACE(kFileHandle, << "FileHandleImpl::CreateBlockReader(" << FMT_THIS_ADDR + << ", ..., dnconn=" << dn.get() + << ") called. New BlockReader = " << reader.get()); + readers_.AddReader(reader); return reader; } @@ -232,10 +265,15 @@ std::shared_ptr<DataNodeConnection> FileHandleImpl::CreateDataNodeConnection( ::asio::io_service * io_service, const ::hadoop::hdfs::DatanodeInfoProto & dn, const hadoop::common::TokenProto * token) { + LOG_TRACE(kFileHandle, << "FileHandleImpl::CreateDataNodeConnection(" + << FMT_THIS_ADDR << ", ...) 
called"); return std::make_shared<DataNodeConnectionImpl>(io_service, dn, token); } void FileHandleImpl::CancelOperations() { + LOG_INFO(kFileHandle, << "FileHandleImpl::CancelOperations(" + << FMT_THIS_ADDR << ") called"); + cancel_state_->set_canceled(); /* Push update to BlockReaders that may be hung in an asio call */ http://git-wip-us.apache.org/repos/asf/hadoop/blob/71023fd2/hadoop-hdfs-project/hadoop-hdfs-native-client/src/main/native/libhdfspp/lib/fs/filesystem.cc ---------------------------------------------------------------------- diff --git a/hadoop-hdfs-project/hadoop-hdfs-native-client/src/main/native/libhdfspp/lib/fs/filesystem.cc b/hadoop-hdfs-project/hadoop-hdfs-native-client/src/main/native/libhdfspp/lib/fs/filesystem.cc index 0b28488..8f386ed 100644 --- a/hadoop-hdfs-project/hadoop-hdfs-native-client/src/main/native/libhdfspp/lib/fs/filesystem.cc +++ b/hadoop-hdfs-project/hadoop-hdfs-native-client/src/main/native/libhdfspp/lib/fs/filesystem.cc @@ -19,6 +19,7 @@ #include "filesystem.h" #include "common/continuation/asio.h" #include "common/util.h" +#include "common/logging.h" #include <asio/ip/tcp.hpp> @@ -29,6 +30,8 @@ #include <iostream> #include <pwd.h> +#define FMT_THIS_ADDR "this=" << (void*)this + namespace hdfs { static const char kNamenodeProtocol[] = @@ -66,6 +69,9 @@ void NameNodeOperations::GetBlockLocations(const std::string & path, using ::hadoop::hdfs::GetBlockLocationsRequestProto; using ::hadoop::hdfs::GetBlockLocationsResponseProto; + LOG_TRACE(kFileSystem, << "NameNodeOperations::GetBlockLocations(" + << FMT_THIS_ADDR << ", path=" << path << ", ...) 
called"); + struct State { GetBlockLocationsRequestProto req; std::shared_ptr<GetBlockLocationsResponseProto> resp; @@ -158,6 +164,9 @@ FileSystemImpl::FileSystemImpl(IoService *&io_service, const std::string &user_n kNamenodeProtocolVersion), client_name_(GetRandomClientName()), bad_node_tracker_(std::make_shared<BadDataNodeTracker>()) { + LOG_TRACE(kFileSystem, << "FileSystemImpl::FileSystemImpl(" + << FMT_THIS_ADDR << ") called"); + // Poor man's move io_service = nullptr; @@ -169,6 +178,9 @@ FileSystemImpl::FileSystemImpl(IoService *&io_service, const std::string &user_n } FileSystemImpl::~FileSystemImpl() { + LOG_TRACE(kFileSystem, << "FileSystemImpl::~FileSystemImpl(" + << FMT_THIS_ADDR << ") called"); + /** * Note: IoService must be stopped before getting rid of worker threads. * Once worker threads are joined and deleted the service can be deleted. @@ -180,6 +192,10 @@ FileSystemImpl::~FileSystemImpl() { void FileSystemImpl::Connect(const std::string &server, const std::string &service, const std::function<void(const Status &, FileSystem * fs)> &handler) { + LOG_INFO(kFileSystem, << "FileSystemImpl::Connect(" << FMT_THIS_ADDR + << ", server=" << server << ", service=" + << service << ") called"); + /* IoService::New can return nullptr */ if (!io_service_) { handler (Status::Error("Null IoService"), this); @@ -191,6 +207,9 @@ void FileSystemImpl::Connect(const std::string &server, } Status FileSystemImpl::Connect(const std::string &server, const std::string &service) { + LOG_INFO(kFileSystem, << "FileSystemImpl::[sync]Connect(" << FMT_THIS_ADDR + << ", server=" << server << ", service=" << service << ") called"); + /* synchronized */ auto stat = std::make_shared<std::promise<Status>>(); std::future<Status> future = stat->get_future(); @@ -252,6 +271,10 @@ Status FileSystemImpl::ConnectToDefaultFs() { int FileSystemImpl::AddWorkerThread() { + LOG_DEBUG(kFileSystem, << "FileSystemImpl::AddWorkerThread(" + << FMT_THIS_ADDR << ") called." 
+ << " Existing thread count = " << worker_threads_.size()); + auto service_task = [](IoService *service) { service->Run(); }; worker_threads_.push_back( WorkerPtr(new std::thread(service_task, io_service_.get()))); @@ -261,6 +284,9 @@ int FileSystemImpl::AddWorkerThread() { void FileSystemImpl::Open( const std::string &path, const std::function<void(const Status &, FileHandle *)> &handler) { + LOG_INFO(kFileSystem, << "FileSystemImpl::Open(" + << FMT_THIS_ADDR << ", path=" + << path << ") called"); nn_.GetBlockLocations(path, [this, handler](const Status &stat, std::shared_ptr<const struct FileInfo> file_info) { handler(stat, stat.ok() ? new FileHandleImpl(&io_service_->io_service(), client_name_, file_info, bad_node_tracker_) @@ -270,6 +296,10 @@ void FileSystemImpl::Open( Status FileSystemImpl::Open(const std::string &path, FileHandle **handle) { + LOG_INFO(kFileSystem, << "FileSystemImpl::[sync]Open(" + << FMT_THIS_ADDR << ", path=" + << path << ") called"); + auto callstate = std::make_shared<std::promise<std::tuple<Status, FileHandle*>>>(); std::future<std::tuple<Status, FileHandle*>> future(callstate->get_future()); @@ -302,9 +332,9 @@ void FileSystemImpl::WorkerDeleter::operator()(std::thread *t) { // from within one of the worker threads, leading to a deadlock. Let's // provide some explicit protection. 
if(t->get_id() == std::this_thread::get_id()) { - //TODO: When we get good logging support, add it in here - std::cerr << "FATAL: Attempted to destroy a thread pool from within a " - "callback of the thread pool.\n"; + LOG_ERROR(kFileSystem, << "FileSystemImpl::WorkerDeleter::operator(treadptr=" + << t << ") : FATAL: Attempted to destroy a thread pool" + "from within a callback of the thread pool!"); } t->join(); delete t; http://git-wip-us.apache.org/repos/asf/hadoop/blob/71023fd2/hadoop-hdfs-project/hadoop-hdfs-native-client/src/main/native/libhdfspp/lib/reader/block_reader.cc ---------------------------------------------------------------------- diff --git a/hadoop-hdfs-project/hadoop-hdfs-native-client/src/main/native/libhdfspp/lib/reader/block_reader.cc b/hadoop-hdfs-project/hadoop-hdfs-native-client/src/main/native/libhdfspp/lib/reader/block_reader.cc index 594aaf5..4ee86c2b 100644 --- a/hadoop-hdfs-project/hadoop-hdfs-native-client/src/main/native/libhdfspp/lib/reader/block_reader.cc +++ b/hadoop-hdfs-project/hadoop-hdfs-native-client/src/main/native/libhdfspp/lib/reader/block_reader.cc @@ -19,12 +19,17 @@ #include "reader/datatransfer.h" #include "common/continuation/continuation.h" #include "common/continuation/asio.h" +#include "common/logging.h" #include <future> namespace hdfs { +#define FMT_CONT_AND_PARENT_ADDR "this=" << (void*)this << ", parent=" << (void*)parent_ +#define FMT_CONT_AND_READER_ADDR "this=" << (void*)this << ", reader=" << (void*)reader_ +#define FMT_THIS_ADDR "this=" << (void*)this + hadoop::hdfs::OpReadBlockProto ReadBlockProto(const std::string &client_name, bool verify_checksum, const hadoop::common::TokenProto *token, @@ -54,6 +59,10 @@ void BlockReaderImpl::AsyncRequestBlock( const std::string &client_name, const hadoop::hdfs::ExtendedBlockProto *block, uint64_t length, uint64_t offset, const std::function<void(Status)> &handler) { + LOG_TRACE(kBlockReader, << "BlockReaderImpl::AsyncRequestBlock(" + << FMT_THIS_ADDR << ", ..., 
length=" + << length << ", offset=" << offset << ", ...) called"); + // The total number of bytes that we need to transfer from the DN is // the amount that the user wants (bytesToRead), plus the padding at // the beginning in order to chunk-align. Note that the DN may elect @@ -103,6 +112,10 @@ Status BlockReaderImpl::RequestBlock( const std::string &client_name, const hadoop::hdfs::ExtendedBlockProto *block, uint64_t length, uint64_t offset) { + LOG_TRACE(kBlockReader, << "BlockReaderImpl::RequestBlock(" + << FMT_THIS_ADDR <<"..., length=" + << length << ", offset=" << offset << ") called"); + auto stat = std::make_shared<std::promise<Status>>(); std::future<Status> future(stat->get_future()); AsyncRequestBlock(client_name, block, length, offset, @@ -121,6 +134,9 @@ struct BlockReaderImpl::ReadPacketHeader ReadPacketHeader(BlockReaderImpl *parent) : parent_(parent) {} virtual void Run(const Next &next) override { + LOG_TRACE(kBlockReader, << "BlockReaderImpl::ReadPacketHeader::Run(" + << FMT_CONT_AND_PARENT_ADDR << ") called"); + parent_->packet_data_read_bytes_ = 0; parent_->packet_len_ = 0; auto handler = [next, this](const asio::error_code &ec, size_t) { @@ -178,6 +194,9 @@ struct BlockReaderImpl::ReadChecksum : continuation::Continuation { ReadChecksum(BlockReaderImpl *parent) : parent_(parent) {} virtual void Run(const Next &next) override { + LOG_TRACE(kBlockReader, << "BlockReaderImpl::ReadChecksum::Run(" + << FMT_CONT_AND_PARENT_ADDR << ") called"); + auto parent = parent_; if (parent->state_ != kReadChecksum) { next(Status::OK()); @@ -216,6 +235,9 @@ struct BlockReaderImpl::ReadData : continuation::Continuation { } virtual void Run(const Next &next) override { + LOG_TRACE(kBlockReader, << "BlockReaderImpl::ReadData::Run(" + << FMT_CONT_AND_PARENT_ADDR << ") called"); + auto handler = [next, this](const asio::error_code &ec, size_t transferred) { Status status; @@ -251,6 +273,9 @@ struct BlockReaderImpl::ReadPadding : continuation::Continuation { parent, 
bytes_transferred_, asio::buffer(padding_))) {} virtual void Run(const Next &next) override { + LOG_TRACE(kBlockReader, << "BlockReaderImpl::ReadPadding::Run(" + << FMT_CONT_AND_PARENT_ADDR << ") called"); + if (parent_->state_ != kReadPadding || !parent_->chunk_padding_bytes_) { next(Status::OK()); return; @@ -282,6 +307,8 @@ struct BlockReaderImpl::AckRead : continuation::Continuation { AckRead(BlockReaderImpl *parent) : parent_(parent) {} virtual void Run(const Next &next) override { + LOG_TRACE(kBlockReader, << "BlockReaderImpl::AckRead::Run(" << FMT_CONT_AND_PARENT_ADDR << ") called"); + if (parent_->bytes_to_read_ > 0) { next(Status::OK()); return; @@ -314,6 +341,8 @@ void BlockReaderImpl::AsyncReadPacket( const std::function<void(const Status &, size_t bytes_transferred)> &handler) { assert(state_ != kOpen && "Not connected"); + LOG_TRACE(kBlockReader, << "BlockReaderImpl::AsyncReadPacket called"); + struct State { std::shared_ptr<size_t> bytes_transferred; }; @@ -337,6 +366,8 @@ void BlockReaderImpl::AsyncReadPacket( size_t BlockReaderImpl::ReadPacket(const MutableBuffers &buffers, Status *status) { + LOG_TRACE(kBlockReader, << "BlockReaderImpl::ReadPacket called"); + size_t transferred = 0; auto done = std::make_shared<std::promise<void>>(); auto future = done->get_future(); @@ -361,6 +392,9 @@ struct BlockReaderImpl::RequestBlockContinuation : continuation::Continuation { } virtual void Run(const Next &next) override { + LOG_TRACE(kBlockReader, << "BlockReaderImpl::RequestBlockContinuation::Run(" + << FMT_CONT_AND_READER_ADDR << ") called"); + reader_->AsyncRequestBlock(client_name_, &block_, length_, offset_, next); } @@ -381,6 +415,8 @@ struct BlockReaderImpl::ReadBlockContinuation : continuation::Continuation { } virtual void Run(const Next &next) override { + LOG_TRACE(kBlockReader, << "BlockReaderImpl::ReadBlockContinuation::Run(" + << FMT_CONT_AND_READER_ADDR << ") called"); *transferred_ = 0; next_ = next; OnReadData(Status::OK(), 0); @@ -415,6 
+451,8 @@ void BlockReaderImpl::AsyncReadBlock( size_t offset, const MutableBuffers &buffers, const std::function<void(const Status &, size_t)> handler) { + LOG_TRACE(kBlockReader, << "BlockReaderImpl::AsyncReadBlock(" + << FMT_THIS_ADDR << ") called"); auto m = continuation::Pipeline<size_t>::Create(cancel_state_); size_t * bytesTransferred = &m->state(); @@ -432,6 +470,8 @@ void BlockReaderImpl::AsyncReadBlock( } void BlockReaderImpl::CancelOperation() { + LOG_TRACE(kBlockReader, << "BlockReaderImpl::CancelOperation(" + << FMT_THIS_ADDR << ") called"); /* just forward cancel to DNConnection */ dn_->Cancel(); } http://git-wip-us.apache.org/repos/asf/hadoop/blob/71023fd2/hadoop-hdfs-project/hadoop-hdfs-native-client/src/main/native/libhdfspp/lib/rpc/rpc_connection.cc ---------------------------------------------------------------------- diff --git a/hadoop-hdfs-project/hadoop-hdfs-native-client/src/main/native/libhdfspp/lib/rpc/rpc_connection.cc b/hadoop-hdfs-project/hadoop-hdfs-native-client/src/main/native/libhdfspp/lib/rpc/rpc_connection.cc index 91d8667..c65c063 100644 --- a/hadoop-hdfs-project/hadoop-hdfs-native-client/src/main/native/libhdfspp/lib/rpc/rpc_connection.cc +++ b/hadoop-hdfs-project/hadoop-hdfs-native-client/src/main/native/libhdfspp/lib/rpc/rpc_connection.cc @@ -191,7 +191,7 @@ void RpcConnection::HandleRpcResponse(std::shared_ptr<Response> response) { auto req = RemoveFromRunningQueue(h.callid()); if (!req) { - LOG_WARN() << "RPC response with Unknown call id " << h.callid(); + LOG_WARN(kRPC, << "RPC response with Unknown call id " << h.callid()); return; } http://git-wip-us.apache.org/repos/asf/hadoop/blob/71023fd2/hadoop-hdfs-project/hadoop-hdfs-native-client/src/main/native/libhdfspp/lib/rpc/rpc_connection.h ---------------------------------------------------------------------- diff --git a/hadoop-hdfs-project/hadoop-hdfs-native-client/src/main/native/libhdfspp/lib/rpc/rpc_connection.h 
b/hadoop-hdfs-project/hadoop-hdfs-native-client/src/main/native/libhdfspp/lib/rpc/rpc_connection.h index a8820c2..3413438 100644 --- a/hadoop-hdfs-project/hadoop-hdfs-native-client/src/main/native/libhdfspp/lib/rpc/rpc_connection.h +++ b/hadoop-hdfs-project/hadoop-hdfs-native-client/src/main/native/libhdfspp/lib/rpc/rpc_connection.h @@ -63,11 +63,14 @@ template <class NextLayer> RpcConnectionImpl<NextLayer>::RpcConnectionImpl(RpcEngine *engine) : RpcConnection(engine), options_(engine->options()), - next_layer_(engine->io_service()) {} + next_layer_(engine->io_service()) { + LOG_TRACE(kRPC, << "RpcConnectionImpl::RpcConnectionImpl called"); + } template <class NextLayer> void RpcConnectionImpl<NextLayer>::Connect( const std::vector<::asio::ip::tcp::endpoint> &server, RpcCallback &handler) { + LOG_TRACE(kRPC, << "RpcConnectionImpl::Connect called"); auto connectionSuccessfulReq = std::make_shared<Request>( engine_, [handler](::google::protobuf::io::CodedInputStream *is, const Status &status) { @@ -105,6 +108,8 @@ void RpcConnectionImpl<NextLayer>::ConnectComplete(const ::asio::error_code &ec) auto shared_this = RpcConnectionImpl<NextLayer>::shared_from_this(); std::lock_guard<std::mutex> state_lock(connection_state_lock_); + LOG_TRACE(kRPC, << "RpcConnectionImpl::ConnectComplete called"); + Status status = ToStatus(ec); if (status.ok()) { StartReading(); @@ -131,6 +136,9 @@ void RpcConnectionImpl<NextLayer>::ConnectComplete(const ::asio::error_code &ec) template <class NextLayer> void RpcConnectionImpl<NextLayer>::HandshakeComplete(const Status &s) { std::lock_guard<std::mutex> state_lock(connection_state_lock_); + + LOG_TRACE(kRPC, << "RpcConnectionImpl::HandshakeComplete called"); + if (s.ok()) { FlushPendingRequests(); } else { @@ -143,6 +151,8 @@ template <class NextLayer> void RpcConnectionImpl<NextLayer>::Handshake(RpcCallback &handler) { assert(lock_held(connection_state_lock_)); // Must be holding lock before calling + LOG_TRACE(kRPC, << 
"RpcConnectionImpl::Handshake called"); + auto shared_this = shared_from_this(); auto handshake_packet = PrepareHandshakePacket(); ::asio::async_write(next_layer_, asio::buffer(*handshake_packet), @@ -163,9 +173,11 @@ void RpcConnectionImpl<NextLayer>::OnSendCompleted(const ::asio::error_code &ec, using std::placeholders::_2; std::lock_guard<std::mutex> state_lock(connection_state_lock_); + LOG_TRACE(kRPC, << "RpcConnectionImpl::OnSendCompleted called"); + request_over_the_wire_.reset(); if (ec) { - LOG_WARN() << "Network error during RPC write: " << ec.message(); + LOG_WARN(kRPC, << "Network error during RPC write: " << ec.message()); CommsError(ToStatus(ec)); return; } @@ -180,6 +192,8 @@ void RpcConnectionImpl<NextLayer>::FlushPendingRequests() { // Lock should be held assert(lock_held(connection_state_lock_)); + LOG_TRACE(kRPC, << "RpcConnectionImpl::FlushPendingRequests called"); + if (pending_requests_.empty()) { return; } @@ -233,6 +247,8 @@ void RpcConnectionImpl<NextLayer>::OnRecvCompleted(const ::asio::error_code &ec, using std::placeholders::_2; std::lock_guard<std::mutex> state_lock(connection_state_lock_); + LOG_TRACE(kRPC, << "RpcConnectionImpl::OnRecvCompleted called"); + std::shared_ptr<RpcConnection> shared_this = shared_from_this(); switch (ec.value()) { @@ -243,7 +259,7 @@ void RpcConnectionImpl<NextLayer>::OnRecvCompleted(const ::asio::error_code &ec, // The event loop has been shut down. Ignore the error. 
return; default: - LOG_WARN() << "Network error during RPC read: " << ec.message(); + LOG_WARN(kRPC, << "Network error during RPC read: " << ec.message()); CommsError(ToStatus(ec)); return; } @@ -281,6 +297,8 @@ template <class NextLayer> void RpcConnectionImpl<NextLayer>::Disconnect() { assert(lock_held(connection_state_lock_)); // Must be holding lock before calling + LOG_INFO(kRPC, << "RpcConnectionImpl::Disconnect called"); + request_over_the_wire_.reset(); if (connected_) { next_layer_.cancel(); http://git-wip-us.apache.org/repos/asf/hadoop/blob/71023fd2/hadoop-hdfs-project/hadoop-hdfs-native-client/src/main/native/libhdfspp/lib/rpc/rpc_engine.cc ---------------------------------------------------------------------- diff --git a/hadoop-hdfs-project/hadoop-hdfs-native-client/src/main/native/libhdfspp/lib/rpc/rpc_engine.cc b/hadoop-hdfs-project/hadoop-hdfs-native-client/src/main/native/libhdfspp/lib/rpc/rpc_engine.cc index 8d3e404..b598d0f 100644 --- a/hadoop-hdfs-project/hadoop-hdfs-native-client/src/main/native/libhdfspp/lib/rpc/rpc_engine.cc +++ b/hadoop-hdfs-project/hadoop-hdfs-native-client/src/main/native/libhdfspp/lib/rpc/rpc_engine.cc @@ -18,6 +18,7 @@ #include "rpc_engine.h" #include "rpc_connection.h" #include "common/util.h" +#include "common/logging.h" #include "optional.hpp" #include <future> @@ -38,11 +39,15 @@ RpcEngine::RpcEngine(::asio::io_service *io_service, const Options &options, protocol_version_(protocol_version), retry_policy_(std::move(MakeRetryPolicy(options))), call_id_(0), - retry_timer(*io_service) {} + retry_timer(*io_service) { + LOG_DEBUG(kRPC, << "RpcEngine::RpcEngine called"); + } void RpcEngine::Connect(const std::vector<::asio::ip::tcp::endpoint> &server, RpcCallback &handler) { std::lock_guard<std::mutex> state_lock(engine_state_lock_); + LOG_DEBUG(kRPC, << "RpcEngine::Connect called"); + last_endpoints_ = server; conn_ = NewConnection(); @@ -50,6 +55,7 @@ void RpcEngine::Connect(const std::vector<::asio::ip::tcp::endpoint> 
&server, } void RpcEngine::Shutdown() { + LOG_DEBUG(kRPC, << "RpcEngine::Shutdown called"); io_service_->post([this]() { std::lock_guard<std::mutex> state_lock(engine_state_lock_); conn_->Disconnect(); @@ -58,6 +64,7 @@ void RpcEngine::Shutdown() { } std::unique_ptr<const RetryPolicy> RpcEngine::MakeRetryPolicy(const Options &options) { + LOG_DEBUG(kRPC, << "RpcEngine::MakeRetryPolicy called"); if (options.max_rpc_retries > 0) { return std::unique_ptr<RetryPolicy>(new FixedDelayRetryPolicy(options.rpc_retry_delay_ms, options.max_rpc_retries)); } else { @@ -74,6 +81,9 @@ void RpcEngine::AsyncRpc( const std::shared_ptr<::google::protobuf::MessageLite> &resp, const std::function<void(const Status &)> &handler) { std::lock_guard<std::mutex> state_lock(engine_state_lock_); + + LOG_TRACE(kRPC, << "RpcEngine::AsyncRpc called"); + if (!conn_) { conn_ = NewConnection(); conn_->ConnectAndFlush(last_endpoints_); @@ -84,6 +94,9 @@ void RpcEngine::AsyncRpc( Status RpcEngine::Rpc( const std::string &method_name, const ::google::protobuf::MessageLite *req, const std::shared_ptr<::google::protobuf::MessageLite> &resp) { + + LOG_TRACE(kRPC, << "RpcEngine::Rpc called"); + auto stat = std::make_shared<std::promise<Status>>(); std::future<Status> future(stat->get_future()); AsyncRpc(method_name, req, resp, @@ -93,12 +106,16 @@ Status RpcEngine::Rpc( std::shared_ptr<RpcConnection> RpcEngine::NewConnection() { + LOG_DEBUG(kRPC, << "RpcEngine::NewConnection called"); + return std::make_shared<RpcConnectionImpl<::asio::ip::tcp::socket>>(this); } Status RpcEngine::RawRpc(const std::string &method_name, const std::string &req, std::shared_ptr<std::string> resp) { + LOG_TRACE(kRPC, << "RpcEngine::RawRpc called"); + std::shared_ptr<RpcConnection> conn; { std::lock_guard<std::mutex> state_lock(engine_state_lock_); @@ -119,6 +136,8 @@ Status RpcEngine::RawRpc(const std::string &method_name, const std::string &req, void RpcEngine::AsyncRpcCommsError( const Status &status, 
std::vector<std::shared_ptr<Request>> pendingRequests) { + LOG_ERROR(kRPC, << "RpcEngine::AsyncRpcCommsError called"); + io_service().post([this, status, pendingRequests]() { RpcCommsError(status, pendingRequests); }); @@ -129,6 +148,8 @@ void RpcEngine::RpcCommsError( std::vector<std::shared_ptr<Request>> pendingRequests) { (void)status; + LOG_ERROR(kRPC, << "RpcEngine::RpcCommsError called"); + std::lock_guard<std::mutex> state_lock(engine_state_lock_); auto head_action = optional<RetryAction>(); http://git-wip-us.apache.org/repos/asf/hadoop/blob/71023fd2/hadoop-hdfs-project/hadoop-hdfs-native-client/src/main/native/libhdfspp/tests/CMakeLists.txt ---------------------------------------------------------------------- diff --git a/hadoop-hdfs-project/hadoop-hdfs-native-client/src/main/native/libhdfspp/tests/CMakeLists.txt b/hadoop-hdfs-project/hadoop-hdfs-native-client/src/main/native/libhdfspp/tests/CMakeLists.txt index a0b3774..2235c7b 100644 --- a/hadoop-hdfs-project/hadoop-hdfs-native-client/src/main/native/libhdfspp/tests/CMakeLists.txt +++ b/hadoop-hdfs-project/hadoop-hdfs-native-client/src/main/native/libhdfspp/tests/CMakeLists.txt @@ -103,8 +103,13 @@ add_library(hdfspp_test_shim_static STATIC hdfs_shim.c libhdfs_wrapper.c libhdfs build_libhdfs_test(libhdfs_threaded hdfspp_test_shim_static expect.c test_libhdfs_threaded.c ${OS_DIR}/thread.c) link_libhdfs_test(libhdfs_threaded hdfspp_test_shim_static fs reader rpc proto common connection ${PROTOBUF_LIBRARIES} ${OPENSSL_LIBRARIES} native_mini_dfs ${JAVA_JVM_LIBRARY}) add_libhdfs_test(libhdfs_threaded hdfspp_test_shim_static) + endif(HADOOP_BUILD) add_executable(hdfs_builder_test hdfs_builder_test.cc) target_link_libraries(hdfs_builder_test test_common gmock_main bindings_c fs rpc proto common reader connection ${PROTOBUF_LIBRARIES} ${OPENSSL_LIBRARIES} gmock_main ${CMAKE_THREAD_LIBS_INIT}) add_memcheck_test(hdfs_builder_test hdfs_builder_test) + +add_executable(logging_test logging_test.cc) 
+target_link_libraries(logging_test common gmock_main bindings_c fs rpc proto common reader connection ${PROTOBUF_LIBRARIES} ${OPENSSL_LIBRARIES} gmock_main ${CMAKE_THREAD_LIBS_INIT}) +add_memcheck_test(logging_test logging_test) http://git-wip-us.apache.org/repos/asf/hadoop/blob/71023fd2/hadoop-hdfs-project/hadoop-hdfs-native-client/src/main/native/libhdfspp/tests/logging_test.cc ---------------------------------------------------------------------- diff --git a/hadoop-hdfs-project/hadoop-hdfs-native-client/src/main/native/libhdfspp/tests/logging_test.cc b/hadoop-hdfs-project/hadoop-hdfs-native-client/src/main/native/libhdfspp/tests/logging_test.cc new file mode 100644 index 0000000..d487bf5 --- /dev/null +++ b/hadoop-hdfs-project/hadoop-hdfs-native-client/src/main/native/libhdfspp/tests/logging_test.cc @@ -0,0 +1,374 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +#include <common/logging.h> +#include <bindings/c/hdfs.cc> + +#include <gmock/gmock.h> +#include <gtest/gtest.h> + +#include <iostream> + +using namespace hdfs; + +struct log_state { + int trace_count; + int debug_count; + int info_count; + int warning_count; + int error_count; + + int origin_unknown; + int origin_rpc; + int origin_blockreader; + int origin_filehandle; + int origin_filesystem; + + std::string msg; + + log_state() { + reset(); + } + + void reset() { + trace_count = 0; + debug_count = 0; + info_count = 0; + warning_count = 0; + error_count = 0; + + origin_unknown = 0; + origin_rpc = 0; + origin_blockreader = 0; + origin_filehandle = 0; + origin_filesystem = 0; + + msg = ""; + } +}; +log_state log_state_instance; + +void process_log_msg(LogData *data) { + if(data->msg) + log_state_instance.msg = data->msg; + + switch(data->level) { + case HDFSPP_LOG_LEVEL_TRACE: + log_state_instance.trace_count++; + break; + case HDFSPP_LOG_LEVEL_DEBUG: + log_state_instance.debug_count++; + break; + case HDFSPP_LOG_LEVEL_INFO: + log_state_instance.info_count++; + break; + case HDFSPP_LOG_LEVEL_WARN: + log_state_instance.warning_count++; + break; + case HDFSPP_LOG_LEVEL_ERROR: + log_state_instance.error_count++; + break; + default: + //should never happen + std::cout << "foo" << std::endl; + ASSERT_FALSE(true); + } + + switch(data->component) { + case HDFSPP_LOG_COMPONENT_UNKNOWN: + log_state_instance.origin_unknown++; + break; + case HDFSPP_LOG_COMPONENT_RPC: + log_state_instance.origin_rpc++; + break; + case HDFSPP_LOG_COMPONENT_BLOCKREADER: + log_state_instance.origin_blockreader++; + break; + case HDFSPP_LOG_COMPONENT_FILEHANDLE: + log_state_instance.origin_filehandle++; + break; + case HDFSPP_LOG_COMPONENT_FILESYSTEM: + log_state_instance.origin_filesystem++; + break; + default: + std::cout << "bar" << std::endl; + ASSERT_FALSE(true); + } + +} + +void reset_log_counters() { + log_state_instance.reset(); +} + +void assert_nothing_logged() { + 
if(log_state_instance.trace_count || log_state_instance.debug_count || + log_state_instance.info_count || log_state_instance.warning_count || + log_state_instance.error_count) { + ASSERT_FALSE(true); + } +} + +void assert_trace_logged() { ASSERT_TRUE(log_state_instance.trace_count > 0); } +void assert_debug_logged() { ASSERT_TRUE(log_state_instance.debug_count > 0); } +void assert_info_logged() { ASSERT_TRUE(log_state_instance.info_count > 0); } +void assert_warning_logged() { ASSERT_TRUE(log_state_instance.warning_count > 0); } +void assert_error_logged() { ASSERT_TRUE(log_state_instance.error_count > 0); } + +void assert_no_trace_logged() { ASSERT_EQ(log_state_instance.trace_count, 0); } +void assert_no_debug_logged() { ASSERT_EQ(log_state_instance.debug_count, 0); } +void assert_no_info_logged() { ASSERT_EQ(log_state_instance.info_count, 0); } +void assert_no_warning_logged() { ASSERT_EQ(log_state_instance.warning_count, 0); } +void assert_no_error_logged() { ASSERT_EQ(log_state_instance.error_count, 0); } + +void assert_unknown_logged() { ASSERT_TRUE(log_state_instance.origin_unknown > 0); } +void assert_rpc_logged() { ASSERT_TRUE(log_state_instance.origin_rpc > 0); } +void assert_blockreader_logged() { ASSERT_TRUE(log_state_instance.origin_blockreader > 0); } +void assert_filehandle_logged() { ASSERT_TRUE(log_state_instance.origin_filehandle > 0); } +void assert_filesystem_logged() { ASSERT_TRUE(log_state_instance.origin_filesystem > 0); } + +void assert_no_unknown_logged() { ASSERT_EQ(log_state_instance.origin_unknown, 0); } +void assert_no_rpc_logged() { ASSERT_EQ(log_state_instance.origin_rpc, 0); } +void assert_no_blockreader_logged() { ASSERT_EQ(log_state_instance.origin_blockreader, 0); } +void assert_no_filehandle_logged() { ASSERT_EQ(log_state_instance.origin_filehandle, 0); } +void assert_no_filesystem_logged() { ASSERT_EQ(log_state_instance.origin_filesystem, 0); } + +void log_all_components_at_level(LogLevel lvl) { + if(lvl == kTrace) { + 
LOG_TRACE(kUnknown, << 'a'); + LOG_TRACE(kRPC, << 'b'); + LOG_TRACE(kBlockReader, << 'c'); + LOG_TRACE(kFileHandle, << 'd'); + LOG_TRACE(kFileSystem, << 'e'); + } else if (lvl == kDebug) { + LOG_DEBUG(kUnknown, << 'a'); + LOG_DEBUG(kRPC, << 'b'); + LOG_DEBUG(kBlockReader, << 'c'); + LOG_DEBUG(kFileHandle, << 'd'); + LOG_DEBUG(kFileSystem, << 'e'); + } else if (lvl == kInfo) { + LOG_INFO(kUnknown, << 'a'); + LOG_INFO(kRPC, << 'b'); + LOG_INFO(kBlockReader, << 'c'); + LOG_INFO(kFileHandle, << 'd'); + LOG_INFO(kFileSystem, << 'e'); + } else if (lvl == kWarning) { + LOG_WARN(kUnknown, << 'a'); + LOG_WARN(kRPC, << 'b'); + LOG_WARN(kBlockReader, << 'c'); + LOG_WARN(kFileHandle, << 'd'); + LOG_WARN(kFileSystem, << 'e'); + } else if (lvl == kError) { + LOG_ERROR(kUnknown, << 'a'); + LOG_ERROR(kRPC, << 'b'); + LOG_ERROR(kBlockReader, << 'c'); + LOG_ERROR(kFileHandle, << 'd'); + LOG_ERROR(kFileSystem, << 'e'); + } else { + // A level was added and not accounted for here + ASSERT_TRUE(false); + } +} + +// make sure everything can be masked +TEST(LoggingTest, MaskAll) { + LogManager::DisableLogForComponent(kUnknown); + LogManager::DisableLogForComponent(kRPC); + LogManager::DisableLogForComponent(kBlockReader); + LogManager::DisableLogForComponent(kFileHandle); + LogManager::DisableLogForComponent(kFileSystem); + + // use trace so anything that isn't masked should come through + LogManager::SetLogLevel(kTrace); + log_state_instance.reset(); + log_all_components_at_level(kError); + assert_nothing_logged(); + log_state_instance.reset(); +} + +// make sure components can be masked individually +TEST(LoggingTest, MaskOne) { + LogManager::DisableLogForComponent(kUnknown); + LogManager::DisableLogForComponent(kRPC); + LogManager::DisableLogForComponent(kBlockReader); + LogManager::DisableLogForComponent(kFileHandle); + LogManager::DisableLogForComponent(kFileSystem); + LogManager::SetLogLevel(kTrace); + + // Unknown - aka component not provided + 
LogManager::EnableLogForComponent(kUnknown); + log_all_components_at_level(kError); + assert_unknown_logged(); + assert_error_logged(); + assert_no_rpc_logged(); + assert_no_blockreader_logged(); + assert_no_filehandle_logged(); + assert_no_filesystem_logged(); + log_state_instance.reset(); + LogManager::DisableLogForComponent(kUnknown); + + // RPC + LogManager::EnableLogForComponent(kRPC); + log_all_components_at_level(kError); + assert_rpc_logged(); + assert_error_logged(); + assert_no_unknown_logged(); + assert_no_blockreader_logged(); + assert_no_filehandle_logged(); + assert_no_filesystem_logged(); + log_state_instance.reset(); + LogManager::DisableLogForComponent(kRPC); + + // BlockReader + LogManager::EnableLogForComponent(kBlockReader); + log_all_components_at_level(kError); + assert_blockreader_logged(); + assert_error_logged(); + assert_no_unknown_logged(); + assert_no_rpc_logged(); + assert_no_filehandle_logged(); + assert_no_filesystem_logged(); + log_state_instance.reset(); + LogManager::DisableLogForComponent(kBlockReader); + + // FileHandle + LogManager::EnableLogForComponent(kFileHandle); + log_all_components_at_level(kError); + assert_filehandle_logged(); + assert_error_logged(); + assert_no_unknown_logged(); + assert_no_rpc_logged(); + assert_no_blockreader_logged(); + assert_no_filesystem_logged(); + log_state_instance.reset(); + LogManager::DisableLogForComponent(kFileHandle); + + // FileSystem + LogManager::EnableLogForComponent(kFileSystem); + log_all_components_at_level(kError); + assert_filesystem_logged(); + assert_error_logged(); + assert_no_unknown_logged(); + assert_no_rpc_logged(); + assert_no_blockreader_logged(); + assert_no_filehandle_logged(); + log_state_instance.reset(); + LogManager::DisableLogForComponent(kFileSystem); +} + +TEST(LoggingTest, Levels) { + // should be safe to focus on one component if MaskOne passes + LogManager::EnableLogForComponent(kUnknown); + LogManager::SetLogLevel(kError); + + LOG_TRACE(kUnknown, << "a"); 
+ LOG_DEBUG(kUnknown, << "b"); + LOG_INFO(kUnknown,<< "c"); + LOG_WARN(kUnknown, << "d"); + assert_nothing_logged(); + LOG_ERROR(kUnknown, << "e"); + assert_error_logged(); + assert_unknown_logged(); + log_state_instance.reset(); + + // anything >= warning + LogManager::SetLogLevel(kWarning); + LOG_TRACE(kUnknown, << "a"); + LOG_DEBUG(kUnknown, << "b"); + LOG_INFO(kUnknown, << "c"); + assert_nothing_logged(); + LOG_WARN(kUnknown, << "d"); + assert_warning_logged(); + LOG_ERROR(kUnknown, << "e"); + assert_error_logged(); + log_state_instance.reset(); + + // anything >= info + LogManager::SetLogLevel(kInfo); + LOG_TRACE(kUnknown, << "a"); + LOG_DEBUG(kUnknown, << "b"); + assert_nothing_logged(); + LOG_INFO(kUnknown, << "c"); + assert_info_logged(); + LOG_WARN(kUnknown, << "d"); + assert_warning_logged(); + LOG_ERROR(kUnknown, << "e"); + assert_error_logged(); + log_state_instance.reset(); + + // anything >= debug + LogManager::SetLogLevel(kDebug); + LOG_TRACE(kUnknown, << "a"); + assert_nothing_logged(); + LOG_DEBUG(kUnknown, << "b"); + assert_debug_logged(); + assert_no_info_logged(); + assert_no_warning_logged(); + assert_no_error_logged(); + LOG_INFO(kUnknown, << "c"); + assert_info_logged(); + assert_no_warning_logged(); + assert_no_error_logged(); + LOG_WARN(kUnknown, << "d"); + assert_warning_logged(); + assert_no_error_logged(); + LOG_ERROR(kUnknown, << "e"); + assert_error_logged(); + log_state_instance.reset(); + + // anything + LogManager::SetLogLevel(kTrace); + assert_nothing_logged(); + LOG_TRACE(kUnknown, << "a"); + assert_trace_logged(); + log_state_instance.reset(); + LOG_DEBUG(kUnknown, << "b"); + assert_debug_logged(); + log_state_instance.reset(); + LOG_INFO(kUnknown, << "c"); + assert_info_logged(); + log_state_instance.reset(); + LOG_WARN(kUnknown, << "d"); + assert_warning_logged(); + log_state_instance.reset(); + LOG_ERROR(kUnknown, << "e"); + assert_error_logged(); +} + +TEST(LoggingTest, Text) { + LogManager::EnableLogForComponent(kRPC); + + 
std::string text; + LOG_ERROR(kRPC, << text); + + ASSERT_EQ(text, log_state_instance.msg); +} + + +int main(int argc, char *argv[]) { + CForwardingLogger *logger = new CForwardingLogger(); + logger->SetCallback(process_log_msg); + LogManager::SetLoggerImplementation(std::unique_ptr<LoggerInterface>(logger)); + + // The following line must be executed to initialize Google Mock + // (and Google Test) before running the tests. + ::testing::InitGoogleMock(&argc, argv); + int res = RUN_ALL_TESTS(); + google::protobuf::ShutdownProtobufLibrary(); + return res; +}
