Add HDFS support for TextScanWorkOrder.
Project: http://git-wip-us.apache.org/repos/asf/incubator-quickstep/repo Commit: http://git-wip-us.apache.org/repos/asf/incubator-quickstep/commit/f6c2f0b6 Tree: http://git-wip-us.apache.org/repos/asf/incubator-quickstep/tree/f6c2f0b6 Diff: http://git-wip-us.apache.org/repos/asf/incubator-quickstep/diff/f6c2f0b6 Branch: refs/heads/hdfs_text_scan Commit: f6c2f0b6c9d54328495180d688d18805c0a5850f Parents: aa7f6fe Author: Zuyu Zhang <zu...@apache.org> Authored: Mon Feb 6 14:42:42 2017 -0800 Committer: Zuyu Zhang <zu...@apache.org> Committed: Wed Feb 8 01:03:47 2017 -0800 ---------------------------------------------------------------------- cli/distributed/Executor.cpp | 2 +- query_execution/CMakeLists.txt | 1 + query_execution/Shiftboss.cpp | 3 +- query_execution/Shiftboss.hpp | 14 ++++ relational_operators/CMakeLists.txt | 5 ++ relational_operators/TextScanOperator.cpp | 107 +++++++++++++++++++++---- relational_operators/TextScanOperator.hpp | 14 +++- relational_operators/WorkOrderFactory.cpp | 6 +- relational_operators/WorkOrderFactory.hpp | 4 +- storage/FileManagerHdfs.hpp | 9 +++ storage/StorageManager.cpp | 9 +++ storage/StorageManager.hpp | 8 +- 12 files changed, 156 insertions(+), 26 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/incubator-quickstep/blob/f6c2f0b6/cli/distributed/Executor.cpp ---------------------------------------------------------------------- diff --git a/cli/distributed/Executor.cpp b/cli/distributed/Executor.cpp index 1d03579..3485298 100644 --- a/cli/distributed/Executor.cpp +++ b/cli/distributed/Executor.cpp @@ -76,7 +76,7 @@ void Executor::init() { data_exchanger_.start(); shiftboss_ = - make_unique<Shiftboss>(&bus_, storage_manager_.get(), worker_directory_.get()); + make_unique<Shiftboss>(&bus_, storage_manager_.get(), worker_directory_.get(), storage_manager_->hdfs()); shiftboss_->start(); for (const auto &worker : workers_) { 
http://git-wip-us.apache.org/repos/asf/incubator-quickstep/blob/f6c2f0b6/query_execution/CMakeLists.txt ---------------------------------------------------------------------- diff --git a/query_execution/CMakeLists.txt b/query_execution/CMakeLists.txt index e26bde0..f251825 100644 --- a/query_execution/CMakeLists.txt +++ b/query_execution/CMakeLists.txt @@ -293,6 +293,7 @@ if (ENABLE_DISTRIBUTED) quickstep_queryexecution_WorkerMessage quickstep_relationaloperators_RebuildWorkOrder quickstep_relationaloperators_WorkOrderFactory + quickstep_storage_Flags quickstep_storage_InsertDestination quickstep_storage_StorageBlock quickstep_storage_StorageManager http://git-wip-us.apache.org/repos/asf/incubator-quickstep/blob/f6c2f0b6/query_execution/Shiftboss.cpp ---------------------------------------------------------------------- diff --git a/query_execution/Shiftboss.cpp b/query_execution/Shiftboss.cpp index 2ed42d0..bae5205 100644 --- a/query_execution/Shiftboss.cpp +++ b/query_execution/Shiftboss.cpp @@ -104,7 +104,8 @@ void Shiftboss::run() { query_contexts_[query_id].get(), storage_manager_, shiftboss_client_id_, - bus_); + bus_, + hdfs_); unique_ptr<WorkerMessage> worker_message( WorkerMessage::WorkOrderMessage(work_order, proto.operator_index())); http://git-wip-us.apache.org/repos/asf/incubator-quickstep/blob/f6c2f0b6/query_execution/Shiftboss.hpp ---------------------------------------------------------------------- diff --git a/query_execution/Shiftboss.hpp b/query_execution/Shiftboss.hpp index 6538d48..c48bd59 100644 --- a/query_execution/Shiftboss.hpp +++ b/query_execution/Shiftboss.hpp @@ -30,6 +30,8 @@ #include "query_execution/QueryContext.hpp" #include "query_execution/QueryExecutionTypedefs.hpp" #include "query_execution/WorkerDirectory.hpp" +#include "storage/Flags.hpp" +#include "storage/StorageConfig.h" // For QUICKSTEP_HAVE_FILE_MANAGER_HDFS. 
#include "threading/Thread.hpp" #include "utility/Macros.hpp" @@ -64,6 +66,7 @@ class Shiftboss : public Thread { * @param bus A pointer to the TMB. * @param storage_manager The StorageManager to use. * @param workers A pointer to the WorkerDirectory. + * @param hdfs The HDFS connector via libhdfs3. * @param cpu_id The ID of the CPU to which the Shiftboss thread can be pinned. * * @note If cpu_id is not specified, Shiftboss thread can be possibly moved @@ -72,10 +75,12 @@ class Shiftboss : public Thread { Shiftboss(tmb::MessageBus *bus, StorageManager *storage_manager, WorkerDirectory *workers, + void *hdfs = nullptr, const int cpu_id = -1) : bus_(DCHECK_NOTNULL(bus)), storage_manager_(DCHECK_NOTNULL(storage_manager)), workers_(DCHECK_NOTNULL(workers)), + hdfs_(hdfs), cpu_id_(cpu_id), shiftboss_client_id_(tmb::kClientIdNone), foreman_client_id_(tmb::kClientIdNone), @@ -84,6 +89,12 @@ class Shiftboss : public Thread { // Check to have at least one Worker. DCHECK_GT(workers->getNumWorkers(), 0u); +#ifdef QUICKSTEP_HAVE_FILE_MANAGER_HDFS + if (FLAGS_use_hdfs) { + CHECK(hdfs_); + } +#endif // QUICKSTEP_HAVE_FILE_MANAGER_HDFS + shiftboss_client_id_ = bus_->Connect(); LOG(INFO) << "Shiftboss TMB client ID: " << shiftboss_client_id_; DCHECK_NE(shiftboss_client_id_, tmb::kClientIdNone); @@ -228,6 +239,9 @@ class Shiftboss : public Thread { StorageManager *storage_manager_; WorkerDirectory *workers_; + // Not owned. + void *hdfs_; + // The ID of the CPU that the Shiftboss thread can optionally be pinned to. 
const int cpu_id_; http://git-wip-us.apache.org/repos/asf/incubator-quickstep/blob/f6c2f0b6/relational_operators/CMakeLists.txt ---------------------------------------------------------------------- diff --git a/relational_operators/CMakeLists.txt b/relational_operators/CMakeLists.txt index df4114d..737ab68 100644 --- a/relational_operators/CMakeLists.txt +++ b/relational_operators/CMakeLists.txt @@ -463,6 +463,7 @@ target_link_libraries(quickstep_relationaloperators_TextScanOperator quickstep_relationaloperators_RelationalOperator quickstep_relationaloperators_WorkOrder quickstep_relationaloperators_WorkOrder_proto + quickstep_storage_Flags quickstep_storage_InsertDestination quickstep_types_Type quickstep_types_TypedValue @@ -472,6 +473,10 @@ target_link_libraries(quickstep_relationaloperators_TextScanOperator quickstep_utility_Glob quickstep_utility_Macros tmb) +if (QUICKSTEP_HAVE_FILE_MANAGER_HDFS) + target_link_libraries(quickstep_relationaloperators_TextScanOperator + ${LIBHDFS3_LIBRARIES}) +endif(QUICKSTEP_HAVE_FILE_MANAGER_HDFS) target_link_libraries(quickstep_relationaloperators_UpdateOperator glog quickstep_catalog_CatalogRelation http://git-wip-us.apache.org/repos/asf/incubator-quickstep/blob/f6c2f0b6/relational_operators/TextScanOperator.cpp ---------------------------------------------------------------------- diff --git a/relational_operators/TextScanOperator.cpp b/relational_operators/TextScanOperator.cpp index 6650319..836579f 100644 --- a/relational_operators/TextScanOperator.cpp +++ b/relational_operators/TextScanOperator.cpp @@ -41,7 +41,14 @@ #include "query_execution/WorkOrderProtosContainer.hpp" #include "query_execution/WorkOrdersContainer.hpp" #include "relational_operators/WorkOrder.pb.h" +#include "storage/Flags.hpp" #include "storage/InsertDestination.hpp" +#include "storage/StorageConfig.h" // For QUICKSTEP_HAVE_FILE_MANAGER_HDFS. 
+ +#ifdef QUICKSTEP_HAVE_FILE_MANAGER_HDFS +#include <hdfs/hdfs.h> +#endif // QUICKSTEP_HAVE_FILE_MANAGER_HDFS + #include "types/Type.hpp" #include "types/TypedValue.hpp" #include "types/containers/ColumnVector.hpp" @@ -207,14 +214,53 @@ void TextScanWorkOrder::execute() { std::vector<TypedValue> vector_tuple_returned; constexpr std::size_t kSmallBufferSize = 0x4000; - char *buffer = reinterpret_cast<char *>(malloc(std::max(text_segment_size_, kSmallBufferSize))); - - // Read text segment into buffer. - FILE *file = std::fopen(filename_.c_str(), "rb"); - std::fseek(file, text_offset_, SEEK_SET); - std::size_t bytes_read = std::fread(buffer, 1, text_segment_size_, file); - if (bytes_read != text_segment_size_) { - throw TextScanReadError(filename_); + const size_t buffer_size = std::max(text_segment_size_, kSmallBufferSize); + char *buffer = reinterpret_cast<char *>(malloc(buffer_size)); + + bool use_hdfs = false; + std::size_t bytes_read; + +#ifdef QUICKSTEP_HAVE_FILE_MANAGER_HDFS + hdfsFS hdfs = nullptr; + hdfsFile file_handle = nullptr; + + if (FLAGS_use_hdfs) { + use_hdfs = true; + hdfs = static_cast<hdfsFS>(hdfs_); + + file_handle = hdfsOpenFile(hdfs, filename_.c_str(), O_RDONLY, buffer_size, + 0 /* default replication */, 0 /* default block size */); + if (file_handle == nullptr) { + LOG(ERROR) << "Failed to open file " << filename_ << " with error: " << strerror(errno); + return; + } + + if (hdfsSeek(hdfs, file_handle, text_offset_)) { + LOG(ERROR) << "Failed to seek in file " << filename_ << " with error: " << strerror(errno); + + hdfsCloseFile(hdfs, file_handle); + return; + } + + bytes_read = hdfsRead(hdfs, file_handle, buffer, text_segment_size_); + if (bytes_read != text_segment_size_) { + hdfsCloseFile(hdfs, file_handle); + throw TextScanReadError(filename_); + } + } +#endif // QUICKSTEP_HAVE_FILE_MANAGER_HDFS + + FILE *file = nullptr; + if (!use_hdfs) { + // Read text segment into buffer. 
+ file = std::fopen(filename_.c_str(), "rb"); + std::fseek(file, text_offset_, SEEK_SET); + bytes_read = std::fread(buffer, 1, text_segment_size_, file); + + if (bytes_read != text_segment_size_) { + std::fclose(file); + throw TextScanReadError(filename_); + } } // Locate the first newline character. @@ -255,11 +301,11 @@ void TextScanWorkOrder::execute() { } else { vector_tuple_returned = parseRow(&row_ptr, relation, &is_faulty); if (is_faulty) { - // Skip faulty rows - LOG(INFO) << "Faulty row found. Hence switching to next row."; + // Skip faulty rows + LOG(INFO) << "Faulty row found. Hence switching to next row."; } else { - // Convert vector returned to tuple only when a valid row is encountered. - tuples.emplace_back(Tuple(std::move(vector_tuple_returned))); + // Convert vector returned to tuple only when a valid row is encountered. + tuples.emplace_back(Tuple(std::move(vector_tuple_returned))); } } } @@ -268,10 +314,31 @@ void TextScanWorkOrder::execute() { // that the last tuple is very small / very large. 
std::size_t dynamic_read_size = 1024; std::string row_string; - std::fseek(file, text_offset_ + (end_ptr - buffer), SEEK_SET); + + const size_t dynamic_read_offset = text_offset_ + (end_ptr - buffer); + if (use_hdfs) { +#ifdef QUICKSTEP_HAVE_FILE_MANAGER_HDFS + if (hdfsSeek(hdfs, file_handle, dynamic_read_offset)) { + LOG(ERROR) << "Failed to seek in file " << filename_ << " with error: " << strerror(errno); + + hdfsCloseFile(hdfs, file_handle); + return; + } +#endif // QUICKSTEP_HAVE_FILE_MANAGER_HDFS + } else { + std::fseek(file, dynamic_read_offset, SEEK_SET); + } + bool has_reached_end = false; do { - bytes_read = std::fread(buffer, 1, dynamic_read_size, file); + if (use_hdfs) { +#ifdef QUICKSTEP_HAVE_FILE_MANAGER_HDFS + bytes_read = hdfsRead(hdfs, file_handle, buffer, dynamic_read_size); +#endif // QUICKSTEP_HAVE_FILE_MANAGER_HDFS + } else { + bytes_read = std::fread(buffer, 1, dynamic_read_size, file); + } + std::size_t bytes_to_copy = bytes_read; for (std::size_t i = 0; i < bytes_read; ++i) { @@ -305,7 +372,14 @@ void TextScanWorkOrder::execute() { } } - std::fclose(file); + if (use_hdfs) { +#ifdef QUICKSTEP_HAVE_FILE_MANAGER_HDFS + hdfsCloseFile(hdfs, file_handle); +#endif // QUICKSTEP_HAVE_FILE_MANAGER_HDFS + } else { + std::fclose(file); + } + free(buffer); // Store the tuples in a ColumnVectorsValueAccessor for bulk insert. @@ -336,7 +410,8 @@ void TextScanWorkOrder::execute() { } std::vector<TypedValue> TextScanWorkOrder::parseRow(const char **row_ptr, - const CatalogRelationSchema &relation, bool *is_faulty) const { + const CatalogRelationSchema &relation, + bool *is_faulty) const { std::vector<TypedValue> attribute_values; // Always assume current row is not faulty initially. 
*is_faulty = false; http://git-wip-us.apache.org/repos/asf/incubator-quickstep/blob/f6c2f0b6/relational_operators/TextScanOperator.hpp ---------------------------------------------------------------------- diff --git a/relational_operators/TextScanOperator.hpp b/relational_operators/TextScanOperator.hpp index 65863b3..f6c4c2a 100644 --- a/relational_operators/TextScanOperator.hpp +++ b/relational_operators/TextScanOperator.hpp @@ -189,6 +189,7 @@ class TextScanWorkOrder : public WorkOrder { * @param process_escape_sequences Whether to decode escape sequences in the * text file. * @param output_destination The InsertDestination to insert tuples. + * @param hdfs The HDFS connector via libhdfs3. **/ TextScanWorkOrder( const std::size_t query_id, @@ -197,14 +198,16 @@ class TextScanWorkOrder : public WorkOrder { const std::size_t text_segment_size, const char field_terminator, const bool process_escape_sequences, - InsertDestination *output_destination) + InsertDestination *output_destination, + void *hdfs = nullptr) : WorkOrder(query_id), filename_(filename), text_offset_(text_offset), text_segment_size_(text_segment_size), field_terminator_(field_terminator), process_escape_sequences_(process_escape_sequences), - output_destination_(DCHECK_NOTNULL(output_destination)) {} + output_destination_(DCHECK_NOTNULL(output_destination)), + hdfs_(hdfs) {} ~TextScanWorkOrder() override {} @@ -265,8 +268,8 @@ class TextScanWorkOrder : public WorkOrder { * @return The tuple parsed from the char stream. */ std::vector<TypedValue> parseRow(const char **row_ptr, - const CatalogRelationSchema &relation, - bool *is_faulty) const; + const CatalogRelationSchema &relation, + bool *is_faulty) const; /** * @brief Parse up to three octal digits (0-7) starting at \p *literal_ptr as @@ -332,6 +335,9 @@ std::vector<TypedValue> parseRow(const char **row_ptr, InsertDestination *output_destination_; + // Not owned. 
+ void *hdfs_; + DISALLOW_COPY_AND_ASSIGN(TextScanWorkOrder); }; http://git-wip-us.apache.org/repos/asf/incubator-quickstep/blob/f6c2f0b6/relational_operators/WorkOrderFactory.cpp ---------------------------------------------------------------------- diff --git a/relational_operators/WorkOrderFactory.cpp b/relational_operators/WorkOrderFactory.cpp index bd2a0f8..b2175c2 100644 --- a/relational_operators/WorkOrderFactory.cpp +++ b/relational_operators/WorkOrderFactory.cpp @@ -74,7 +74,8 @@ WorkOrder* WorkOrderFactory::ReconstructFromProto(const serialization::WorkOrder QueryContext *query_context, StorageManager *storage_manager, const tmb::client_id shiftboss_client_id, - tmb::MessageBus *bus) { + tmb::MessageBus *bus, + void *hdfs) { DCHECK(query_context != nullptr); DCHECK(ProtoIsValid(proto, *catalog_database, *query_context)) << "Attempted to create WorkOrder from an invalid proto description:\n" @@ -459,7 +460,8 @@ WorkOrder* WorkOrderFactory::ReconstructFromProto(const serialization::WorkOrder proto.GetExtension(serialization::TextScanWorkOrder::field_terminator), proto.GetExtension(serialization::TextScanWorkOrder::process_escape_sequences), query_context->getInsertDestination( - proto.GetExtension(serialization::TextScanWorkOrder::insert_destination_index))); + proto.GetExtension(serialization::TextScanWorkOrder::insert_destination_index)), + hdfs); } case serialization::UPDATE: { LOG(INFO) << "Creating UpdateWorkOrder in Shiftboss " << shiftboss_index; http://git-wip-us.apache.org/repos/asf/incubator-quickstep/blob/f6c2f0b6/relational_operators/WorkOrderFactory.hpp ---------------------------------------------------------------------- diff --git a/relational_operators/WorkOrderFactory.hpp b/relational_operators/WorkOrderFactory.hpp index acf3855..ece687b 100644 --- a/relational_operators/WorkOrderFactory.hpp +++ b/relational_operators/WorkOrderFactory.hpp @@ -59,6 +59,7 @@ class WorkOrderFactory { * @param storage_manager The StorageManager to use. 
* @param shiftboss_client_id The TMB client id of Shiftboss. * @param bus A pointer to the TMB. + * @param hdfs The HDFS connector via libhdfs3. * * @return A new WorkOrder reconstructed from the supplied Protocol Buffer. **/ @@ -68,7 +69,8 @@ class WorkOrderFactory { QueryContext *query_context, StorageManager *storage_manager, const tmb::client_id shiftboss_client_id, - tmb::MessageBus *bus); + tmb::MessageBus *bus, + void *hdfs); /** * @brief Check whether a serialization::WorkOrder is fully-formed and http://git-wip-us.apache.org/repos/asf/incubator-quickstep/blob/f6c2f0b6/storage/FileManagerHdfs.hpp ---------------------------------------------------------------------- diff --git a/storage/FileManagerHdfs.hpp b/storage/FileManagerHdfs.hpp index f47e4a8..a8feb50 100644 --- a/storage/FileManagerHdfs.hpp +++ b/storage/FileManagerHdfs.hpp @@ -55,6 +55,15 @@ class FileManagerHdfs : public FileManager { block_id_counter getMaxUsedBlockCounter(const block_id_domain block_domain) const override; + /** + * @brief Get the HDFS connector via libhdfs3. + * + * @return The HDFS connector. + **/ + void* hdfs() { + return static_cast<void*>(hdfs_); + } + private: // libhdfs3 has an API to release this pointer. 
hdfsFS hdfs_; http://git-wip-us.apache.org/repos/asf/incubator-quickstep/blob/f6c2f0b6/storage/StorageManager.cpp ---------------------------------------------------------------------- diff --git a/storage/StorageManager.cpp b/storage/StorageManager.cpp index 6f7d38b..872e8cc 100644 --- a/storage/StorageManager.cpp +++ b/storage/StorageManager.cpp @@ -570,6 +570,15 @@ bool StorageManager::DataExchangerClientAsync::Pull(const block_id block, return true; } +void* StorageManager::hdfs() { +#ifdef QUICKSTEP_HAVE_FILE_MANAGER_HDFS + if (FLAGS_use_hdfs) { + return static_cast<FileManagerHdfs*>(file_manager_.get())->hdfs(); + } +#endif // QUICKSTEP_HAVE_FILE_MANAGER_HDFS + return nullptr; +} + vector<string> StorageManager::getPeerDomainNetworkAddresses(const block_id block) { serialization::BlockMessage proto; proto.set_block_id(block); http://git-wip-us.apache.org/repos/asf/incubator-quickstep/blob/f6c2f0b6/storage/StorageManager.hpp ---------------------------------------------------------------------- diff --git a/storage/StorageManager.hpp b/storage/StorageManager.hpp index 42176ee..dc4b7e8 100644 --- a/storage/StorageManager.hpp +++ b/storage/StorageManager.hpp @@ -41,7 +41,6 @@ #include "storage/StorageBlob.hpp" #include "storage/StorageBlock.hpp" #include "storage/StorageBlockInfo.hpp" -#include "storage/StorageConfig.h" #include "storage/StorageConstants.hpp" #include "threading/SpinSharedMutex.hpp" #include "utility/Macros.hpp" @@ -395,6 +394,13 @@ class StorageManager { void pullBlockOrBlob(const block_id block, PullResponse *response) const; #endif + /** + * @brief Get the HDFS connector via libhdfs3. + * + * @return The HDFS connector. + **/ + void* hdfs(); + private: struct BlockHandle { void *block_memory;