HDFS-11106: libhdfs++: Some refactoring to better organize files. Contributed by James Clampffer.
Project: http://git-wip-us.apache.org/repos/asf/hadoop/repo Commit: http://git-wip-us.apache.org/repos/asf/hadoop/commit/ac01fc45 Tree: http://git-wip-us.apache.org/repos/asf/hadoop/tree/ac01fc45 Diff: http://git-wip-us.apache.org/repos/asf/hadoop/diff/ac01fc45 Branch: refs/heads/HDFS-12996 Commit: ac01fc45e84e2aa9badd7c42cbe02edf8e062b2c Parents: aee132a Author: James <j...@apache.org> Authored: Tue Nov 29 18:09:53 2016 -0500 Committer: Hanisha Koneru <hanishakon...@apache.org> Committed: Mon Mar 26 11:11:03 2018 -0700 ---------------------------------------------------------------------- .../native/libhdfspp/lib/common/CMakeLists.txt | 2 +- .../main/native/libhdfspp/lib/common/base64.cc | 73 --- .../libhdfspp/lib/common/hdfs_ioservice.cc | 51 ++ .../libhdfspp/lib/common/hdfs_ioservice.h | 46 ++ .../libhdfspp/lib/common/hdfs_public_api.cc | 47 -- .../libhdfspp/lib/common/hdfs_public_api.h | 39 -- .../main/native/libhdfspp/lib/common/util.cc | 51 ++ .../lib/connection/datanodeconnection.h | 2 +- .../main/native/libhdfspp/lib/fs/CMakeLists.txt | 2 +- .../main/native/libhdfspp/lib/fs/filehandle.h | 2 +- .../main/native/libhdfspp/lib/fs/filesystem.cc | 492 +--------------- .../native/libhdfspp/lib/fs/filesystem_sync.cc | 555 +++++++++++++++++++ .../main/native/libhdfspp/tests/CMakeLists.txt | 2 +- .../libhdfspp/tests/sasl_digest_md5_test.cc | 3 + 14 files changed, 712 insertions(+), 655 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/hadoop/blob/ac01fc45/hadoop-hdfs-project/hadoop-hdfs-native-client/src/main/native/libhdfspp/lib/common/CMakeLists.txt ---------------------------------------------------------------------- diff --git a/hadoop-hdfs-project/hadoop-hdfs-native-client/src/main/native/libhdfspp/lib/common/CMakeLists.txt b/hadoop-hdfs-project/hadoop-hdfs-native-client/src/main/native/libhdfspp/lib/common/CMakeLists.txt index 501d3b5..bdeb068 100644 --- a/hadoop-hdfs-project/hadoop-hdfs-native-client/src/main/native/libhdfspp/lib/common/CMakeLists.txt +++ b/hadoop-hdfs-project/hadoop-hdfs-native-client/src/main/native/libhdfspp/lib/common/CMakeLists.txt @@ -19,6 +19,6 @@ if(NEED_LINK_DL) set(LIB_DL dl) endif() -add_library(common_obj OBJECT base64.cc status.cc sasl_digest_md5.cc hdfs_public_api.cc options.cc configuration.cc configuration_loader.cc hdfs_configuration.cc uri.cc util.cc retry_policy.cc cancel_tracker.cc logging.cc libhdfs_events_impl.cc auth_info.cc namenode_info.cc) +add_library(common_obj OBJECT status.cc sasl_digest_md5.cc hdfs_ioservice.cc options.cc configuration.cc configuration_loader.cc hdfs_configuration.cc uri.cc util.cc retry_policy.cc cancel_tracker.cc logging.cc libhdfs_events_impl.cc auth_info.cc namenode_info.cc) add_library(common $<TARGET_OBJECTS:common_obj> $<TARGET_OBJECTS:uriparser2_obj>) target_link_libraries(common ${LIB_DL}) http://git-wip-us.apache.org/repos/asf/hadoop/blob/ac01fc45/hadoop-hdfs-project/hadoop-hdfs-native-client/src/main/native/libhdfspp/lib/common/base64.cc ---------------------------------------------------------------------- diff --git a/hadoop-hdfs-project/hadoop-hdfs-native-client/src/main/native/libhdfspp/lib/common/base64.cc b/hadoop-hdfs-project/hadoop-hdfs-native-client/src/main/native/libhdfspp/lib/common/base64.cc deleted file mode 100644 index 76d10e6..0000000 --- a/hadoop-hdfs-project/hadoop-hdfs-native-client/src/main/native/libhdfspp/lib/common/base64.cc +++ /dev/null @@ -1,73 +0,0 @@ -/** - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -#include "util.h" - -#include <array> -#include <functional> -#include <algorithm> - -namespace hdfs { - -std::string Base64Encode(const std::string &src) { - //encoded size is (sizeof(buf) + 2) / 3 * 4 - static const std::string base64_chars = - "ABCDEFGHIJKLMNOPQRSTUVWXYZ" - "abcdefghijklmnopqrstuvwxyz" - "0123456789+/"; - std::string ret; - int i = 0; - int j = 0; - unsigned char char_array_3[3]; - unsigned char char_array_4[4]; - unsigned const char *bytes_to_encode = reinterpret_cast<unsigned const char *>(&src[i]); - unsigned int in_len = src.size(); - - while (in_len--) { - char_array_3[i++] = *(bytes_to_encode++); - if (i == 3) { - char_array_4[0] = (char_array_3[0] & 0xfc) >> 2; - char_array_4[1] = ((char_array_3[0] & 0x03) << 4) + ((char_array_3[1] & 0xf0) >> 4); - char_array_4[2] = ((char_array_3[1] & 0x0f) << 2) + ((char_array_3[2] & 0xc0) >> 6); - char_array_4[3] = char_array_3[2] & 0x3f; - - for(i = 0; (i <4) ; i++) - ret += base64_chars[char_array_4[i]]; - i = 0; - } - } - - if (i) { - for(j = i; j < 3; j++) - char_array_3[j] = '\0'; - - char_array_4[0] = (char_array_3[0] & 0xfc) >> 2; - char_array_4[1] = ((char_array_3[0] & 0x03) << 4) + ((char_array_3[1] & 0xf0) >> 4); - char_array_4[2] = ((char_array_3[1] & 0x0f) << 2) + ((char_array_3[2] & 0xc0) >> 6); - char_array_4[3] = char_array_3[2] & 0x3f; - - for (j = 0; (j < i + 1); j++) - ret += base64_chars[char_array_4[j]]; - - while((i++ < 3)) - ret += '='; - } - return ret; -} - -} http://git-wip-us.apache.org/repos/asf/hadoop/blob/ac01fc45/hadoop-hdfs-project/hadoop-hdfs-native-client/src/main/native/libhdfspp/lib/common/hdfs_ioservice.cc ---------------------------------------------------------------------- diff --git a/hadoop-hdfs-project/hadoop-hdfs-native-client/src/main/native/libhdfspp/lib/common/hdfs_ioservice.cc b/hadoop-hdfs-project/hadoop-hdfs-native-client/src/main/native/libhdfspp/lib/common/hdfs_ioservice.cc new file mode 100644 index 0000000..f6876cd --- /dev/null +++ b/hadoop-hdfs-project/hadoop-hdfs-native-client/src/main/native/libhdfspp/lib/common/hdfs_ioservice.cc @@ -0,0 +1,51 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +#include "hdfs_ioservice.h" + +#include "common/logging.h" + +namespace hdfs { + +IoService::~IoService() {} + +IoService *IoService::New() { return new IoServiceImpl(); } + +void IoServiceImpl::Run() { + // The IoService executes callbacks provided by library users in the context of worker threads, + // there is no way of preventing those callbacks from throwing but we can at least prevent them + // from escaping this library and crashing the process. + + // As recommended in http://www.boost.org/doc/libs/1_39_0/doc/html/boost_asio/reference/io_service.html#boost_asio.reference.io_service.effect_of_exceptions_thrown_from_handlers + asio::io_service::work work(io_service_); + for(;;) + { + try + { + io_service_.run(); + break; + } catch (const std::exception & e) { + LOG_WARN(kFileSystem, << "Unexpected exception in libhdfspp worker thread: " << e.what()); + } catch (...) { + LOG_WARN(kFileSystem, << "Unexpected value not derived from std::exception in libhdfspp worker thread"); + } + } +} + + +} http://git-wip-us.apache.org/repos/asf/hadoop/blob/ac01fc45/hadoop-hdfs-project/hadoop-hdfs-native-client/src/main/native/libhdfspp/lib/common/hdfs_ioservice.h ---------------------------------------------------------------------- diff --git a/hadoop-hdfs-project/hadoop-hdfs-native-client/src/main/native/libhdfspp/lib/common/hdfs_ioservice.h b/hadoop-hdfs-project/hadoop-hdfs-native-client/src/main/native/libhdfspp/lib/common/hdfs_ioservice.h new file mode 100644 index 0000000..73d167e --- /dev/null +++ b/hadoop-hdfs-project/hadoop-hdfs-native-client/src/main/native/libhdfspp/lib/common/hdfs_ioservice.h @@ -0,0 +1,46 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +#ifndef COMMON_HDFS_IOSERVICE_H_ +#define COMMON_HDFS_IOSERVICE_H_ + +#include "hdfspp/hdfspp.h" + +#include <asio/io_service.hpp> + +namespace hdfs { + +/* + * A thin wrapper over the asio::io_service. + * -In the future this could own the worker threads that execute io tasks which + * makes it easier to share IoServices between FileSystems. See HDFS-10796 for + * rationale. + */ + +class IoServiceImpl : public IoService { + public: + virtual void Run() override; + virtual void Stop() override { io_service_.stop(); } + ::asio::io_service &io_service() { return io_service_; } + private: + ::asio::io_service io_service_; +}; + +} + +#endif http://git-wip-us.apache.org/repos/asf/hadoop/blob/ac01fc45/hadoop-hdfs-project/hadoop-hdfs-native-client/src/main/native/libhdfspp/lib/common/hdfs_public_api.cc ---------------------------------------------------------------------- diff --git a/hadoop-hdfs-project/hadoop-hdfs-native-client/src/main/native/libhdfspp/lib/common/hdfs_public_api.cc b/hadoop-hdfs-project/hadoop-hdfs-native-client/src/main/native/libhdfspp/lib/common/hdfs_public_api.cc deleted file mode 100644 index 188071d..0000000 --- a/hadoop-hdfs-project/hadoop-hdfs-native-client/src/main/native/libhdfspp/lib/common/hdfs_public_api.cc +++ /dev/null @@ -1,47 +0,0 @@ -/** - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -#include "hdfs_public_api.h" - -#include "common/logging.h" - -namespace hdfs { - -IoService::~IoService() {} - -IoService *IoService::New() { return new IoServiceImpl(); } - -void IoServiceImpl::Run() { - // As recommended in http://www.boost.org/doc/libs/1_39_0/doc/html/boost_asio/reference/io_service.html#boost_asio.reference.io_service.effect_of_exceptions_thrown_from_handlers - asio::io_service::work work(io_service_); - for(;;) - { - try - { - io_service_.run(); - break; - } catch (const std::exception & e) { - LOG_WARN(kFileSystem, << "Unexpected exception in libhdfspp worker thread: " << e.what()); - } catch (...) { - LOG_WARN(kFileSystem, << "Unexpected value not derived from std::exception in libhdfspp worker thread"); - } - } -} - - -} http://git-wip-us.apache.org/repos/asf/hadoop/blob/ac01fc45/hadoop-hdfs-project/hadoop-hdfs-native-client/src/main/native/libhdfspp/lib/common/hdfs_public_api.h ---------------------------------------------------------------------- diff --git a/hadoop-hdfs-project/hadoop-hdfs-native-client/src/main/native/libhdfspp/lib/common/hdfs_public_api.h b/hadoop-hdfs-project/hadoop-hdfs-native-client/src/main/native/libhdfspp/lib/common/hdfs_public_api.h deleted file mode 100644 index c8fea5e..0000000 --- a/hadoop-hdfs-project/hadoop-hdfs-native-client/src/main/native/libhdfspp/lib/common/hdfs_public_api.h +++ /dev/null @@ -1,39 +0,0 @@ -/** - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -#ifndef COMMON_HDFS_PUBLIC_API_H_ -#define COMMON_HDFS_PUBLIC_API_H_ - -#include "hdfspp/hdfspp.h" - -#include <asio/io_service.hpp> - -namespace hdfs { - -class IoServiceImpl : public IoService { - public: - virtual void Run() override; - virtual void Stop() override { io_service_.stop(); } - ::asio::io_service &io_service() { return io_service_; } - private: - ::asio::io_service io_service_; -}; - -} - -#endif http://git-wip-us.apache.org/repos/asf/hadoop/blob/ac01fc45/hadoop-hdfs-project/hadoop-hdfs-native-client/src/main/native/libhdfspp/lib/common/util.cc ---------------------------------------------------------------------- diff --git a/hadoop-hdfs-project/hadoop-hdfs-native-client/src/main/native/libhdfspp/lib/common/util.cc b/hadoop-hdfs-project/hadoop-hdfs-native-client/src/main/native/libhdfspp/lib/common/util.cc index ede6acd..375f951 100644 --- a/hadoop-hdfs-project/hadoop-hdfs-native-client/src/main/native/libhdfspp/lib/common/util.cc +++ b/hadoop-hdfs-project/hadoop-hdfs-native-client/src/main/native/libhdfspp/lib/common/util.cc @@ -20,12 +20,14 @@ #include "common/util_c.h" #include <google/protobuf/io/zero_copy_stream_impl_lite.h> + #include <exception> #include <sstream> #include <iostream> #include <iomanip> #include <thread> + namespace hdfs { bool ReadDelimitedPBMessage(::google::protobuf::io::CodedInputStream *in, @@ -73,6 +75,53 @@ std::string GetRandomClientName() { return oss.str(); } +std::string Base64Encode(const std::string &src) { + //encoded size is (sizeof(buf) + 2) / 3 * 4 + static const std::string base64_chars = + "ABCDEFGHIJKLMNOPQRSTUVWXYZ" + "abcdefghijklmnopqrstuvwxyz" + "0123456789+/"; + std::string ret; + int i = 0; + int j = 0; + unsigned char char_array_3[3]; + unsigned char char_array_4[4]; + unsigned const char *bytes_to_encode = reinterpret_cast<unsigned const char *>(&src[i]); + unsigned int in_len = src.size(); + + while (in_len--) { + char_array_3[i++] = *(bytes_to_encode++); + if (i == 3) { + char_array_4[0] = (char_array_3[0] & 0xfc) >> 2; + char_array_4[1] = ((char_array_3[0] & 0x03) << 4) + ((char_array_3[1] & 0xf0) >> 4); + char_array_4[2] = ((char_array_3[1] & 0x0f) << 2) + ((char_array_3[2] & 0xc0) >> 6); + char_array_4[3] = char_array_3[2] & 0x3f; + + for(i = 0; (i <4) ; i++) + ret += base64_chars[char_array_4[i]]; + i = 0; + } + } + + if (i) { + for(j = i; j < 3; j++) + char_array_3[j] = '\0'; + + char_array_4[0] = (char_array_3[0] & 0xfc) >> 2; + char_array_4[1] = ((char_array_3[0] & 0x03) << 4) + ((char_array_3[1] & 0xf0) >> 4); + char_array_4[2] = ((char_array_3[1] & 0x0f) << 2) + ((char_array_3[2] & 0xc0) >> 6); + char_array_4[3] = char_array_3[2] & 0x3f; + + for (j = 0; (j < i + 1); j++) + ret += base64_chars[char_array_4[j]]; + + while((i++ < 3)) + ret += '='; + } + return ret; +} + + std::string SafeDisconnect(asio::ip::tcp::socket *sock) { std::string err; if(sock && sock->is_open()) { @@ -117,3 +166,5 @@ bool IsHighBitSet(uint64_t num) { void ShutdownProtobufLibrary_C() { google::protobuf::ShutdownProtobufLibrary(); } + + http://git-wip-us.apache.org/repos/asf/hadoop/blob/ac01fc45/hadoop-hdfs-project/hadoop-hdfs-native-client/src/main/native/libhdfspp/lib/connection/datanodeconnection.h ---------------------------------------------------------------------- diff --git a/hadoop-hdfs-project/hadoop-hdfs-native-client/src/main/native/libhdfspp/lib/connection/datanodeconnection.h b/hadoop-hdfs-project/hadoop-hdfs-native-client/src/main/native/libhdfspp/lib/connection/datanodeconnection.h index 0ff6fc4..21193b3 100644 --- a/hadoop-hdfs-project/hadoop-hdfs-native-client/src/main/native/libhdfspp/lib/connection/datanodeconnection.h +++ b/hadoop-hdfs-project/hadoop-hdfs-native-client/src/main/native/libhdfspp/lib/connection/datanodeconnection.h @@ -18,7 +18,7 @@ #ifndef LIBHDFSPP_LIB_CONNECTION_DATANODECONNECTION_H_ #define LIBHDFSPP_LIB_CONNECTION_DATANODECONNECTION_H_ -#include "common/hdfs_public_api.h" +#include "common/hdfs_ioservice.h" #include "common/async_stream.h" #include "ClientNamenodeProtocol.pb.h" #include "common/libhdfs_events_impl.h" http://git-wip-us.apache.org/repos/asf/hadoop/blob/ac01fc45/hadoop-hdfs-project/hadoop-hdfs-native-client/src/main/native/libhdfspp/lib/fs/CMakeLists.txt ---------------------------------------------------------------------- diff --git a/hadoop-hdfs-project/hadoop-hdfs-native-client/src/main/native/libhdfspp/lib/fs/CMakeLists.txt b/hadoop-hdfs-project/hadoop-hdfs-native-client/src/main/native/libhdfspp/lib/fs/CMakeLists.txt index 0bce70d..624cda5 100644 --- a/hadoop-hdfs-project/hadoop-hdfs-native-client/src/main/native/libhdfspp/lib/fs/CMakeLists.txt +++ b/hadoop-hdfs-project/hadoop-hdfs-native-client/src/main/native/libhdfspp/lib/fs/CMakeLists.txt @@ -16,6 +16,6 @@ # limitations under the License. # -add_library(fs_obj OBJECT filesystem.cc filehandle.cc bad_datanode_tracker.cc namenode_operations.cc) +add_library(fs_obj OBJECT filesystem.cc filesystem_sync.cc filehandle.cc bad_datanode_tracker.cc namenode_operations.cc) add_dependencies(fs_obj proto) add_library(fs $<TARGET_OBJECTS:fs_obj>) http://git-wip-us.apache.org/repos/asf/hadoop/blob/ac01fc45/hadoop-hdfs-project/hadoop-hdfs-native-client/src/main/native/libhdfspp/lib/fs/filehandle.h ---------------------------------------------------------------------- diff --git a/hadoop-hdfs-project/hadoop-hdfs-native-client/src/main/native/libhdfspp/lib/fs/filehandle.h b/hadoop-hdfs-project/hadoop-hdfs-native-client/src/main/native/libhdfspp/lib/fs/filehandle.h index 7e7c79d..14f7ea8 100644 --- a/hadoop-hdfs-project/hadoop-hdfs-native-client/src/main/native/libhdfspp/lib/fs/filehandle.h +++ b/hadoop-hdfs-project/hadoop-hdfs-native-client/src/main/native/libhdfspp/lib/fs/filehandle.h @@ -18,7 +18,7 @@ #ifndef LIBHDFSPP_LIB_FS_FILEHANDLE_H_ #define LIBHDFSPP_LIB_FS_FILEHANDLE_H_ -#include "common/hdfs_public_api.h" +#include "common/hdfs_ioservice.h" #include "common/async_stream.h" #include "common/cancel_tracker.h" #include "common/libhdfs_events_impl.h" http://git-wip-us.apache.org/repos/asf/hadoop/blob/ac01fc45/hadoop-hdfs-project/hadoop-hdfs-native-client/src/main/native/libhdfspp/lib/fs/filesystem.cc ---------------------------------------------------------------------- diff --git a/hadoop-hdfs-project/hadoop-hdfs-native-client/src/main/native/libhdfspp/lib/fs/filesystem.cc b/hadoop-hdfs-project/hadoop-hdfs-native-client/src/main/native/libhdfspp/lib/fs/filesystem.cc index 5d5b9f2..d805716 100644 --- a/hadoop-hdfs-project/hadoop-hdfs-native-client/src/main/native/libhdfspp/lib/fs/filesystem.cc +++ b/hadoop-hdfs-project/hadoop-hdfs-native-client/src/main/native/libhdfspp/lib/fs/filesystem.cc @@ -32,8 +32,7 @@ namespace hdfs { -static const char kNamenodeProtocol[] = - "org.apache.hadoop.hdfs.protocol.ClientProtocol"; +static const char kNamenodeProtocol[] = "org.apache.hadoop.hdfs.protocol.ClientProtocol"; static const int kNamenodeProtocolVersion = 1; using ::asio::ip::tcp; @@ -203,26 +202,6 @@ void FileSystemImpl::Connect(const std::string &server, }); } -Status FileSystemImpl::Connect(const std::string &server, const std::string &service) { - LOG_INFO(kFileSystem, << "FileSystemImpl::[sync]Connect(" << FMT_THIS_ADDR - << ", server=" << server << ", service=" << service << ") called"); - - /* synchronized */ - auto stat = std::make_shared<std::promise<Status>>(); - std::future<Status> future = stat->get_future(); - - auto callback = [stat](const Status &s, FileSystem *fs) { - (void)fs; - stat->set_value(s); - }; - - Connect(server, service, callback); - - /* block until promise is set */ - auto s = future.get(); - - return s; -} void FileSystemImpl::ConnectToDefaultFs(const std::function<void(const Status &, FileSystem *)> &handler) { std::string scheme = options_.defaultFS.get_scheme(); @@ -248,25 +227,6 @@ void FileSystemImpl::ConnectToDefaultFs(const std::function<void(const Status &, Connect(host, port_as_string, handler); } -Status FileSystemImpl::ConnectToDefaultFs() { - auto stat = std::make_shared<std::promise<Status>>(); - std::future<Status> future = stat->get_future(); - - auto callback = [stat](const Status &s, FileSystem *fs) { - (void)fs; - stat->set_value(s); - }; - - ConnectToDefaultFs(callback); - - /* block until promise is set */ - auto s = future.get(); - - return s; -} - - - int FileSystemImpl::AddWorkerThread() { LOG_DEBUG(kFileSystem, << "FileSystemImpl::AddWorkerThread(" << FMT_THIS_ADDR << ") called." @@ -297,38 +257,6 @@ void FileSystemImpl::Open( }); } -Status FileSystemImpl::Open(const std::string &path, - FileHandle **handle) { - LOG_DEBUG(kFileSystem, << "FileSystemImpl::[sync]Open(" - << FMT_THIS_ADDR << ", path=" - << path << ") called"); - - auto callstate = std::make_shared<std::promise<std::tuple<Status, FileHandle*>>>(); - std::future<std::tuple<Status, FileHandle*>> future(callstate->get_future()); - - /* wrap async FileSystem::Open with promise to make it a blocking call */ - auto h = [callstate](const Status &s, FileHandle *is) { - callstate->set_value(std::make_tuple(s, is)); - }; - - Open(path, h); - - /* block until promise is set */ - auto returnstate = future.get(); - Status stat = std::get<0>(returnstate); - FileHandle *file_handle = std::get<1>(returnstate); - - if (!stat.ok()) { - delete file_handle; - return stat; - } - if (!file_handle) { - return stat; - } - - *handle = file_handle; - return stat; -} BlockLocation LocatedBlockToBlockLocation(const hadoop::hdfs::LocatedBlockProto & locatedBlock) { @@ -411,39 +339,6 @@ void FileSystemImpl::GetBlockLocations(const std::string & path, uint64_t offset nn_.GetBlockLocations(path, offset, length, conversion); } -Status FileSystemImpl::GetBlockLocations(const std::string & path, uint64_t offset, uint64_t length, - std::shared_ptr<FileBlockLocation> * fileBlockLocations) -{ - LOG_DEBUG(kFileSystem, << "FileSystemImpl::[sync]GetBlockLocations(" - << FMT_THIS_ADDR << ", path=" - << path << ") called"); - - if (!fileBlockLocations) - return Status::InvalidArgument("Null pointer passed to GetBlockLocations"); - - auto callstate = std::make_shared<std::promise<std::tuple<Status, std::shared_ptr<FileBlockLocation>>>>(); - std::future<std::tuple<Status, std::shared_ptr<FileBlockLocation>>> future(callstate->get_future()); - - /* wrap async call with promise/future to make it blocking */ - auto callback = [callstate](const Status &s, std::shared_ptr<FileBlockLocation> blockInfo) { - callstate->set_value(std::make_tuple(s,blockInfo)); - }; - - GetBlockLocations(path, offset, length, callback); - - /* wait for async to finish */ - auto returnstate = future.get(); - auto stat = std::get<0>(returnstate); - - if (!stat.ok()) { - return stat; - } - - *fileBlockLocations = std::get<1>(returnstate); - - return stat; -} - void FileSystemImpl::GetPreferredBlockSize(const std::string &path, const std::function<void(const Status &, const uint64_t &)> &handler) { LOG_DEBUG(kFileSystem, << "FileSystemImpl::GetPreferredBlockSize(" @@ -453,33 +348,6 @@ void FileSystemImpl::GetPreferredBlockSize(const std::string &path, nn_.GetPreferredBlockSize(path, handler); } -Status FileSystemImpl::GetPreferredBlockSize(const std::string &path, uint64_t & block_size) { - LOG_DEBUG(kFileSystem, << "FileSystemImpl::[sync]GetPreferredBlockSize(" - << FMT_THIS_ADDR << ", path=" - << path << ") called"); - - auto callstate = std::make_shared<std::promise<std::tuple<Status, uint64_t>>>(); - std::future<std::tuple<Status, uint64_t>> future(callstate->get_future()); - - /* wrap async FileSystem::GetPreferredBlockSize with promise to make it a blocking call */ - auto h = [callstate](const Status &s, const uint64_t & bsize) { - callstate->set_value(std::make_tuple(s, bsize)); - }; - - GetPreferredBlockSize(path, h); - - /* block until promise is set */ - auto returnstate = future.get(); - Status stat = std::get<0>(returnstate); - uint64_t size = std::get<1>(returnstate); - - if (!stat.ok()) { - return stat; - } - - block_size = size; - return stat; -} void FileSystemImpl::SetReplication(const std::string & path, int16_t replication, std::function<void(const Status &)> handler) { LOG_DEBUG(kFileSystem, @@ -499,27 +367,6 @@ void FileSystemImpl::SetReplication(const std::string & path, int16_t replicatio nn_.SetReplication(path, replication, handler); } -Status FileSystemImpl::SetReplication(const std::string & path, int16_t replication) { - LOG_DEBUG(kFileSystem, - << "FileSystemImpl::[sync]SetReplication(" << FMT_THIS_ADDR << ", path=" << path << - ", replication=" << replication << ") called"); - - auto callstate = std::make_shared<std::promise<Status>>(); - std::future<Status> future(callstate->get_future()); - - /* wrap async FileSystem::SetReplication with promise to make it a blocking call */ - auto h = [callstate](const Status &s) { - callstate->set_value(s); - }; - - SetReplication(path, replication, h); - - /* block until promise is set */ - auto returnstate = future.get(); - Status stat = returnstate; - - return stat; -} void FileSystemImpl::SetTimes(const std::string & path, uint64_t mtime, uint64_t atime, std::function<void(const Status &)> handler) { @@ -535,27 +382,6 @@ void FileSystemImpl::SetTimes(const std::string & path, uint64_t mtime, uint64_t nn_.SetTimes(path, mtime, atime, handler); } -Status FileSystemImpl::SetTimes(const std::string & path, uint64_t mtime, uint64_t atime) { - LOG_DEBUG(kFileSystem, - << "FileSystemImpl::[sync]SetTimes(" << FMT_THIS_ADDR << ", path=" << path << - ", mtime=" << mtime << ", atime=" << atime << ") called"); - - auto callstate = std::make_shared<std::promise<Status>>(); - std::future<Status> future(callstate->get_future()); - - /* wrap async FileSystem::SetTimes with promise to make it a blocking call */ - auto h = [callstate](const Status &s) { - callstate->set_value(s); - }; - - SetTimes(path, mtime, atime, h); - - /* block until promise is set */ - auto returnstate = future.get(); - Status stat = returnstate; - - return stat; -} void FileSystemImpl::GetFileInfo( const std::string &path, @@ -567,34 +393,6 @@ void FileSystemImpl::GetFileInfo( nn_.GetFileInfo(path, handler); } -Status FileSystemImpl::GetFileInfo(const std::string &path, - StatInfo & stat_info) { - LOG_DEBUG(kFileSystem, << "FileSystemImpl::[sync]GetFileInfo(" - << FMT_THIS_ADDR << ", path=" - << path << ") called"); - - auto callstate = std::make_shared<std::promise<std::tuple<Status, StatInfo>>>(); - std::future<std::tuple<Status, StatInfo>> future(callstate->get_future()); - - /* wrap async FileSystem::GetFileInfo with promise to make it a blocking call */ - auto h = [callstate](const Status &s, const StatInfo &si) { - callstate->set_value(std::make_tuple(s, si)); - }; - - GetFileInfo(path, h); - - /* block until promise is set */ - auto returnstate = future.get(); - Status stat = std::get<0>(returnstate); - StatInfo info = std::get<1>(returnstate); - - if (!stat.ok()) { - return stat; - } - - stat_info = info; - return stat; -} void FileSystemImpl::GetFsStats( const std::function<void(const Status &, const FsInfo &)> &handler) { @@ -604,32 +402,6 @@ void FileSystemImpl::GetFsStats( nn_.GetFsStats(handler); } -Status FileSystemImpl::GetFsStats(FsInfo & fs_info) { - LOG_DEBUG(kFileSystem, - << "FileSystemImpl::[sync]GetFsStats(" << FMT_THIS_ADDR << ") called"); - - auto callstate = std::make_shared<std::promise<std::tuple<Status, FsInfo>>>(); - std::future<std::tuple<Status, FsInfo>> future(callstate->get_future()); - - /* wrap async FileSystem::GetFsStats with promise to make it a blocking call */ - auto h = [callstate](const Status &s, const FsInfo &si) { - callstate->set_value(std::make_tuple(s, si)); - }; - - GetFsStats(h); - - /* block until promise is set */ - auto returnstate = future.get(); - Status stat = std::get<0>(returnstate); - FsInfo info = std::get<1>(returnstate); - - if (!stat.ok()) { - return stat; - } - - fs_info = info; - return stat; -} /** * Helper function for recursive GetListing calls. @@ -666,43 +438,6 @@ void FileSystemImpl::GetListing( nn_.GetListing(path, callback); } -Status FileSystemImpl::GetListing(const std::string &path, std::vector<StatInfo> * stat_infos) { - LOG_DEBUG(kFileSystem, << "FileSystemImpl::[sync]GetListing(" - << FMT_THIS_ADDR << ", path=" - << path << ") called"); - - if (!stat_infos) { - return Status::InvalidArgument("FileSystemImpl::GetListing: argument 'stat_infos' cannot be NULL"); - } - - auto callstate = std::make_shared<std::promise<Status>>(); - std::future<Status> future(callstate->get_future()); - - /* wrap async FileSystem::GetListing with promise to make it a blocking call. - * - Keep requesting more until we get the entire listing, and don't set the promise - * until we have the entire listing. - */ - auto h = [callstate, stat_infos](const Status &s, const std::vector<StatInfo> & si, bool has_more) -> bool { - if (!si.empty()) { - stat_infos->insert(stat_infos->end(), si.begin(), si.end()); - } - - bool done = !s.ok() || !has_more; - if (done) { - callstate->set_value(s); - return false; - } - return true; - }; - - GetListing(path, h); - - /* block until promise is set */ - Status stat = future.get(); - - return stat; -} void FileSystemImpl::Mkdirs(const std::string & path, uint16_t permissions, bool createparent, std::function<void(const Status &)> handler) { @@ -724,27 +459,6 @@ void FileSystemImpl::Mkdirs(const std::string & path, uint16_t permissions, bool nn_.Mkdirs(path, permissions, createparent, handler); } -Status FileSystemImpl::Mkdirs(const std::string & path, uint16_t permissions, bool createparent) { - LOG_DEBUG(kFileSystem, - << "FileSystemImpl::[sync]Mkdirs(" << FMT_THIS_ADDR << ", path=" << path << - ", permissions=" << permissions << ", createparent=" << createparent << ") called"); - - auto callstate = std::make_shared<std::promise<Status>>(); - std::future<Status> future(callstate->get_future()); - - /* wrap async FileSystem::Mkdirs with promise to make it a blocking call */ - auto h = [callstate](const Status &s) { - callstate->set_value(s); - }; - - Mkdirs(path, permissions, createparent, h); - - /* block until promise is set */ - auto returnstate = future.get(); - Status stat = returnstate; - - return stat; -} void FileSystemImpl::Delete(const std::string &path, bool recursive, const std::function<void(const Status &)> &handler) { @@ -759,26 +473,6 @@ void FileSystemImpl::Delete(const std::string &path, bool recursive, nn_.Delete(path, recursive, handler); } -Status FileSystemImpl::Delete(const std::string &path, bool recursive) { - LOG_DEBUG(kFileSystem, - << "FileSystemImpl::[sync]Delete(" << FMT_THIS_ADDR << ", path=" << path << ", recursive=" << recursive << ") called"); - - auto callstate = std::make_shared<std::promise<Status>>(); - std::future<Status> future(callstate->get_future()); - - /* wrap async FileSystem::Delete with promise to make it a blocking call */ - auto h = [callstate](const Status &s) { - callstate->set_value(s); - }; - - Delete(path, recursive, h); - - /* block until promise is set */ - auto returnstate = future.get(); - Status stat = returnstate; - - return stat; -} void FileSystemImpl::Rename(const std::string &oldPath, const std::string &newPath, const std::function<void(const Status &)> &handler) { @@ -798,26 +492,6 @@ void FileSystemImpl::Rename(const std::string &oldPath, const std::string &newPa nn_.Rename(oldPath, newPath, handler); } -Status FileSystemImpl::Rename(const std::string &oldPath, const std::string &newPath) { - LOG_DEBUG(kFileSystem, - << "FileSystemImpl::[sync]Rename(" << FMT_THIS_ADDR << ", oldPath=" << oldPath << ", newPath=" << newPath << ") called"); - - auto callstate = std::make_shared<std::promise<Status>>(); - std::future<Status> future(callstate->get_future()); - - /* wrap async FileSystem::Rename with promise to make it a blocking call */ - auto h = [callstate](const Status &s) { - callstate->set_value(s); - }; - - Rename(oldPath, newPath, h); - - /* block until promise is set */ - auto returnstate = future.get(); - Status stat = returnstate; - - return stat; -} void FileSystemImpl::SetPermission(const std::string & path, uint16_t permissions, const std::function<void(const Status &)> &handler) { @@ -837,25 +511,6 @@ void FileSystemImpl::SetPermission(const std::string & path, nn_.SetPermission(path, permissions, handler); } -Status FileSystemImpl::SetPermission(const std::string & path, uint16_t permissions) { - LOG_DEBUG(kFileSystem, - << "FileSystemImpl::[sync]SetPermission(" << FMT_THIS_ADDR << ", path=" << path << ", permissions=" << permissions << ") called"); - - auto callstate = std::make_shared<std::promise<Status>>(); - std::future<Status> future(callstate->get_future()); - - /* wrap async FileSystem::SetPermission with promise to make it a blocking call */ - auto h = [callstate](const Status &s) { - callstate->set_value(s); - }; - - SetPermission(path, permissions, h); - - /* block until promise is set */ - Status stat = future.get(); - - return stat; -} void FileSystemImpl::SetOwner(const std::string & path, const std::string & username, const std::string & groupname, const std::function<void(const Status &)> &handler) { @@ -870,25 +525,6 @@ void FileSystemImpl::SetOwner(const std::string & path, const std::string & user nn_.SetOwner(path, username, groupname, handler); } -Status FileSystemImpl::SetOwner(const std::string & path, const std::string & username, - const std::string & groupname) { - LOG_DEBUG(kFileSystem, - << "FileSystemImpl::[sync]SetOwner(" << FMT_THIS_ADDR << ", path=" << path << ", username=" << username << ", groupname=" << groupname << ") called"); - - auto callstate = std::make_shared<std::promise<Status>>(); - std::future<Status> future(callstate->get_future()); - - /* wrap async FileSystem::SetOwner with promise to make it a blocking call */ - auto h = [callstate](const Status &s) { - callstate->set_value(s); - }; - - SetOwner(path, username, groupname, h); - - /* block until promise is set */ - Status stat = future.get(); - return stat; -} /** * Helper function for recursive Find calls. @@ -1016,50 +652,6 @@ void FileSystemImpl::Find( nn_.GetListing("/", callback); } -Status FileSystemImpl::Find(const std::string &path, const std::string &name, const uint32_t maxdepth, std::vector<StatInfo> * stat_infos) { - LOG_DEBUG(kFileSystem, << "FileSystemImpl::[sync]Find(" - << FMT_THIS_ADDR << ", path=" - << path << ", name=" - << name << ") called"); - - if (!stat_infos) { - return Status::InvalidArgument("FileSystemImpl::Find: argument 'stat_infos' cannot be NULL"); - } - - // In this case, we're going to have the async code populate stat_infos. - - std::promise<void> promise = std::promise<void>(); - std::future<void> future(promise.get_future()); - Status status = Status::OK(); - - /** - * Keep requesting more until we get the entire listing. Set the promise - * when we have the entire listing to stop. - * - * Find guarantees that the handler will only be called once at a time, - * so we do not need any locking here - */ - auto h = [&status, &promise, stat_infos](const Status &s, const std::vector<StatInfo> & si, bool has_more_results) -> bool { - if (!si.empty()) { - stat_infos->insert(stat_infos->end(), si.begin(), si.end()); - } - if (!s.ok() && status.ok()){ - //We make sure we set 'status' only on the first error. - status = s; - } - if (!has_more_results) { - promise.set_value(); - return false; - } - return true; - }; - - Find(path, name, maxdepth, h); - - /* block until promise is set */ - future.get(); - return status; -} void FileSystemImpl::CreateSnapshot(const std::string &path, const std::string &name, @@ -1075,27 +667,6 @@ void FileSystemImpl::CreateSnapshot(const std::string &path, nn_.CreateSnapshot(path, name, handler); } -Status FileSystemImpl::CreateSnapshot(const std::string &path, - const std::string &name) { - LOG_DEBUG(kFileSystem, - << "FileSystemImpl::[sync]CreateSnapshot(" << FMT_THIS_ADDR << ", path=" << path << ", name=" << name << ") called"); - - auto callstate = std::make_shared<std::promise<Status>>(); - std::future<Status> future(callstate->get_future()); - - /* wrap async FileSystem::CreateSnapshot with promise to make it a blocking call */ - auto h = [callstate](const Status &s) { - callstate->set_value(s); - }; - - CreateSnapshot(path, name, h); - - /* block until promise is set */ - auto returnstate = future.get(); - Status stat = returnstate; - - return stat; -} void FileSystemImpl::DeleteSnapshot(const std::string &path, const std::string &name, @@ -1115,27 +686,6 @@ void FileSystemImpl::DeleteSnapshot(const std::string &path, nn_.DeleteSnapshot(path, name, handler); } -Status FileSystemImpl::DeleteSnapshot(const std::string &path, - const std::string &name) { - LOG_DEBUG(kFileSystem, - << "FileSystemImpl::[sync]DeleteSnapshot(" << FMT_THIS_ADDR << ", path=" << path << ", name=" << name << ") called"); - - auto callstate = std::make_shared<std::promise<Status>>(); - std::future<Status> future(callstate->get_future()); - - /* wrap async FileSystem::DeleteSnapshot with promise to make it a blocking call */ - auto h = [callstate](const Status &s) { - callstate->set_value(s); - }; - - DeleteSnapshot(path, name, h); - - /* block until promise is set */ - auto returnstate = future.get(); - Status stat = returnstate; - - return stat; -} void FileSystemImpl::AllowSnapshot(const std::string &path, const std::function<void(const Status &)> &handler) { @@ -1150,26 +700,6 @@ void FileSystemImpl::AllowSnapshot(const std::string &path, nn_.AllowSnapshot(path, handler); } -Status FileSystemImpl::AllowSnapshot(const std::string &path) { - LOG_DEBUG(kFileSystem, - << "FileSystemImpl::[sync]AllowSnapshot(" << FMT_THIS_ADDR << ", path=" << path << ") called"); - - auto callstate = std::make_shared<std::promise<Status>>(); - std::future<Status> future(callstate->get_future()); - - /* wrap async FileSystem::AllowSnapshot with promise to make it a blocking call */ - auto h = [callstate](const Status &s) { - callstate->set_value(s); - }; - - AllowSnapshot(path, h); - - /* block until promise is set */ - auto returnstate = future.get(); - Status stat = returnstate; - - return stat; -} void FileSystemImpl::DisallowSnapshot(const std::string &path, const std::function<void(const Status &)> &handler) { @@ -1184,26 +714,6 @@ void FileSystemImpl::DisallowSnapshot(const std::string &path, nn_.DisallowSnapshot(path, handler); } -Status FileSystemImpl::DisallowSnapshot(const std::string &path) { - LOG_DEBUG(kFileSystem, - << "FileSystemImpl::[sync]DisallowSnapshot(" << FMT_THIS_ADDR << ", path=" << path << ") called"); - - auto callstate = std::make_shared<std::promise<Status>>(); - std::future<Status> future(callstate->get_future()); - - /* wrap async FileSystem::DisallowSnapshot with promise to make it a blocking call */ - auto h = [callstate](const Status &s) { - callstate->set_value(s); - }; - - DisallowSnapshot(path, h); - - /* block until promise is set */ - auto returnstate = future.get(); - Status stat = returnstate; - - return stat; -} void FileSystemImpl::WorkerDeleter::operator()(std::thread *t) { // It is far too easy to destroy the filesystem (and thus the threadpool) http://git-wip-us.apache.org/repos/asf/hadoop/blob/ac01fc45/hadoop-hdfs-project/hadoop-hdfs-native-client/src/main/native/libhdfspp/lib/fs/filesystem_sync.cc ---------------------------------------------------------------------- diff --git a/hadoop-hdfs-project/hadoop-hdfs-native-client/src/main/native/libhdfspp/lib/fs/filesystem_sync.cc b/hadoop-hdfs-project/hadoop-hdfs-native-client/src/main/native/libhdfspp/lib/fs/filesystem_sync.cc new file mode 100644 index 0000000..73be538 --- /dev/null +++ b/hadoop-hdfs-project/hadoop-hdfs-native-client/src/main/native/libhdfspp/lib/fs/filesystem_sync.cc @@ -0,0 +1,555 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +#include "filesystem.h" + +#include <future> +#include <tuple> + +#define FMT_THIS_ADDR "this=" << (void*)this + +// Note: This is just a place to hold boilerplate async to sync shim code, +// place actual filesystem logic in filesystem.cc +// +// +// Shim pattern pseudocode +// +// Status MySynchronizedMethod(method_args): +// let stat = a promise<Status> wrapped in a shared_ptr +// +// Create a lambda that captures stat and any other variables that need to +// be set based on the async operation. When invoked set variables with the +// arguments passed (possibly do some translation), then set stat to indicate +// the return status of the async call. +// +// invoke MyAsyncMethod(method_args, handler_lambda) +// +// block until stat value has been set while async work takes place +// +// return stat + +namespace hdfs { + +Status FileSystemImpl::Connect(const std::string &server, const std::string &service) { + LOG_INFO(kFileSystem, << "FileSystemImpl::[sync]Connect(" << FMT_THIS_ADDR + << ", server=" << server << ", service=" << service << ") called"); + + /* synchronized */ + auto stat = std::make_shared<std::promise<Status>>(); + std::future<Status> future = stat->get_future(); + + auto callback = [stat](const Status &s, FileSystem *fs) { + (void)fs; + stat->set_value(s); + }; + + Connect(server, service, callback); + + /* block until promise is set */ + auto s = future.get(); + + return s; +} + + +Status FileSystemImpl::ConnectToDefaultFs() { + auto stat = std::make_shared<std::promise<Status>>(); + std::future<Status> future = stat->get_future(); + + auto callback = [stat](const Status &s, FileSystem *fs) { + (void)fs; + stat->set_value(s); + }; + + ConnectToDefaultFs(callback); + + /* block until promise is set */ + auto s = future.get(); + + return s; +} + + +Status FileSystemImpl::Open(const std::string &path, + FileHandle **handle) { + LOG_DEBUG(kFileSystem, << "FileSystemImpl::[sync]Open(" + << FMT_THIS_ADDR << ", path=" + << path << ") called"); + + auto callstate = std::make_shared<std::promise<std::tuple<Status, FileHandle*>>>(); + std::future<std::tuple<Status, FileHandle*>> future(callstate->get_future()); + + /* wrap async FileSystem::Open with promise to make it a blocking call */ + auto h = [callstate](const Status &s, FileHandle *is) { + callstate->set_value(std::make_tuple(s, is)); + }; + + Open(path, h); + + /* block until promise is set */ + auto returnstate = future.get(); + Status stat = std::get<0>(returnstate); + FileHandle *file_handle = std::get<1>(returnstate); + + if (!stat.ok()) { + delete file_handle; + return stat; + } + if (!file_handle) { + return stat; + } + + *handle = file_handle; + return stat; +} + +Status FileSystemImpl::GetBlockLocations(const std::string & path, uint64_t offset, uint64_t length, + std::shared_ptr<FileBlockLocation> * fileBlockLocations) +{ + LOG_DEBUG(kFileSystem, << "FileSystemImpl::[sync]GetBlockLocations(" + << FMT_THIS_ADDR << ", path=" + << path << ") called"); + + if (!fileBlockLocations) + return Status::InvalidArgument("Null pointer passed to GetBlockLocations"); + + auto callstate = std::make_shared<std::promise<std::tuple<Status, std::shared_ptr<FileBlockLocation>>>>(); + std::future<std::tuple<Status, std::shared_ptr<FileBlockLocation>>> future(callstate->get_future()); + + /* wrap async call with promise/future to make it blocking */ + auto callback = [callstate](const Status &s, std::shared_ptr<FileBlockLocation> blockInfo) { + callstate->set_value(std::make_tuple(s,blockInfo)); + }; + + GetBlockLocations(path, offset, length, callback); + + /* wait for async to finish */ + auto returnstate = future.get(); + auto stat = std::get<0>(returnstate); + + if (!stat.ok()) { + return stat; + } + + *fileBlockLocations = std::get<1>(returnstate); + + return stat; +} + +Status FileSystemImpl::GetPreferredBlockSize(const std::string &path, uint64_t & block_size) { + LOG_DEBUG(kFileSystem, << "FileSystemImpl::[sync]GetPreferredBlockSize(" + << FMT_THIS_ADDR << ", path=" + << path << ") called"); + + auto callstate = std::make_shared<std::promise<std::tuple<Status, uint64_t>>>(); + std::future<std::tuple<Status, uint64_t>> future(callstate->get_future()); + + /* wrap async FileSystem::GetPreferredBlockSize with promise to make it a blocking call */ + auto h = [callstate](const Status &s, const uint64_t & bsize) { + callstate->set_value(std::make_tuple(s, bsize)); + }; + + GetPreferredBlockSize(path, h); + + /* block until promise is set */ + auto returnstate = future.get(); + Status stat = std::get<0>(returnstate); + uint64_t size = std::get<1>(returnstate); + + if (!stat.ok()) { + return stat; + } + + block_size = size; + return stat; +} + +Status FileSystemImpl::SetReplication(const std::string & path, int16_t replication) { + LOG_DEBUG(kFileSystem, + << "FileSystemImpl::[sync]SetReplication(" << FMT_THIS_ADDR << ", path=" << path << + ", replication=" << replication << ") called"); + + auto callstate = std::make_shared<std::promise<Status>>(); + std::future<Status> future(callstate->get_future()); + + /* wrap async FileSystem::SetReplication with promise to make it a blocking call */ + auto h = [callstate](const Status &s) { + callstate->set_value(s); + }; + + SetReplication(path, replication, h); + + /* block until promise is set */ + auto returnstate = future.get(); + Status stat = returnstate; + + return stat; +} + +Status FileSystemImpl::SetTimes(const std::string & path, uint64_t mtime, uint64_t atime) { + LOG_DEBUG(kFileSystem, + << "FileSystemImpl::[sync]SetTimes(" << FMT_THIS_ADDR << ", path=" << path << + ", mtime=" << mtime << ", atime=" << atime << ") called"); + + auto callstate = std::make_shared<std::promise<Status>>(); + std::future<Status> future(callstate->get_future()); + + /* wrap async FileSystem::SetTimes with promise to make it a blocking call */ + auto h = [callstate](const Status &s) { + callstate->set_value(s); + }; + + SetTimes(path, mtime, atime, h); + + /* block until promise is set */ + auto returnstate = future.get(); + Status stat = returnstate; + + return stat; +} + +Status FileSystemImpl::GetFileInfo(const std::string &path, + StatInfo & stat_info) { + LOG_DEBUG(kFileSystem, << "FileSystemImpl::[sync]GetFileInfo(" + << FMT_THIS_ADDR << ", path=" + << path << ") called"); + + auto callstate = std::make_shared<std::promise<std::tuple<Status, StatInfo>>>(); + std::future<std::tuple<Status, StatInfo>> future(callstate->get_future()); + + /* wrap async FileSystem::GetFileInfo with promise to make it a blocking call */ + auto h = [callstate](const Status &s, const StatInfo &si) { + callstate->set_value(std::make_tuple(s, si)); + }; + + GetFileInfo(path, h); + + /* block until promise is set */ + auto returnstate = future.get(); + Status stat = std::get<0>(returnstate); + StatInfo info = std::get<1>(returnstate); + + if (!stat.ok()) { + return stat; + } + + stat_info = info; + return stat; +} + +Status FileSystemImpl::GetFsStats(FsInfo & fs_info) { + LOG_DEBUG(kFileSystem, + << "FileSystemImpl::[sync]GetFsStats(" << FMT_THIS_ADDR << ") called"); + + auto callstate = std::make_shared<std::promise<std::tuple<Status, FsInfo>>>(); + std::future<std::tuple<Status, FsInfo>> future(callstate->get_future()); + + /* wrap async FileSystem::GetFsStats with promise to make it a blocking call */ + auto h = [callstate](const Status &s, const FsInfo &si) { + callstate->set_value(std::make_tuple(s, si)); + }; + + GetFsStats(h); + + /* block until promise is set */ + auto returnstate = future.get(); + Status stat = std::get<0>(returnstate); + FsInfo info = std::get<1>(returnstate); + + if (!stat.ok()) { + return stat; + } + + fs_info = info; + return stat; +} + +Status FileSystemImpl::GetListing(const std::string &path, std::vector<StatInfo> * stat_infos) { + LOG_DEBUG(kFileSystem, << "FileSystemImpl::[sync]GetListing(" + << FMT_THIS_ADDR << ", path=" + << path << ") called"); + + if (!stat_infos) { + return Status::InvalidArgument("FileSystemImpl::GetListing: argument 'stat_infos' cannot be NULL"); + } + + auto callstate = std::make_shared<std::promise<Status>>(); + std::future<Status> future(callstate->get_future()); + + /* wrap async FileSystem::GetListing with promise to make it a blocking call. + * + Keep requesting more until we get the entire listing, and don't set the promise + * until we have the entire listing. + */ + auto h = [callstate, stat_infos](const Status &s, const std::vector<StatInfo> & si, bool has_more) -> bool { + if (!si.empty()) { + stat_infos->insert(stat_infos->end(), si.begin(), si.end()); + } + + bool done = !s.ok() || !has_more; + if (done) { + callstate->set_value(s); + return false; + } + return true; + }; + + GetListing(path, h); + + /* block until promise is set */ + Status stat = future.get(); + + return stat; +} + +Status FileSystemImpl::Mkdirs(const std::string & path, uint16_t permissions, bool createparent) { + LOG_DEBUG(kFileSystem, + << "FileSystemImpl::[sync]Mkdirs(" << FMT_THIS_ADDR << ", path=" << path << + ", permissions=" << permissions << ", createparent=" << createparent << ") called"); + + auto callstate = std::make_shared<std::promise<Status>>(); + std::future<Status> future(callstate->get_future()); + + /* wrap async FileSystem::Mkdirs with promise to make it a blocking call */ + auto h = [callstate](const Status &s) { + callstate->set_value(s); + }; + + Mkdirs(path, permissions, createparent, h); + + /* block until promise is set */ + auto returnstate = future.get(); + Status stat = returnstate; + + return stat; +} + +Status FileSystemImpl::Delete(const std::string &path, bool recursive) { + LOG_DEBUG(kFileSystem, + << "FileSystemImpl::[sync]Delete(" << FMT_THIS_ADDR << ", path=" << path << ", recursive=" << recursive << ") called"); + + auto callstate = std::make_shared<std::promise<Status>>(); + std::future<Status> future(callstate->get_future()); + + /* wrap async FileSystem::Delete with promise to make it a blocking call */ + auto h = [callstate](const Status &s) { + callstate->set_value(s); + }; + + Delete(path, recursive, h); + + /* block until promise is set */ + auto returnstate = future.get(); + Status stat = returnstate; + + return stat; +} + +Status FileSystemImpl::Rename(const std::string &oldPath, const std::string &newPath) { + LOG_DEBUG(kFileSystem, + << "FileSystemImpl::[sync]Rename(" << FMT_THIS_ADDR << ", oldPath=" << oldPath << ", newPath=" << newPath << ") called"); + + auto callstate = std::make_shared<std::promise<Status>>(); + std::future<Status> future(callstate->get_future()); + + /* wrap async FileSystem::Rename with promise to make it a blocking call */ + auto h = [callstate](const Status &s) { + callstate->set_value(s); + }; + + Rename(oldPath, newPath, h); + + /* block until promise is set */ + auto returnstate = future.get(); + Status stat = returnstate; + + return stat; +} + +Status FileSystemImpl::SetPermission(const std::string & path, uint16_t permissions) { + LOG_DEBUG(kFileSystem, + << "FileSystemImpl::[sync]SetPermission(" << FMT_THIS_ADDR << ", path=" << path << ", permissions=" << permissions << ") called"); + + auto callstate = std::make_shared<std::promise<Status>>(); + std::future<Status> future(callstate->get_future()); + + /* wrap async FileSystem::SetPermission with promise to make it a blocking call */ + auto h = [callstate](const Status &s) { + callstate->set_value(s); + }; + + SetPermission(path, permissions, h); + + /* block until promise is set */ + Status stat = future.get(); + + return stat; +} + +Status FileSystemImpl::SetOwner(const std::string & path, const std::string & username, + const std::string & groupname) { + LOG_DEBUG(kFileSystem, + << "FileSystemImpl::[sync]SetOwner(" << FMT_THIS_ADDR << ", path=" << path << ", username=" << username << ", groupname=" << groupname << ") called"); + + auto callstate = std::make_shared<std::promise<Status>>(); + std::future<Status> future(callstate->get_future()); + + /* wrap async FileSystem::SetOwner with promise to make it a blocking call */ + auto h = [callstate](const Status &s) { + callstate->set_value(s); + }; + + SetOwner(path, username, groupname, h); + + /* block until promise is set */ + Status stat = future.get(); + return stat; +} + +Status FileSystemImpl::Find(const std::string &path, const std::string &name, const uint32_t maxdepth, std::vector<StatInfo> * stat_infos) { + LOG_DEBUG(kFileSystem, << "FileSystemImpl::[sync]Find(" + << FMT_THIS_ADDR << ", path=" + << path << ", name=" + << name << ") called"); + + if (!stat_infos) { + return Status::InvalidArgument("FileSystemImpl::Find: argument 'stat_infos' cannot be NULL"); + } + + // In this case, we're going to have the async code populate stat_infos. + + std::promise<void> promise = std::promise<void>(); + std::future<void> future(promise.get_future()); + Status status = Status::OK(); + + /** + * Keep requesting more until we get the entire listing. Set the promise + * when we have the entire listing to stop. + * + * Find guarantees that the handler will only be called once at a time, + * so we do not need any locking here + */ + auto h = [&status, &promise, stat_infos](const Status &s, const std::vector<StatInfo> & si, bool has_more_results) -> bool { + if (!si.empty()) { + stat_infos->insert(stat_infos->end(), si.begin(), si.end()); + } + if (!s.ok() && status.ok()){ + //We make sure we set 'status' only on the first error. + status = s; + } + if (!has_more_results) { + promise.set_value(); + return false; + } + return true; + }; + + Find(path, name, maxdepth, h); + + /* block until promise is set */ + future.get(); + return status; +} + +Status FileSystemImpl::CreateSnapshot(const std::string &path, + const std::string &name) { + LOG_DEBUG(kFileSystem, + << "FileSystemImpl::[sync]CreateSnapshot(" << FMT_THIS_ADDR << ", path=" << path << ", name=" << name << ") called"); + + auto callstate = std::make_shared<std::promise<Status>>(); + std::future<Status> future(callstate->get_future()); + + /* wrap async FileSystem::CreateSnapshot with promise to make it a blocking call */ + auto h = [callstate](const Status &s) { + callstate->set_value(s); + }; + + CreateSnapshot(path, name, h); + + /* block until promise is set */ + auto returnstate = future.get(); + Status stat = returnstate; + + return stat; +} + +Status FileSystemImpl::DeleteSnapshot(const std::string &path, + const std::string &name) { + LOG_DEBUG(kFileSystem, + << "FileSystemImpl::[sync]DeleteSnapshot(" << FMT_THIS_ADDR << ", path=" << path << ", name=" << name << ") called"); + + auto callstate = std::make_shared<std::promise<Status>>(); + std::future<Status> future(callstate->get_future()); + + /* wrap async FileSystem::DeleteSnapshot with promise to make it a blocking call */ + auto h = [callstate](const Status &s) { + callstate->set_value(s); + }; + + DeleteSnapshot(path, name, h); + + /* block until promise is set */ + auto returnstate = future.get(); + Status stat = returnstate; + + return stat; +} + +Status FileSystemImpl::AllowSnapshot(const std::string &path) { + LOG_DEBUG(kFileSystem, + << "FileSystemImpl::[sync]AllowSnapshot(" << FMT_THIS_ADDR << ", path=" << path << ") called"); + + auto callstate = std::make_shared<std::promise<Status>>(); + std::future<Status> future(callstate->get_future()); + + /* wrap async FileSystem::AllowSnapshot with promise to make it a blocking call */ + auto h = [callstate](const Status &s) { + callstate->set_value(s); + }; + + AllowSnapshot(path, h); + + /* block until promise is set */ + auto returnstate = future.get(); + Status stat = returnstate; + + return stat; +} + +Status FileSystemImpl::DisallowSnapshot(const std::string &path) { + LOG_DEBUG(kFileSystem, + << "FileSystemImpl::[sync]DisallowSnapshot(" << FMT_THIS_ADDR << ", path=" << path << ") called"); + + auto callstate = std::make_shared<std::promise<Status>>(); + std::future<Status> future(callstate->get_future()); + + /* wrap async FileSystem::DisallowSnapshot with promise to make it a blocking call */ + auto h = [callstate](const Status &s) { + callstate->set_value(s); + }; + + DisallowSnapshot(path, h); + + /* block until promise is set */ + auto returnstate = future.get(); + Status stat = returnstate; + + return stat; +} + +} http://git-wip-us.apache.org/repos/asf/hadoop/blob/ac01fc45/hadoop-hdfs-project/hadoop-hdfs-native-client/src/main/native/libhdfspp/tests/CMakeLists.txt ---------------------------------------------------------------------- diff --git a/hadoop-hdfs-project/hadoop-hdfs-native-client/src/main/native/libhdfspp/tests/CMakeLists.txt b/hadoop-hdfs-project/hadoop-hdfs-native-client/src/main/native/libhdfspp/tests/CMakeLists.txt index 032dfc8..3dd4aae 100644 --- a/hadoop-hdfs-project/hadoop-hdfs-native-client/src/main/native/libhdfspp/tests/CMakeLists.txt +++ b/hadoop-hdfs-project/hadoop-hdfs-native-client/src/main/native/libhdfspp/tests/CMakeLists.txt @@ -72,7 +72,7 @@ target_link_libraries(remote_block_reader_test test_common reader proto common c add_memcheck_test(remote_block_reader remote_block_reader_test) add_executable(sasl_digest_md5_test sasl_digest_md5_test.cc) -target_link_libraries(sasl_digest_md5_test common ${OPENSSL_LIBRARIES} gmock_main ${CMAKE_THREAD_LIBS_INIT}) +target_link_libraries(sasl_digest_md5_test common ${PROTOBUF_LIBRARIES} ${OPENSSL_LIBRARIES} gmock_main ${CMAKE_THREAD_LIBS_INIT}) add_memcheck_test(sasl_digest_md5 sasl_digest_md5_test) add_executable(retry_policy_test retry_policy_test.cc) http://git-wip-us.apache.org/repos/asf/hadoop/blob/ac01fc45/hadoop-hdfs-project/hadoop-hdfs-native-client/src/main/native/libhdfspp/tests/sasl_digest_md5_test.cc ---------------------------------------------------------------------- diff --git a/hadoop-hdfs-project/hadoop-hdfs-native-client/src/main/native/libhdfspp/tests/sasl_digest_md5_test.cc b/hadoop-hdfs-project/hadoop-hdfs-native-client/src/main/native/libhdfspp/tests/sasl_digest_md5_test.cc index 0797853..553ffa4 100644 --- a/hadoop-hdfs-project/hadoop-hdfs-native-client/src/main/native/libhdfspp/tests/sasl_digest_md5_test.cc +++ b/hadoop-hdfs-project/hadoop-hdfs-native-client/src/main/native/libhdfspp/tests/sasl_digest_md5_test.cc @@ -18,6 +18,7 @@ #include "common/sasl_authenticator.h" #include <gtest/gtest.h> +#include <google/protobuf/stubs/common.h> namespace hdfs { @@ -40,5 +41,7 @@ TEST(DigestMD5AuthenticatorTest, TestResponse) { ASSERT_TRUE(status.ok()); ASSERT_TRUE(result.find("response=3a286c2c385b92a06ebc66d58b8c4330") != std::string::npos); + + google::protobuf::ShutdownProtobufLibrary(); } } --------------------------------------------------------------------- To unsubscribe, e-mail: common-commits-unsubscr...@hadoop.apache.org For additional commands, e-mail: common-commits-h...@hadoop.apache.org