http://git-wip-us.apache.org/repos/asf/hadoop/blob/7a542fb3/hadoop-hdfs-project/hadoop-hdfs-native-client/src/main/native/libhdfspp/lib/fs/filesystem_sync.cc ---------------------------------------------------------------------- diff --git a/hadoop-hdfs-project/hadoop-hdfs-native-client/src/main/native/libhdfspp/lib/fs/filesystem_sync.cc b/hadoop-hdfs-project/hadoop-hdfs-native-client/src/main/native/libhdfspp/lib/fs/filesystem_sync.cc deleted file mode 100644 index 53c9e26..0000000 --- a/hadoop-hdfs-project/hadoop-hdfs-native-client/src/main/native/libhdfspp/lib/fs/filesystem_sync.cc +++ /dev/null @@ -1,607 +0,0 @@ -/** - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -#include "filesystem.h" - -#include <future> -#include <tuple> - -#define FMT_THIS_ADDR "this=" << (void*)this - -// Note: This is just a place to hold boilerplate async to sync shim code, -// place actual filesystem logic in filesystem.cc -// -// -// Shim pattern pseudocode -// -// Status MySynchronizedMethod(method_args): -// let stat = a promise<Status> wrapped in a shared_ptr -// -// Create a lambda that captures stat and any other variables that need to -// be set based on the async operation. When invoked set variables with the -// arguments passed (possibly do some translation), then set stat to indicate -// the return status of the async call. -// -// invoke MyAsyncMethod(method_args, handler_lambda) -// -// block until stat value has been set while async work takes place -// -// return stat - -namespace hdfs { - -Status FileSystemImpl::Connect(const std::string &server, const std::string &service) { - LOG_INFO(kFileSystem, << "FileSystemImpl::[sync]Connect(" << FMT_THIS_ADDR - << ", server=" << server << ", service=" << service << ") called"); - - /* synchronized */ - auto stat = std::make_shared<std::promise<Status>>(); - std::future<Status> future = stat->get_future(); - - auto callback = [stat](const Status &s, FileSystem *fs) { - (void)fs; - stat->set_value(s); - }; - - Connect(server, service, callback); - - /* block until promise is set */ - auto s = future.get(); - - return s; -} - - -Status FileSystemImpl::ConnectToDefaultFs() { - auto stat = std::make_shared<std::promise<Status>>(); - std::future<Status> future = stat->get_future(); - - auto callback = [stat](const Status &s, FileSystem *fs) { - (void)fs; - stat->set_value(s); - }; - - ConnectToDefaultFs(callback); - - /* block until promise is set */ - auto s = future.get(); - - return s; -} - - -Status FileSystemImpl::Open(const std::string &path, - FileHandle **handle) { - LOG_DEBUG(kFileSystem, << "FileSystemImpl::[sync]Open(" - << FMT_THIS_ADDR << ", path=" - << path << ") called"); - - auto callstate = std::make_shared<std::promise<std::tuple<Status, FileHandle*>>>(); - std::future<std::tuple<Status, FileHandle*>> future(callstate->get_future()); - - /* wrap async FileSystem::Open with promise to make it a blocking call */ - auto h = [callstate](const Status &s, FileHandle *is) { - callstate->set_value(std::make_tuple(s, is)); - }; - - Open(path, h); - - /* block until promise is set */ - auto returnstate = future.get(); - Status stat = std::get<0>(returnstate); - FileHandle *file_handle = std::get<1>(returnstate); - - if (!stat.ok()) { - delete file_handle; - return stat; - } - if (!file_handle) { - return stat; - } - - *handle = file_handle; - return stat; -} - -Status FileSystemImpl::GetBlockLocations(const std::string & path, uint64_t offset, uint64_t length, - std::shared_ptr<FileBlockLocation> * fileBlockLocations) -{ - LOG_DEBUG(kFileSystem, << "FileSystemImpl::[sync]GetBlockLocations(" - << FMT_THIS_ADDR << ", path=" - << path << ") called"); - - if (!fileBlockLocations) - return Status::InvalidArgument("Null pointer passed to GetBlockLocations"); - - auto callstate = std::make_shared<std::promise<std::tuple<Status, std::shared_ptr<FileBlockLocation>>>>(); - std::future<std::tuple<Status, std::shared_ptr<FileBlockLocation>>> future(callstate->get_future()); - - /* wrap async call with promise/future to make it blocking */ - auto callback = [callstate](const Status &s, std::shared_ptr<FileBlockLocation> blockInfo) { - callstate->set_value(std::make_tuple(s,blockInfo)); - }; - - GetBlockLocations(path, offset, length, callback); - - /* wait for async to finish */ - auto returnstate = future.get(); - auto stat = std::get<0>(returnstate); - - if (!stat.ok()) { - return stat; - } - - *fileBlockLocations = std::get<1>(returnstate); - - return stat; -} - -Status FileSystemImpl::GetPreferredBlockSize(const std::string &path, uint64_t & block_size) { - LOG_DEBUG(kFileSystem, << "FileSystemImpl::[sync]GetPreferredBlockSize(" - << FMT_THIS_ADDR << ", path=" - << path << ") called"); - - auto callstate = std::make_shared<std::promise<std::tuple<Status, uint64_t>>>(); - std::future<std::tuple<Status, uint64_t>> future(callstate->get_future()); - - /* wrap async FileSystem::GetPreferredBlockSize with promise to make it a blocking call */ - auto h = [callstate](const Status &s, const uint64_t & bsize) { - callstate->set_value(std::make_tuple(s, bsize)); - }; - - GetPreferredBlockSize(path, h); - - /* block until promise is set */ - auto returnstate = future.get(); - Status stat = std::get<0>(returnstate); - uint64_t size = std::get<1>(returnstate); - - if (!stat.ok()) { - return stat; - } - - block_size = size; - return stat; -} - -Status FileSystemImpl::SetReplication(const std::string & path, int16_t replication) { - LOG_DEBUG(kFileSystem, - << "FileSystemImpl::[sync]SetReplication(" << FMT_THIS_ADDR << ", path=" << path << - ", replication=" << replication << ") called"); - - auto callstate = std::make_shared<std::promise<Status>>(); - std::future<Status> future(callstate->get_future()); - - /* wrap async FileSystem::SetReplication with promise to make it a blocking call */ - auto h = [callstate](const Status &s) { - callstate->set_value(s); - }; - - SetReplication(path, replication, h); - - /* block until promise is set */ - auto returnstate = future.get(); - Status stat = returnstate; - - return stat; -} - -Status FileSystemImpl::SetTimes(const std::string & path, uint64_t mtime, uint64_t atime) { - LOG_DEBUG(kFileSystem, - << "FileSystemImpl::[sync]SetTimes(" << FMT_THIS_ADDR << ", path=" << path << - ", mtime=" << mtime << ", atime=" << atime << ") called"); - - auto callstate = std::make_shared<std::promise<Status>>(); - std::future<Status> future(callstate->get_future()); - - /* wrap async FileSystem::SetTimes with promise to make it a blocking call */ - auto h = [callstate](const Status &s) { - callstate->set_value(s); - }; - - SetTimes(path, mtime, atime, h); - - /* block until promise is set */ - auto returnstate = future.get(); - Status stat = returnstate; - - return stat; -} - -Status FileSystemImpl::GetFileInfo(const std::string &path, - StatInfo & stat_info) { - LOG_DEBUG(kFileSystem, << "FileSystemImpl::[sync]GetFileInfo(" - << FMT_THIS_ADDR << ", path=" - << path << ") called"); - - auto callstate = std::make_shared<std::promise<std::tuple<Status, StatInfo>>>(); - std::future<std::tuple<Status, StatInfo>> future(callstate->get_future()); - - /* wrap async FileSystem::GetFileInfo with promise to make it a blocking call */ - auto h = [callstate](const Status &s, const StatInfo &si) { - callstate->set_value(std::make_tuple(s, si)); - }; - - GetFileInfo(path, h); - - /* block until promise is set */ - auto returnstate = future.get(); - Status stat = std::get<0>(returnstate); - StatInfo info = std::get<1>(returnstate); - - if (!stat.ok()) { - return stat; - } - - stat_info = info; - return stat; -} - -Status FileSystemImpl::GetContentSummary(const std::string &path, - ContentSummary & content_summary) { - LOG_DEBUG(kFileSystem, << "FileSystemImpl::[sync]GetContentSummary(" - << FMT_THIS_ADDR << ", path=" - << path << ") called"); - - auto callstate = std::make_shared<std::promise<std::tuple<Status, ContentSummary>>>(); - std::future<std::tuple<Status, ContentSummary>> future(callstate->get_future()); - - /* wrap async FileSystem::GetContentSummary with promise to make it a blocking call */ - auto h = [callstate](const Status &s, const ContentSummary &si) { - callstate->set_value(std::make_tuple(s, si)); - }; - - GetContentSummary(path, h); - - /* block until promise is set */ - auto returnstate = future.get(); - Status stat = std::get<0>(returnstate); - ContentSummary cs = std::get<1>(returnstate); - - if (!stat.ok()) { - return stat; - } - - content_summary = cs; - return stat; -} - -Status FileSystemImpl::GetFsStats(FsInfo & fs_info) { - LOG_DEBUG(kFileSystem, - << "FileSystemImpl::[sync]GetFsStats(" << FMT_THIS_ADDR << ") called"); - - auto callstate = std::make_shared<std::promise<std::tuple<Status, FsInfo>>>(); - std::future<std::tuple<Status, FsInfo>> future(callstate->get_future()); - - /* wrap async FileSystem::GetFsStats with promise to make it a blocking call */ - auto h = [callstate](const Status &s, const FsInfo &si) { - callstate->set_value(std::make_tuple(s, si)); - }; - - GetFsStats(h); - - /* block until promise is set */ - auto returnstate = future.get(); - Status stat = std::get<0>(returnstate); - FsInfo info = std::get<1>(returnstate); - - if (!stat.ok()) { - return stat; - } - - fs_info = info; - return stat; -} - -Status FileSystemImpl::GetListing(const std::string &path, std::vector<StatInfo> * stat_infos) { - LOG_DEBUG(kFileSystem, << "FileSystemImpl::[sync]GetListing(" - << FMT_THIS_ADDR << ", path=" - << path << ") called"); - - if (!stat_infos) { - return Status::InvalidArgument("FileSystemImpl::GetListing: argument 'stat_infos' cannot be NULL"); - } - - auto callstate = std::make_shared<std::promise<Status>>(); - std::future<Status> future(callstate->get_future()); - - /* wrap async FileSystem::GetListing with promise to make it a blocking call. - * - Keep requesting more until we get the entire listing, and don't set the promise - * until we have the entire listing. - */ - auto h = [callstate, stat_infos](const Status &s, const std::vector<StatInfo> & si, bool has_more) -> bool { - if (!si.empty()) { - stat_infos->insert(stat_infos->end(), si.begin(), si.end()); - } - - bool done = !s.ok() || !has_more; - if (done) { - callstate->set_value(s); - return false; - } - return true; - }; - - GetListing(path, h); - - /* block until promise is set */ - Status stat = future.get(); - - return stat; -} - -Status FileSystemImpl::Mkdirs(const std::string & path, uint16_t permissions, bool createparent) { - LOG_DEBUG(kFileSystem, - << "FileSystemImpl::[sync]Mkdirs(" << FMT_THIS_ADDR << ", path=" << path << - ", permissions=" << permissions << ", createparent=" << createparent << ") called"); - - auto callstate = std::make_shared<std::promise<Status>>(); - std::future<Status> future(callstate->get_future()); - - /* wrap async FileSystem::Mkdirs with promise to make it a blocking call */ - auto h = [callstate](const Status &s) { - callstate->set_value(s); - }; - - Mkdirs(path, permissions, createparent, h); - - /* block until promise is set */ - auto returnstate = future.get(); - Status stat = returnstate; - - return stat; -} - -Status FileSystemImpl::Delete(const std::string &path, bool recursive) { - LOG_DEBUG(kFileSystem, - << "FileSystemImpl::[sync]Delete(" << FMT_THIS_ADDR << ", path=" << path << ", recursive=" << recursive << ") called"); - - auto callstate = std::make_shared<std::promise<Status>>(); - std::future<Status> future(callstate->get_future()); - - /* wrap async FileSystem::Delete with promise to make it a blocking call */ - auto h = [callstate](const Status &s) { - callstate->set_value(s); - }; - - Delete(path, recursive, h); - - /* block until promise is set */ - auto returnstate = future.get(); - Status stat = returnstate; - - return stat; -} - -Status FileSystemImpl::Rename(const std::string &oldPath, const std::string &newPath) { - LOG_DEBUG(kFileSystem, - << "FileSystemImpl::[sync]Rename(" << FMT_THIS_ADDR << ", oldPath=" << oldPath << ", newPath=" << newPath << ") called"); - - auto callstate = std::make_shared<std::promise<Status>>(); - std::future<Status> future(callstate->get_future()); - - /* wrap async FileSystem::Rename with promise to make it a blocking call */ - auto h = [callstate](const Status &s) { - callstate->set_value(s); - }; - - Rename(oldPath, newPath, h); - - /* block until promise is set */ - auto returnstate = future.get(); - Status stat = returnstate; - - return stat; -} - -Status FileSystemImpl::SetPermission(const std::string & path, uint16_t permissions) { - LOG_DEBUG(kFileSystem, - << "FileSystemImpl::[sync]SetPermission(" << FMT_THIS_ADDR << ", path=" << path << ", permissions=" << permissions << ") called"); - - auto callstate = std::make_shared<std::promise<Status>>(); - std::future<Status> future(callstate->get_future()); - - /* wrap async FileSystem::SetPermission with promise to make it a blocking call */ - auto h = [callstate](const Status &s) { - callstate->set_value(s); - }; - - SetPermission(path, permissions, h); - - /* block until promise is set */ - Status stat = future.get(); - - return stat; -} - -Status FileSystemImpl::SetOwner(const std::string & path, const std::string & username, - const std::string & groupname) { - LOG_DEBUG(kFileSystem, - << "FileSystemImpl::[sync]SetOwner(" << FMT_THIS_ADDR << ", path=" << path << ", username=" << username << ", groupname=" << groupname << ") called"); - - auto callstate = std::make_shared<std::promise<Status>>(); - std::future<Status> future(callstate->get_future()); - - /* wrap async FileSystem::SetOwner with promise to make it a blocking call */ - auto h = [callstate](const Status &s) { - callstate->set_value(s); - }; - - SetOwner(path, username, groupname, h); - - /* block until promise is set */ - Status stat = future.get(); - return stat; -} - -Status FileSystemImpl::Find(const std::string &path, const std::string &name, const uint32_t maxdepth, std::vector<StatInfo> * stat_infos) { - LOG_DEBUG(kFileSystem, << "FileSystemImpl::[sync]Find(" - << FMT_THIS_ADDR << ", path=" - << path << ", name=" - << name << ") called"); - - if (!stat_infos) { - return Status::InvalidArgument("FileSystemImpl::Find: argument 'stat_infos' cannot be NULL"); - } - - // In this case, we're going to have the async code populate stat_infos. - - std::promise<void> promise = std::promise<void>(); - std::future<void> future(promise.get_future()); - Status status = Status::OK(); - - /** - * Keep requesting more until we get the entire listing. Set the promise - * when we have the entire listing to stop. - * - * Find guarantees that the handler will only be called once at a time, - * so we do not need any locking here - */ - auto h = [&status, &promise, stat_infos](const Status &s, const std::vector<StatInfo> & si, bool has_more_results) -> bool { - if (!si.empty()) { - stat_infos->insert(stat_infos->end(), si.begin(), si.end()); - } - if (!s.ok() && status.ok()){ - //We make sure we set 'status' only on the first error. - status = s; - } - if (!has_more_results) { - promise.set_value(); - return false; - } - return true; - }; - - Find(path, name, maxdepth, h); - - /* block until promise is set */ - future.get(); - return status; -} - -Status FileSystemImpl::CreateSnapshot(const std::string &path, - const std::string &name) { - LOG_DEBUG(kFileSystem, - << "FileSystemImpl::[sync]CreateSnapshot(" << FMT_THIS_ADDR << ", path=" << path << ", name=" << name << ") called"); - - auto callstate = std::make_shared<std::promise<Status>>(); - std::future<Status> future(callstate->get_future()); - - /* wrap async FileSystem::CreateSnapshot with promise to make it a blocking call */ - auto h = [callstate](const Status &s) { - callstate->set_value(s); - }; - - CreateSnapshot(path, name, h); - - /* block until promise is set */ - auto returnstate = future.get(); - Status stat = returnstate; - - return stat; -} - -Status FileSystemImpl::DeleteSnapshot(const std::string &path, - const std::string &name) { - LOG_DEBUG(kFileSystem, - << "FileSystemImpl::[sync]DeleteSnapshot(" << FMT_THIS_ADDR << ", path=" << path << ", name=" << name << ") called"); - - auto callstate = std::make_shared<std::promise<Status>>(); - std::future<Status> future(callstate->get_future()); - - /* wrap async FileSystem::DeleteSnapshot with promise to make it a blocking call */ - auto h = [callstate](const Status &s) { - callstate->set_value(s); - }; - - DeleteSnapshot(path, name, h); - - /* block until promise is set */ - auto returnstate = future.get(); - Status stat = returnstate; - - return stat; -} - -Status FileSystemImpl::RenameSnapshot(const std::string &path, - const std::string &old_name, const std::string &new_name) { - LOG_DEBUG(kFileSystem, - << "FileSystemImpl::[sync]RenameSnapshot(" << FMT_THIS_ADDR << ", path=" << path << - ", old_name=" << old_name << ", new_name=" << new_name << ") called"); - - auto callstate = std::make_shared<std::promise<Status>>(); - std::future<Status> future(callstate->get_future()); - - /* wrap async FileSystem::RenameSnapshot with promise to make it a blocking call */ - auto h = [callstate](const Status &s) { - callstate->set_value(s); - }; - - RenameSnapshot(path, old_name, new_name, h); - - /* block until promise is set */ - auto returnstate = future.get(); - Status stat = returnstate; - - return stat; -} - -Status FileSystemImpl::AllowSnapshot(const std::string &path) { - LOG_DEBUG(kFileSystem, - << "FileSystemImpl::[sync]AllowSnapshot(" << FMT_THIS_ADDR << ", path=" << path << ") called"); - - auto callstate = std::make_shared<std::promise<Status>>(); - std::future<Status> future(callstate->get_future()); - - /* wrap async FileSystem::AllowSnapshot with promise to make it a blocking call */ - auto h = [callstate](const Status &s) { - callstate->set_value(s); - }; - - AllowSnapshot(path, h); - - /* block until promise is set */ - auto returnstate = future.get(); - Status stat = returnstate; - - return stat; -} - -Status FileSystemImpl::DisallowSnapshot(const std::string &path) { - LOG_DEBUG(kFileSystem, - << "FileSystemImpl::[sync]DisallowSnapshot(" << FMT_THIS_ADDR << ", path=" << path << ") called"); - - auto callstate = std::make_shared<std::promise<Status>>(); - std::future<Status> future(callstate->get_future()); - - /* wrap async FileSystem::DisallowSnapshot with promise to make it a blocking call */ - auto h = [callstate](const Status &s) { - callstate->set_value(s); - }; - - DisallowSnapshot(path, h); - - /* block until promise is set */ - auto returnstate = future.get(); - Status stat = returnstate; - - return stat; -} - -}
http://git-wip-us.apache.org/repos/asf/hadoop/blob/7a542fb3/hadoop-hdfs-project/hadoop-hdfs-native-client/src/main/native/libhdfspp/lib/fs/namenode_operations.cc ---------------------------------------------------------------------- diff --git a/hadoop-hdfs-project/hadoop-hdfs-native-client/src/main/native/libhdfspp/lib/fs/namenode_operations.cc b/hadoop-hdfs-project/hadoop-hdfs-native-client/src/main/native/libhdfspp/lib/fs/namenode_operations.cc deleted file mode 100644 index e46faad..0000000 --- a/hadoop-hdfs-project/hadoop-hdfs-native-client/src/main/native/libhdfspp/lib/fs/namenode_operations.cc +++ /dev/null @@ -1,727 +0,0 @@ -/** - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -#include "filesystem.h" -#include "common/continuation/asio.h" - -#include <asio/ip/tcp.hpp> - -#include <functional> -#include <limits> -#include <future> -#include <tuple> -#include <iostream> -#include <pwd.h> -#include <utility> - -#define FMT_THIS_ADDR "this=" << (void*)this - -using ::asio::ip::tcp; - -namespace hdfs { - -/***************************************************************************** - * NAMENODE OPERATIONS - ****************************************************************************/ - -void NameNodeOperations::Connect(const std::string &cluster_name, - const std::vector<ResolvedNamenodeInfo> &servers, - std::function<void(const Status &)> &&handler) { - engine_->Connect(cluster_name, servers, handler); -} - -bool NameNodeOperations::CancelPendingConnect() { - return engine_->CancelPendingConnect(); -} - -void NameNodeOperations::GetBlockLocations(const std::string & path, uint64_t offset, uint64_t length, - std::function<void(const Status &, std::shared_ptr<const struct FileInfo>)> handler) -{ - using ::hadoop::hdfs::GetBlockLocationsRequestProto; - using ::hadoop::hdfs::GetBlockLocationsResponseProto; - - LOG_TRACE(kFileSystem, << "NameNodeOperations::GetBlockLocations(" - << FMT_THIS_ADDR << ", path=" << path << ", ...) called"); - - if (path.empty()) { - handler(Status::InvalidArgument("GetBlockLocations: argument 'path' cannot be empty"), nullptr); - return; - } - - //Protobuf gives an error 'Negative value is not supported' - //if the high bit is set in uint64 in GetBlockLocations - if (IsHighBitSet(offset)) { - handler(Status::InvalidArgument("GetBlockLocations: argument 'offset' cannot have high bit set"), nullptr); - return; - } - if (IsHighBitSet(length)) { - handler(Status::InvalidArgument("GetBlockLocations: argument 'length' cannot have high bit set"), nullptr); - return; - } - - GetBlockLocationsRequestProto req; - req.set_src(path); - req.set_offset(offset); - req.set_length(length); - - auto resp = std::make_shared<GetBlockLocationsResponseProto>(); - - namenode_.GetBlockLocations(&req, resp, [resp, handler](const Status &stat) { - if (stat.ok()) { - auto file_info = std::make_shared<struct FileInfo>(); - auto locations = resp->locations(); - - file_info->file_length_ = locations.filelength(); - file_info->last_block_complete_ = locations.islastblockcomplete(); - file_info->under_construction_ = locations.underconstruction(); - - for (const auto &block : locations.blocks()) { - file_info->blocks_.push_back(block); - } - - if (!locations.islastblockcomplete() && - locations.has_lastblock() && locations.lastblock().b().numbytes()) { - file_info->blocks_.push_back(locations.lastblock()); - file_info->file_length_ += locations.lastblock().b().numbytes(); - } - - handler(stat, file_info); - } else { - handler(stat, nullptr); - } - }); -} - -void NameNodeOperations::GetPreferredBlockSize(const std::string & path, - std::function<void(const Status &, const uint64_t)> handler) -{ - using ::hadoop::hdfs::GetPreferredBlockSizeRequestProto; - using ::hadoop::hdfs::GetPreferredBlockSizeResponseProto; - - LOG_TRACE(kFileSystem, << "NameNodeOperations::GetPreferredBlockSize(" - << FMT_THIS_ADDR << ", path=" << path << ") called"); - - if (path.empty()) { - handler(Status::InvalidArgument("GetPreferredBlockSize: argument 'path' cannot be empty"), -1); - return; - } - - GetPreferredBlockSizeRequestProto req; - req.set_filename(path); - - auto resp = std::make_shared<GetPreferredBlockSizeResponseProto>(); - - namenode_.GetPreferredBlockSize(&req, resp, [resp, handler, path](const Status &stat) { - if (stat.ok() && resp -> has_bsize()) { - uint64_t block_size = resp -> bsize(); - handler(stat, block_size); - } else { - handler(stat, -1); - } - }); -} - -void NameNodeOperations::SetReplication(const std::string & path, int16_t replication, - std::function<void(const Status &)> handler) -{ - using ::hadoop::hdfs::SetReplicationRequestProto; - using ::hadoop::hdfs::SetReplicationResponseProto; - - LOG_TRACE(kFileSystem, - << "NameNodeOperations::SetReplication(" << FMT_THIS_ADDR << ", path=" << path << - ", replication=" << replication << ") called"); - - if (path.empty()) { - handler(Status::InvalidArgument("SetReplication: argument 'path' cannot be empty")); - return; - } - Status replStatus = FileSystemImpl::CheckValidReplication(replication); - if (!replStatus.ok()) { - handler(replStatus); - return; - } - SetReplicationRequestProto req; - req.set_src(path); - req.set_replication(replication); - - auto resp = std::make_shared<SetReplicationResponseProto>(); - - namenode_.SetReplication(&req, resp, [resp, handler, path](const Status &stat) { - if (stat.ok()) { - // Checking resp - if(resp -> has_result() && resp ->result() == 1) { - handler(stat); - } else { - //NameNode does not specify why there is no result, in my testing it was happening when the path is not found - std::string errormsg = "No such file or directory: " + path; - Status statNew = Status::PathNotFound(errormsg.c_str()); - handler(statNew); - } - } else { - handler(stat); - } - }); -} - -void NameNodeOperations::SetTimes(const std::string & path, uint64_t mtime, uint64_t atime, - std::function<void(const Status &)> handler) -{ - using ::hadoop::hdfs::SetTimesRequestProto; - using ::hadoop::hdfs::SetTimesResponseProto; - - LOG_TRACE(kFileSystem, - << "NameNodeOperations::SetTimes(" << FMT_THIS_ADDR << ", path=" << path << - ", mtime=" << mtime << ", atime=" << atime << ") called"); - - if (path.empty()) { - handler(Status::InvalidArgument("SetTimes: argument 'path' cannot be empty")); - return; - } - - SetTimesRequestProto req; - req.set_src(path); - req.set_mtime(mtime); - req.set_atime(atime); - - auto resp = std::make_shared<SetTimesResponseProto>(); - - namenode_.SetTimes(&req, resp, [resp, handler, path](const Status &stat) { - handler(stat); - }); -} - - - -void NameNodeOperations::GetFileInfo(const std::string & path, - std::function<void(const Status &, const StatInfo &)> handler) -{ - using ::hadoop::hdfs::GetFileInfoRequestProto; - using ::hadoop::hdfs::GetFileInfoResponseProto; - - LOG_TRACE(kFileSystem, << "NameNodeOperations::GetFileInfo(" - << FMT_THIS_ADDR << ", path=" << path << ") called"); - - if (path.empty()) { - handler(Status::InvalidArgument("GetFileInfo: argument 'path' cannot be empty"), StatInfo()); - return; - } - - GetFileInfoRequestProto req; - req.set_src(path); - - auto resp = std::make_shared<GetFileInfoResponseProto>(); - - namenode_.GetFileInfo(&req, resp, [resp, handler, path](const Status &stat) { - if (stat.ok()) { - // For non-existant files, the server will respond with an OK message but - // no fs in the protobuf. - if(resp -> has_fs()){ - struct StatInfo stat_info; - stat_info.path = path; - stat_info.full_path = path; - HdfsFileStatusProtoToStatInfo(stat_info, resp->fs()); - handler(stat, stat_info); - } else { - std::string errormsg = "No such file or directory: " + path; - Status statNew = Status::PathNotFound(errormsg.c_str()); - handler(statNew, StatInfo()); - } - } else { - handler(stat, StatInfo()); - } - }); -} - -void NameNodeOperations::GetContentSummary(const std::string & path, - std::function<void(const Status &, const ContentSummary &)> handler) -{ - using ::hadoop::hdfs::GetContentSummaryRequestProto; - using ::hadoop::hdfs::GetContentSummaryResponseProto; - - LOG_TRACE(kFileSystem, << "NameNodeOperations::GetContentSummary(" - << FMT_THIS_ADDR << ", path=" << path << ") called"); - - if (path.empty()) { - handler(Status::InvalidArgument("GetContentSummary: argument 'path' cannot be empty"), ContentSummary()); - return; - } - - GetContentSummaryRequestProto req; - req.set_path(path); - - auto resp = std::make_shared<GetContentSummaryResponseProto>(); - - namenode_.GetContentSummary(&req, resp, [resp, handler, path](const Status &stat) { - if (stat.ok()) { - // For non-existant files, the server will respond with an OK message but - // no summary in the protobuf. - if(resp -> has_summary()){ - struct ContentSummary content_summary; - content_summary.path = path; - ContentSummaryProtoToContentSummary(content_summary, resp->summary()); - handler(stat, content_summary); - } else { - std::string errormsg = "No such file or directory: " + path; - Status statNew = Status::PathNotFound(errormsg.c_str()); - handler(statNew, ContentSummary()); - } - } else { - handler(stat, ContentSummary()); - } - }); -} - -void NameNodeOperations::GetFsStats( - std::function<void(const Status &, const FsInfo &)> handler) { - using ::hadoop::hdfs::GetFsStatusRequestProto; - using ::hadoop::hdfs::GetFsStatsResponseProto; - - LOG_TRACE(kFileSystem, - << "NameNodeOperations::GetFsStats(" << FMT_THIS_ADDR << ") called"); - - GetFsStatusRequestProto req; - auto resp = std::make_shared<GetFsStatsResponseProto>(); - - namenode_.GetFsStats(&req, resp, [resp, handler](const Status &stat) { - if (stat.ok()) { - struct FsInfo fs_info; - GetFsStatsResponseProtoToFsInfo(fs_info, resp); - handler(stat, fs_info); - } else { - handler(stat, FsInfo()); - } - }); -} - -void NameNodeOperations::GetListing( - const std::string & path, - std::function<void(const Status &, const std::vector<StatInfo> &, bool)> handler, - const std::string & start_after) { - using ::hadoop::hdfs::GetListingRequestProto; - using ::hadoop::hdfs::GetListingResponseProto; - - LOG_TRACE( - kFileSystem, - << "NameNodeOperations::GetListing(" << FMT_THIS_ADDR << ", path=" << path << ") called"); - - if (path.empty()) { - std::vector<StatInfo> empty; - handler(Status::InvalidArgument("GetListing: argument 'path' cannot be empty"), empty, false); - return; - } - - GetListingRequestProto req; - req.set_src(path); - req.set_startafter(start_after.c_str()); - req.set_needlocation(false); - - auto resp = std::make_shared<GetListingResponseProto>(); - - namenode_.GetListing(&req, resp, [resp, handler, path](const Status &stat) { - std::vector<StatInfo> stat_infos; - if (stat.ok()) { - if(resp -> has_dirlist()){ - for (::hadoop::hdfs::HdfsFileStatusProto const& fs : resp->dirlist().partiallisting()) { - StatInfo si; - si.path = fs.path(); - si.full_path = path + fs.path(); - if(si.full_path.back() != '/'){ - si.full_path += "/"; - } - HdfsFileStatusProtoToStatInfo(si, fs); - stat_infos.push_back(si); - } - handler(stat, stat_infos, resp->dirlist().remainingentries() > 0); - } else { - std::string errormsg = "No such file or directory: " + path; - handler(Status::PathNotFound(errormsg.c_str()), stat_infos, false); - } - } else { - handler(stat, stat_infos, false); - } - }); -} - -void NameNodeOperations::Mkdirs(const std::string & path, uint16_t permissions, bool createparent, - std::function<void(const Status &)> handler) -{ - using ::hadoop::hdfs::MkdirsRequestProto; - using ::hadoop::hdfs::MkdirsResponseProto; - - LOG_TRACE(kFileSystem, - << "NameNodeOperations::Mkdirs(" << FMT_THIS_ADDR << ", path=" << path << - ", permissions=" << permissions << ", createparent=" << createparent << ") called"); - - if (path.empty()) { - handler(Status::InvalidArgument("Mkdirs: argument 'path' cannot be empty")); - return; - } - - MkdirsRequestProto req; - Status permStatus = FileSystemImpl::CheckValidPermissionMask(permissions); - if (!permStatus.ok()) { - handler(permStatus); - return; - } - req.set_src(path); - hadoop::hdfs::FsPermissionProto *perm = req.mutable_masked(); - perm->set_perm(permissions); - req.set_createparent(createparent); - - auto resp = std::make_shared<MkdirsResponseProto>(); - - namenode_.Mkdirs(&req, resp, [resp, handler, path](const Status &stat) { - if (stat.ok()) { - // Checking resp - if(resp -> has_result() && resp ->result() == 1) { - handler(stat); - } else { - //NameNode does not specify why there is no result, in my testing it was happening when the path is not found - std::string errormsg = "No such file or directory: " + path; - Status statNew = Status::PathNotFound(errormsg.c_str()); - handler(statNew); - } - } else { - handler(stat); - } - }); -} - -void NameNodeOperations::Delete(const std::string & path, bool recursive, std::function<void(const Status &)> handler) { - using ::hadoop::hdfs::DeleteRequestProto; - using ::hadoop::hdfs::DeleteResponseProto; - - LOG_TRACE(kFileSystem, - << "NameNodeOperations::Delete(" << FMT_THIS_ADDR << ", path=" << path << ", recursive=" << recursive << ") called"); - - if (path.empty()) { - handler(Status::InvalidArgument("Delete: argument 'path' cannot be empty")); - return; - } - - DeleteRequestProto req; - req.set_src(path); - req.set_recursive(recursive); - - auto resp = std::make_shared<DeleteResponseProto>(); - - namenode_.Delete(&req, resp, [resp, handler, path](const Status &stat) { - if (stat.ok()) { - // Checking resp - if(resp -> has_result() && resp ->result() == 1) { - handler(stat); - } else { - //NameNode does not specify why there is no result, in my testing it was happening when the path is not found - std::string errormsg = "No such file or directory: " + path; - Status statNew = Status::PathNotFound(errormsg.c_str()); - handler(statNew); - } - } else { - handler(stat); - } - }); -} - -void NameNodeOperations::Rename(const std::string & oldPath, const std::string & newPath, std::function<void(const Status &)> handler) { - using ::hadoop::hdfs::RenameRequestProto; - using ::hadoop::hdfs::RenameResponseProto; - - LOG_TRACE(kFileSystem, - << "NameNodeOperations::Rename(" << FMT_THIS_ADDR << ", oldPath=" << oldPath << ", newPath=" << newPath << ") called"); - - if (oldPath.empty()) { - handler(Status::InvalidArgument("Rename: argument 'oldPath' cannot be empty")); - return; - } - - if (newPath.empty()) { - handler(Status::InvalidArgument("Rename: argument 'newPath' cannot be empty")); - return; - } - - RenameRequestProto req; - req.set_src(oldPath); - req.set_dst(newPath); - - auto resp = std::make_shared<RenameResponseProto>(); - - namenode_.Rename(&req, resp, [resp, handler](const Status &stat) { - if (stat.ok()) { - // Checking resp - if(resp -> has_result() && resp ->result() == 1) { - handler(stat); - } else { - //Since NameNode does not specify why the result is not success, we set the general error - std::string errormsg = "oldPath and parent directory of newPath must exist. newPath must not exist."; - Status statNew = Status::InvalidArgument(errormsg.c_str()); - handler(statNew); - } - } else { - handler(stat); - } - }); -} - -void NameNodeOperations::SetPermission(const std::string & path, - uint16_t permissions, std::function<void(const Status &)> handler) { - using ::hadoop::hdfs::SetPermissionRequestProto; - using ::hadoop::hdfs::SetPermissionResponseProto; - - LOG_TRACE(kFileSystem, - << "NameNodeOperations::SetPermission(" << FMT_THIS_ADDR << ", path=" << path << ", permissions=" << permissions << ") called"); - - if (path.empty()) { - handler(Status::InvalidArgument("SetPermission: argument 'path' cannot be empty")); - return; - } - Status permStatus = FileSystemImpl::CheckValidPermissionMask(permissions); - if (!permStatus.ok()) { - handler(permStatus); - return; - } - - SetPermissionRequestProto req; - req.set_src(path); - - hadoop::hdfs::FsPermissionProto *perm = req.mutable_permission(); - perm->set_perm(permissions); - - auto resp = std::make_shared<SetPermissionResponseProto>(); - - namenode_.SetPermission(&req, resp, - [handler](const Status &stat) { - handler(stat); - }); -} - -void NameNodeOperations::SetOwner(const std::string & path, - const std::string & username, const std::string & groupname, std::function<void(const Status &)> handler) { - using ::hadoop::hdfs::SetOwnerRequestProto; - using ::hadoop::hdfs::SetOwnerResponseProto; - - LOG_TRACE(kFileSystem, - << "NameNodeOperations::SetOwner(" << FMT_THIS_ADDR << ", path=" << path << ", username=" << username << ", groupname=" << groupname << ") called"); - - if (path.empty()) { - handler(Status::InvalidArgument("SetOwner: argument 'path' cannot be empty")); - return; - } - - SetOwnerRequestProto req; - req.set_src(path); - - if(!username.empty()) { - req.set_username(username); - } - if(!groupname.empty()) { - req.set_groupname(groupname); - } - - auto resp = std::make_shared<SetOwnerResponseProto>(); - - namenode_.SetOwner(&req, resp, - [handler](const Status &stat) { - handler(stat); - }); -} - -void NameNodeOperations::CreateSnapshot(const std::string & path, - const std::string & name, std::function<void(const Status &)> handler) { - using ::hadoop::hdfs::CreateSnapshotRequestProto; - using ::hadoop::hdfs::CreateSnapshotResponseProto; - - LOG_TRACE(kFileSystem, - << "NameNodeOperations::CreateSnapshot(" << FMT_THIS_ADDR << ", path=" << path << ", name=" << name << ") called"); - - if (path.empty()) { - handler(Status::InvalidArgument("CreateSnapshot: argument 'path' cannot be empty")); - return; - } - - CreateSnapshotRequestProto req; - req.set_snapshotroot(path); - if (!name.empty()) { - req.set_snapshotname(name); - } - - auto resp = std::make_shared<CreateSnapshotResponseProto>(); - - namenode_.CreateSnapshot(&req, resp, - [handler](const Status &stat) { - handler(stat); - }); -} - -void NameNodeOperations::DeleteSnapshot(const std::string & path, - const std::string & name, std::function<void(const Status &)> handler) { - using ::hadoop::hdfs::DeleteSnapshotRequestProto; - using ::hadoop::hdfs::DeleteSnapshotResponseProto; - - LOG_TRACE(kFileSystem, - << "NameNodeOperations::DeleteSnapshot(" << FMT_THIS_ADDR << ", path=" << path << ", name=" << name << ") called"); - - if (path.empty()) { - handler(Status::InvalidArgument("DeleteSnapshot: argument 'path' cannot be empty")); - return; - } - if (name.empty()) { - handler(Status::InvalidArgument("DeleteSnapshot: argument 'name' cannot be empty")); - return; - } - - DeleteSnapshotRequestProto req; - req.set_snapshotroot(path); - req.set_snapshotname(name); - - auto resp = std::make_shared<DeleteSnapshotResponseProto>(); - - namenode_.DeleteSnapshot(&req, resp, - [handler](const Status &stat) { - handler(stat); - }); -} - -void NameNodeOperations::RenameSnapshot(const std::string & path, const std::string & old_name, - const std::string & new_name, std::function<void(const Status &)> handler) { - using ::hadoop::hdfs::RenameSnapshotRequestProto; - using ::hadoop::hdfs::RenameSnapshotResponseProto; - - LOG_TRACE(kFileSystem, - << "NameNodeOperations::RenameSnapshot(" << FMT_THIS_ADDR << ", path=" << path << - ", old_name=" << old_name << ", new_name=" << new_name << ") called"); - - if (path.empty()) { - handler(Status::InvalidArgument("RenameSnapshot: argument 'path' cannot be empty")); - return; - } - if (old_name.empty()) { - handler(Status::InvalidArgument("RenameSnapshot: argument 'old_name' cannot be empty")); - return; - } - if (new_name.empty()) { - handler(Status::InvalidArgument("RenameSnapshot: argument 'new_name' cannot be empty")); - return; - } - - RenameSnapshotRequestProto req; - req.set_snapshotroot(path); - req.set_snapshotoldname(old_name); - req.set_snapshotnewname(new_name); - - auto resp = std::make_shared<RenameSnapshotResponseProto>(); - - namenode_.RenameSnapshot(&req, resp, - [handler](const Status &stat) { - handler(stat); - }); -} - -void NameNodeOperations::AllowSnapshot(const std::string & path, std::function<void(const Status &)> handler) { - using ::hadoop::hdfs::AllowSnapshotRequestProto; - using ::hadoop::hdfs::AllowSnapshotResponseProto; - - LOG_TRACE(kFileSystem, - << "NameNodeOperations::AllowSnapshot(" << FMT_THIS_ADDR << ", path=" << path << ") called"); - - if (path.empty()) { - handler(Status::InvalidArgument("AllowSnapshot: argument 'path' cannot be empty")); - return; - } - - AllowSnapshotRequestProto req; - req.set_snapshotroot(path); - - auto resp = std::make_shared<AllowSnapshotResponseProto>(); - - namenode_.AllowSnapshot(&req, resp, - [handler](const Status &stat) { - handler(stat); - }); -} - -void NameNodeOperations::DisallowSnapshot(const std::string & path, std::function<void(const Status &)> handler) { - using ::hadoop::hdfs::DisallowSnapshotRequestProto; - using ::hadoop::hdfs::DisallowSnapshotResponseProto; - - LOG_TRACE(kFileSystem, - << "NameNodeOperations::DisallowSnapshot(" << FMT_THIS_ADDR << ", path=" << path << ") called"); - - if (path.empty()) { - handler(Status::InvalidArgument("DisallowSnapshot: argument 'path' cannot be empty")); - return; - } - - DisallowSnapshotRequestProto req; - req.set_snapshotroot(path); - - auto resp = std::make_shared<DisallowSnapshotResponseProto>(); - - namenode_.DisallowSnapshot(&req, resp, - [handler](const Status &stat) { - handler(stat); - }); -} - -void NameNodeOperations::SetFsEventCallback(fs_event_callback callback) { - engine_->SetFsEventCallback(callback); -} - -void NameNodeOperations::HdfsFileStatusProtoToStatInfo( - hdfs::StatInfo & stat_info, - const ::hadoop::hdfs::HdfsFileStatusProto & fs) { - stat_info.file_type = fs.filetype(); - stat_info.length = fs.length(); - stat_info.permissions = fs.permission().perm(); - stat_info.owner = fs.owner(); - stat_info.group = fs.group(); - stat_info.modification_time = fs.modification_time(); - stat_info.access_time = fs.access_time(); - stat_info.symlink = fs.symlink(); - stat_info.block_replication = fs.block_replication(); - stat_info.blocksize = fs.blocksize(); - stat_info.fileid = fs.fileid(); - stat_info.children_num = fs.childrennum(); -} - -void NameNodeOperations::ContentSummaryProtoToContentSummary( - hdfs::ContentSummary & content_summary, - const ::hadoop::hdfs::ContentSummaryProto & csp) { - content_summary.length = csp.length(); - content_summary.filecount = csp.filecount(); - content_summary.directorycount = csp.directorycount(); - content_summary.quota = csp.quota(); - content_summary.spaceconsumed = csp.spaceconsumed(); - content_summary.spacequota = csp.spacequota(); -} - -void NameNodeOperations::GetFsStatsResponseProtoToFsInfo( - hdfs::FsInfo & fs_info, - const std::shared_ptr<::hadoop::hdfs::GetFsStatsResponseProto> & fs) { - fs_info.capacity = fs->capacity(); - fs_info.used = fs->used(); - fs_info.remaining = fs->remaining(); - fs_info.under_replicated = fs->under_replicated(); - fs_info.corrupt_blocks = fs->corrupt_blocks(); - fs_info.missing_blocks = fs->missing_blocks(); - fs_info.missing_repl_one_blocks = fs->missing_repl_one_blocks(); - if(fs->has_blocks_in_future()){ - fs_info.blocks_in_future = fs->blocks_in_future(); - } -} - -} http://git-wip-us.apache.org/repos/asf/hadoop/blob/7a542fb3/hadoop-hdfs-project/hadoop-hdfs-native-client/src/main/native/libhdfspp/lib/fs/namenode_operations.h ---------------------------------------------------------------------- diff --git a/hadoop-hdfs-project/hadoop-hdfs-native-client/src/main/native/libhdfspp/lib/fs/namenode_operations.h b/hadoop-hdfs-project/hadoop-hdfs-native-client/src/main/native/libhdfspp/lib/fs/namenode_operations.h deleted file mode 100644 index f4caa18..0000000 --- a/hadoop-hdfs-project/hadoop-hdfs-native-client/src/main/native/libhdfspp/lib/fs/namenode_operations.h +++ /dev/null @@ -1,136 +0,0 @@ -/** - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -#ifndef LIBHDFSPP_LIB_FS_NAMENODEOPERATIONS_H_ -#define LIBHDFSPP_LIB_FS_NAMENODEOPERATIONS_H_ - -#include "rpc/rpc_engine.h" -#include "hdfspp/statinfo.h" -#include "hdfspp/fsinfo.h" -#include "hdfspp/content_summary.h" -#include "common/namenode_info.h" -#include "ClientNamenodeProtocol.pb.h" -#include "ClientNamenodeProtocol.hrpc.inl" - - -namespace hdfs { - -/** -* NameNodeConnection: abstracts the details of communicating with a NameNode -* and the implementation of the communications protocol. -* -* Will eventually handle retry and failover. -* -* Threading model: thread-safe; all operations can be called concurrently -* Lifetime: owned by a FileSystemImpl -*/ - -class NameNodeOperations { -public: - MEMCHECKED_CLASS(NameNodeOperations) - NameNodeOperations(::asio::io_service *io_service, const Options &options, - const std::string &client_name, const std::string &user_name, - const char *protocol_name, int protocol_version) : - io_service_(io_service), - engine_(std::make_shared<RpcEngine>(io_service, options, client_name, user_name, protocol_name, protocol_version)), - namenode_(engine_), options_(options) {} - - - void Connect(const std::string &cluster_name, - const std::vector<ResolvedNamenodeInfo> &servers, - std::function<void(const Status &)> &&handler); - - bool CancelPendingConnect(); - - void GetBlockLocations(const std::string & path, uint64_t offset, uint64_t length, - std::function<void(const Status &, std::shared_ptr<const struct FileInfo>)> handler); - - void GetPreferredBlockSize(const std::string & path, - std::function<void(const Status &, const uint64_t)> handler); - - void SetReplication(const std::string & path, int16_t replication, - std::function<void(const Status &)> handler); - - void SetTimes(const std::string & path, uint64_t mtime, uint64_t atime, - std::function<void(const Status &)> handler); - - void GetFileInfo(const std::string & path, - std::function<void(const Status &, const StatInfo &)> handler); - - void GetContentSummary(const std::string & path, - std::function<void(const Status &, const ContentSummary &)> handler); - - void GetFsStats(std::function<void(const Status &, const FsInfo &)> handler); - - // start_after="" for initial call - void GetListing(const std::string & path, - std::function<void(const Status &, const std::vector<StatInfo> &, bool)> handler, - const std::string & start_after = ""); - - void Mkdirs(const std::string & path, uint16_t permissions, bool createparent, - std::function<void(const Status &)> handler); - - void Delete(const std::string & path, bool recursive, - std::function<void(const Status &)> handler); - - void Rename(const std::string & oldPath, const std::string & newPath, - std::function<void(const Status &)> handler); - - void SetPermission(const std::string & path, uint16_t permissions, - std::function<void(const Status &)> handler); - - void SetOwner(const std::string & path, const std::string & username, - const std::string & groupname, std::function<void(const Status &)> handler); - - void CreateSnapshot(const std::string & path, const std::string & name, - std::function<void(const Status &)> handler); - - void DeleteSnapshot(const std::string & path, const std::string & name, - std::function<void(const Status &)> handler); - - void RenameSnapshot(const std::string & path, const std::string & old_name, const std::string & new_name, - std::function<void(const Status &)> handler); - - void AllowSnapshot(const std::string & path, - std::function<void(const Status &)> handler); - - void DisallowSnapshot(const std::string & path, - std::function<void(const Status &)> handler); - - void SetFsEventCallback(fs_event_callback callback); - -private: - static void HdfsFileStatusProtoToStatInfo(hdfs::StatInfo & si, const ::hadoop::hdfs::HdfsFileStatusProto & fs); - static void ContentSummaryProtoToContentSummary(hdfs::ContentSummary & content_summary, const ::hadoop::hdfs::ContentSummaryProto & csp); - static void DirectoryListingProtoToStatInfo(std::shared_ptr<std::vector<StatInfo>> stat_infos, const ::hadoop::hdfs::DirectoryListingProto & dl); - static void GetFsStatsResponseProtoToFsInfo(hdfs::FsInfo & fs_info, const std::shared_ptr<::hadoop::hdfs::GetFsStatsResponseProto> & fs); - - ::asio::io_service * io_service_; - - // This is the only permanent owner of the RpcEngine, however the RPC layer - // needs to reference count it prevent races during FileSystem destruction. - // In order to do this they hold weak_ptrs and promote them to shared_ptr - // when calling non-blocking RpcEngine methods e.g. get_client_id(). - std::shared_ptr<RpcEngine> engine_; - - // Automatically generated methods for RPC calls. See protoc_gen_hrpc.cc - ClientNamenodeProtocol namenode_; - const Options options_; -}; -} - -#endif http://git-wip-us.apache.org/repos/asf/hadoop/blob/7a542fb3/hadoop-hdfs-project/hadoop-hdfs-native-client/src/main/native/libhdfspp/lib/proto/CMakeLists.txt ---------------------------------------------------------------------- diff --git a/hadoop-hdfs-project/hadoop-hdfs-native-client/src/main/native/libhdfspp/lib/proto/CMakeLists.txt b/hadoop-hdfs-project/hadoop-hdfs-native-client/src/main/native/libhdfspp/lib/proto/CMakeLists.txt deleted file mode 100644 index 2eff301..0000000 --- a/hadoop-hdfs-project/hadoop-hdfs-native-client/src/main/native/libhdfspp/lib/proto/CMakeLists.txt +++ /dev/null @@ -1,87 +0,0 @@ -# -# Licensed to the Apache Software Foundation (ASF) under one -# or more contributor license agreements. See the NOTICE file -# distributed with this work for additional information -# regarding copyright ownership. The ASF licenses this file -# to you under the Apache License, Version 2.0 (the -# "License"); you may not use this file except in compliance -# with the License. You may obtain a copy of the License at -# -# http://www.apache.org/licenses/LICENSE-2.0 -# -# Unless required by applicable law or agreed to in writing, software -# distributed under the License is distributed on an "AS IS" BASIS, -# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. -# See the License for the specific language governing permissions and -# limitations under the License. -# - -set(PROTOBUF_IMPORT_DIRS ${PROTO_HDFS_DIR} ${PROTO_HADOOP_DIR}) - -protobuf_generate_cpp(PROTO_SRCS PROTO_HDRS - ${PROTO_HDFS_DIR}/datatransfer.proto - ${PROTO_HDFS_DIR}/ClientDatanodeProtocol.proto - ${PROTO_HDFS_DIR}/ClientNamenodeProtocol.proto - ${PROTO_HDFS_DIR}/acl.proto - ${PROTO_HDFS_DIR}/datatransfer.proto - ${PROTO_HDFS_DIR}/encryption.proto - ${PROTO_HDFS_DIR}/erasurecoding.proto - ${PROTO_HDFS_DIR}/hdfs.proto - ${PROTO_HDFS_DIR}/inotify.proto - ${PROTO_HDFS_DIR}/xattr.proto - ${PROTO_HDFS_DIR}/ReconfigurationProtocol.proto - ${PROTO_HADOOP_DIR}/IpcConnectionContext.proto - ${PROTO_HADOOP_DIR}/ProtobufRpcEngine.proto - ${PROTO_HADOOP_DIR}/RpcHeader.proto - ${PROTO_HADOOP_DIR}/Security.proto -) - -add_executable(protoc-gen-hrpc protoc_gen_hrpc.cc) -target_link_libraries(protoc-gen-hrpc ${PROTOBUF_PROTOC_LIBRARY} ${PROTOBUF_LIBRARY}) - -function(GEN_HRPC SRCS) - if(NOT ARGN) - message(SEND_ERROR "Error: GEN_HRPC() called without any proto files") - return() - endif() - - if(DEFINED PROTOBUF_IMPORT_DIRS) - foreach(DIR ${PROTOBUF_IMPORT_DIRS}) - get_filename_component(ABS_PATH ${DIR} ABSOLUTE) - list(FIND _protobuf_include_path ${ABS_PATH} _contains_already) - if(${_contains_already} EQUAL -1) - list(APPEND _protobuf_include_path -I ${ABS_PATH}) - endif() - endforeach() - endif() - - set(${SRCS}) - - foreach(FIL ${ARGN}) - get_filename_component(ABS_FIL ${FIL} ABSOLUTE) - get_filename_component(FIL_WE ${FIL} NAME_WE) - - list(APPEND ${SRCS} "${CMAKE_CURRENT_BINARY_DIR}/${FIL_WE}.hrpc.inl") - - add_custom_command( - OUTPUT "${CMAKE_CURRENT_BINARY_DIR}/${FIL_WE}.hrpc.inl" - COMMAND ${PROTOBUF_PROTOC_EXECUTABLE} - ARGS --plugin=protoc-gen-hrpc=${CMAKE_CURRENT_BINARY_DIR}/protoc-gen-hrpc --hrpc_out=${CMAKE_CURRENT_BINARY_DIR} ${_protobuf_include_path} ${ABS_FIL} - DEPENDS ${ABS_FIL} ${PROTOBUF_PROTOC_EXECUTABLE} protoc-gen-hrpc - COMMENT "Running HRPC protocol buffer compiler on ${FIL}" - VERBATIM ) - endforeach() - - set_source_files_properties(${${SRCS}} PROPERTIES GENERATED TRUE) - set(${SRCS} ${${SRCS}} PARENT_SCOPE) -endfunction() - -gen_hrpc(HRPC_SRCS - ${PROTO_HDFS_DIR}/ClientNamenodeProtocol.proto -) - -add_library(proto_obj OBJECT ${PROTO_SRCS} ${PROTO_HDRS} ${HRPC_SRCS}) -if(HADOOP_BUILD) - add_dependencies(proto_obj copy_hadoop_files) -endif(HADOOP_BUILD) -add_library(proto $<TARGET_OBJECTS:proto_obj>) http://git-wip-us.apache.org/repos/asf/hadoop/blob/7a542fb3/hadoop-hdfs-project/hadoop-hdfs-native-client/src/main/native/libhdfspp/lib/proto/protoc_gen_hrpc.cc ---------------------------------------------------------------------- diff --git a/hadoop-hdfs-project/hadoop-hdfs-native-client/src/main/native/libhdfspp/lib/proto/protoc_gen_hrpc.cc b/hadoop-hdfs-project/hadoop-hdfs-native-client/src/main/native/libhdfspp/lib/proto/protoc_gen_hrpc.cc deleted file mode 100644 index e7355c0..0000000 --- a/hadoop-hdfs-project/hadoop-hdfs-native-client/src/main/native/libhdfspp/lib/proto/protoc_gen_hrpc.cc +++ /dev/null @@ -1,95 +0,0 @@ -/** - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -#include "protobuf/cpp_helpers.h" - -#include <google/protobuf/compiler/code_generator.h> -#include <google/protobuf/compiler/plugin.h> -#include <google/protobuf/descriptor.h> -#include <google/protobuf/io/printer.h> -#include <google/protobuf/io/zero_copy_stream.h> -#include <google/protobuf/stubs/common.h> - -#include <memory> - -using ::google::protobuf::FileDescriptor; -using ::google::protobuf::MethodDescriptor; -using ::google::protobuf::ServiceDescriptor; -using ::google::protobuf::compiler::CodeGenerator; -using ::google::protobuf::compiler::GeneratorContext; -using ::google::protobuf::io::Printer; -using ::google::protobuf::io::ZeroCopyOutputStream; - -class StubGenerator : public CodeGenerator { -public: - virtual bool Generate(const FileDescriptor *file, const std::string &, - GeneratorContext *ctx, - std::string *error) const override; - -private: - void EmitService(const ServiceDescriptor *service, Printer *out) const; - void EmitMethod(const MethodDescriptor *method, Printer *out) const; -}; - -bool StubGenerator::Generate(const FileDescriptor *file, const std::string &, - GeneratorContext *ctx, std::string *) const { - namespace pb = ::google::protobuf; - std::unique_ptr<ZeroCopyOutputStream> os( - ctx->Open(StripProto(file->name()) + ".hrpc.inl")); - Printer out(os.get(), '$'); - for (int i = 0; i < file->service_count(); ++i) { - const ServiceDescriptor *service = file->service(i); - EmitService(service, &out); - } - return true; -} - -void StubGenerator::EmitService(const ServiceDescriptor *service, - Printer *out) const { - out->Print("\n// GENERATED AUTOMATICALLY. DO NOT MODIFY.\n" - "class $service$ {\n" - "private:\n" - " std::shared_ptr<::hdfs::RpcEngine> engine_;\n" - "public:\n" - " typedef std::function<void(const ::hdfs::Status &)> Callback;\n" - " typedef ::google::protobuf::MessageLite Message;\n" - " inline $service$(std::shared_ptr<::hdfs::RpcEngine> engine)\n" - " : engine_(engine) {}\n", - "service", service->name()); - for (int i = 0; i < service->method_count(); ++i) { - const MethodDescriptor *method = service->method(i); - EmitMethod(method, out); - } - out->Print("};\n"); -} - -void StubGenerator::EmitMethod(const MethodDescriptor *method, - Printer *out) const { - out->Print( - "\n inline void $camel_method$(const Message *req, " - "const std::shared_ptr<Message> &resp, " - "const Callback &handler) {\n" - " engine_->AsyncRpc(\"$method$\", req, resp, handler);\n" - " }\n", - "camel_method", ToCamelCase(method->name()), "method", method->name()); -} - -int main(int argc, char *argv[]) { - StubGenerator generator; - return google::protobuf::compiler::PluginMain(argc, argv, &generator); -} http://git-wip-us.apache.org/repos/asf/hadoop/blob/7a542fb3/hadoop-hdfs-project/hadoop-hdfs-native-client/src/main/native/libhdfspp/lib/reader/CMakeLists.txt ---------------------------------------------------------------------- diff --git a/hadoop-hdfs-project/hadoop-hdfs-native-client/src/main/native/libhdfspp/lib/reader/CMakeLists.txt b/hadoop-hdfs-project/hadoop-hdfs-native-client/src/main/native/libhdfspp/lib/reader/CMakeLists.txt deleted file mode 100644 index 2bcfd92..0000000 --- a/hadoop-hdfs-project/hadoop-hdfs-native-client/src/main/native/libhdfspp/lib/reader/CMakeLists.txt +++ /dev/null @@ -1,21 +0,0 @@ -# -# Licensed to the Apache Software Foundation (ASF) under one -# or more contributor license agreements. See the NOTICE file -# distributed with this work for additional information -# regarding copyright ownership. The ASF licenses this file -# to you under the Apache License, Version 2.0 (the -# "License"); you may not use this file except in compliance -# with the License. You may obtain a copy of the License at -# -# http://www.apache.org/licenses/LICENSE-2.0 -# -# Unless required by applicable law or agreed to in writing, software -# distributed under the License is distributed on an "AS IS" BASIS, -# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. -# See the License for the specific language governing permissions and -# limitations under the License. -# - -add_library(reader_obj OBJECT block_reader.cc datatransfer.cc readergroup.cc) -add_dependencies(reader_obj proto) -add_library(reader $<TARGET_OBJECTS:reader_obj>) http://git-wip-us.apache.org/repos/asf/hadoop/blob/7a542fb3/hadoop-hdfs-project/hadoop-hdfs-native-client/src/main/native/libhdfspp/lib/reader/block_reader.cc ---------------------------------------------------------------------- diff --git a/hadoop-hdfs-project/hadoop-hdfs-native-client/src/main/native/libhdfspp/lib/reader/block_reader.cc b/hadoop-hdfs-project/hadoop-hdfs-native-client/src/main/native/libhdfspp/lib/reader/block_reader.cc deleted file mode 100644 index ca7715d..0000000 --- a/hadoop-hdfs-project/hadoop-hdfs-native-client/src/main/native/libhdfspp/lib/reader/block_reader.cc +++ /dev/null @@ -1,571 +0,0 @@ -/** - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -#include "reader/block_reader.h" -#include "reader/datatransfer.h" -#include "common/continuation/continuation.h" -#include "common/continuation/asio.h" -#include "common/logging.h" -#include "common/util.h" - -#include <future> - -namespace hdfs { - -#define FMT_CONT_AND_PARENT_ADDR "this=" << (void*)this << ", parent=" << (void*)parent_ -#define FMT_CONT_AND_READER_ADDR "this=" << (void*)this << ", reader=" << (void*)reader_ -#define FMT_THIS_ADDR "this=" << (void*)this - - -// Stuff an OpReadBlockProto message with required fields. -hadoop::hdfs::OpReadBlockProto ReadBlockProto(const std::string &client_name, - bool verify_checksum, const hadoop::common::TokenProto *token, - const hadoop::hdfs::ExtendedBlockProto *block, uint64_t length, uint64_t offset) -{ - using namespace hadoop::hdfs; - using namespace hadoop::common; - BaseHeaderProto *base_h = new BaseHeaderProto(); - base_h->set_allocated_block(new ExtendedBlockProto(*block)); - if (token) { - base_h->set_allocated_token(new TokenProto(*token)); - } - ClientOperationHeaderProto *h = new ClientOperationHeaderProto(); - h->set_clientname(client_name); - h->set_allocated_baseheader(base_h); - - OpReadBlockProto p; - p.set_allocated_header(h); - p.set_offset(offset); - p.set_len(length); - p.set_sendchecksums(verify_checksum); - // TODO: p.set_allocated_cachingstrategy(); - return p; -} - -// -// Notes about the BlockReader and associated object lifecycles (9/29/16) -// -We have a several stages in the read pipeline. Each stage represents a logical -// step in the HDFS block transfer logic. They are implemented as continuations -// for now, and in some cases the stage may have a nested continuation as well. -// It's important to make sure that continuations, nested or otherwise, cannot -// outlive the objects they depend on. -// -// -The BlockReader holds a shared_ptr to the DataNodeConnection that's used in each -// pipeline stage. The connection object must never be destroyed while operations are -// pending on the ASIO side (see HDFS-10931). In order to prevent a state where the -// BlockReader or one of the corresponding pipelines outlives the connection each -// pipeline stage must explicitly hold a shared pointer copied from BlockReaderImpl::dn_. -// - - -static int8_t unsecured_request_block_header[3] = {0, kDataTransferVersion, Operation::kReadBlock}; - -void BlockReaderImpl::AsyncRequestBlock(const std::string &client_name, - const hadoop::hdfs::ExtendedBlockProto *block, uint64_t length, - uint64_t offset, const std::function<void(Status)> &handler) -{ - LOG_TRACE(kBlockReader, << "BlockReaderImpl::AsyncRequestBlock(" - << FMT_THIS_ADDR << ", ..., length=" - << length << ", offset=" << offset << ", ...) called"); - - // The total number of bytes that we need to transfer from the DN is - // the amount that the user wants (bytesToRead), plus the padding at - // the beginning in order to chunk-align. Note that the DN may elect - // to send more than this amount if the read starts/ends mid-chunk. - bytes_to_read_ = length; - - struct State { - std::string header; - hadoop::hdfs::OpReadBlockProto request; - hadoop::hdfs::BlockOpResponseProto response; - }; - - auto m = continuation::Pipeline<State>::Create(cancel_state_); - State *s = &m->state(); - - s->request = ReadBlockProto(client_name, options_.verify_checksum, - dn_->token_.get(), block, length, offset); - - s->header = std::string((const char*)unsecured_request_block_header, 3); - - bool serialize_success = true; - s->header += SerializeDelimitedProtobufMessage(&s->request, &serialize_success); - - if(!serialize_success) { - handler(Status::Error("Unable to serialize protobuf message")); - return; - } - - auto read_pb_message = - new continuation::ReadDelimitedPBMessageContinuation<AsyncStream, 16384>(dn_, &s->response); - - m->Push(asio_continuation::Write(dn_, asio::buffer(s->header))).Push(read_pb_message); - - m->Run([this, handler, offset](const Status &status, const State &s) { Status stat = status; - if (stat.ok()) { - const auto &resp = s.response; - - if(this->event_handlers_) { - event_response event_resp = this->event_handlers_->call(FILE_DN_READ_EVENT, "", "", 0); -#ifndef LIBHDFSPP_SIMULATE_ERROR_DISABLED - if (stat.ok() && event_resp.response_type() == event_response::kTest_Error) { - stat = Status::Error("Test error"); - } -#endif - } - - if (stat.ok() && resp.status() == ::hadoop::hdfs::Status::SUCCESS) { - if (resp.has_readopchecksuminfo()) { - const auto &checksum_info = resp.readopchecksuminfo(); - chunk_padding_bytes_ = offset - checksum_info.chunkoffset(); - } - state_ = kReadPacketHeader; - } else { - stat = Status::Error(s.response.message().c_str()); - } - } - handler(stat); - }); -} - -Status BlockReaderImpl::RequestBlock(const std::string &client_name, - const hadoop::hdfs::ExtendedBlockProto *block, - uint64_t length, uint64_t offset) -{ - LOG_TRACE(kBlockReader, << "BlockReaderImpl::RequestBlock(" - << FMT_THIS_ADDR <<"..., length=" - << length << ", offset=" << offset << ") called"); - - auto stat = std::make_shared<std::promise<Status>>(); - std::future<Status> future(stat->get_future()); - AsyncRequestBlock(client_name, block, length, offset, - [stat](const Status &status) { stat->set_value(status); }); - return future.get(); -} - -struct BlockReaderImpl::ReadPacketHeader : continuation::Continuation -{ - ReadPacketHeader(BlockReaderImpl *parent) : parent_(parent), shared_conn_(parent->dn_) {} - - virtual void Run(const Next &next) override { - LOG_TRACE(kBlockReader, << "BlockReaderImpl::ReadPacketHeader::Run(" - << FMT_CONT_AND_PARENT_ADDR << ") called"); - - parent_->packet_data_read_bytes_ = 0; - parent_->packet_len_ = 0; - auto handler = [next, this](const asio::error_code &ec, size_t) { - Status status; - if (ec) { - status = Status(ec.value(), ec.message().c_str()); - } else { - parent_->packet_len_ = packet_length(); - parent_->header_.Clear(); - bool v = parent_->header_.ParseFromArray(&buf_[kHeaderStart], - header_length()); - assert(v && "Failed to parse the header"); - (void)v; //avoids unused variable warning - parent_->state_ = kReadChecksum; - } - if(parent_->event_handlers_) { - event_response event_resp = parent_->event_handlers_->call(FILE_DN_READ_EVENT, "", "", 0); -#ifndef LIBHDFSPP_SIMULATE_ERROR_DISABLED - if (status.ok() && event_resp.response_type() == event_response::kTest_Error) { - status = Status::Error("Test error"); - } -#endif - } - next(status); - }; - - asio::async_read(*parent_->dn_, asio::buffer(buf_), - std::bind(&ReadPacketHeader::CompletionHandler, this, - std::placeholders::_1, std::placeholders::_2), handler); - } - -private: - static const size_t kMaxHeaderSize = 512; - static const size_t kPayloadLenOffset = 0; - static const size_t kPayloadLenSize = sizeof(int32_t); - static const size_t kHeaderLenOffset = 4; - static const size_t kHeaderLenSize = sizeof(int16_t); - static const size_t kHeaderStart = kPayloadLenSize + kHeaderLenSize; - - BlockReaderImpl *parent_; - std::array<char, kMaxHeaderSize> buf_; - - size_t packet_length() const { - return ntohl(*reinterpret_cast<const unsigned *>(&buf_[kPayloadLenOffset])); - } - - size_t header_length() const { - return ntohs(*reinterpret_cast<const short *>(&buf_[kHeaderLenOffset])); - } - - size_t CompletionHandler(const asio::error_code &ec, size_t transferred) { - if (ec) { - return 0; - } else if (transferred < kHeaderStart) { - return kHeaderStart - transferred; - } else { - return kHeaderStart + header_length() - transferred; - } - } - - // Keep the DN connection alive - std::shared_ptr<DataNodeConnection> shared_conn_; -}; - -struct BlockReaderImpl::ReadChecksum : continuation::Continuation -{ - ReadChecksum(BlockReaderImpl *parent) : parent_(parent), shared_conn_(parent->dn_) {} - - virtual void Run(const Next &next) override { - LOG_TRACE(kBlockReader, << "BlockReaderImpl::ReadChecksum::Run(" - << FMT_CONT_AND_PARENT_ADDR << ") called"); - - auto parent = parent_; - if (parent->state_ != kReadChecksum) { - next(Status::OK()); - return; - } - - std::shared_ptr<DataNodeConnection> keep_conn_alive_ = shared_conn_; - - auto handler = [parent, next, this, keep_conn_alive_](const asio::error_code &ec, size_t) - { - Status status; - if (ec) { - status = Status(ec.value(), ec.message().c_str()); - } else { - parent->state_ = parent->chunk_padding_bytes_ ? kReadPadding : kReadData; - } - if(parent->event_handlers_) { - event_response event_resp = parent->event_handlers_->call(FILE_DN_READ_EVENT, "", "", 0); -#ifndef LIBHDFSPP_SIMULATE_ERROR_DISABLED - if (status.ok() && event_resp.response_type() == event_response::kTest_Error) { - status = Status::Error("Test error"); - } -#endif - } - next(status); - }; - - parent->checksum_.resize(parent->packet_len_ - sizeof(int) - parent->header_.datalen()); - - asio::async_read(*parent->dn_, asio::buffer(parent->checksum_), handler); - } - -private: - BlockReaderImpl *parent_; - - // Keep the DataNodeConnection alive - std::shared_ptr<DataNodeConnection> shared_conn_; -}; - -struct BlockReaderImpl::ReadData : continuation::Continuation -{ - ReadData(BlockReaderImpl *parent, std::shared_ptr<size_t> bytes_transferred, - const asio::mutable_buffers_1 &buf) : parent_(parent), - bytes_transferred_(bytes_transferred), buf_(buf), shared_conn_(parent->dn_) - { - buf_.begin(); - } - - ~ReadData() { - buf_.end(); - } - - virtual void Run(const Next &next) override { - LOG_TRACE(kBlockReader, << "BlockReaderImpl::ReadData::Run(" - << FMT_CONT_AND_PARENT_ADDR << ") called"); - auto handler = - [next, this](const asio::error_code &ec, size_t transferred) { - Status status; - if (ec) { - status = Status(ec.value(), ec.message().c_str()); - } - - *bytes_transferred_ += transferred; - parent_->bytes_to_read_ -= transferred; - parent_->packet_data_read_bytes_ += transferred; - - if (parent_->packet_data_read_bytes_ >= parent_->header_.datalen()) { - parent_->state_ = kReadPacketHeader; - } - - if(parent_->event_handlers_) { - event_response event_resp = parent_->event_handlers_->call(FILE_DN_READ_EVENT, "", "", 0); -#ifndef LIBHDFSPP_SIMULATE_ERROR_DISABLED - if (status.ok() && event_resp.response_type() == event_response::kTest_Error) { - status = Status::Error("Test error"); - } -#endif - } - next(status); - }; - - auto data_len = parent_->header_.datalen() - parent_->packet_data_read_bytes_; - - asio::async_read(*parent_->dn_, buf_, asio::transfer_exactly(data_len), handler); - } - -private: - BlockReaderImpl *parent_; - std::shared_ptr<size_t> bytes_transferred_; - const asio::mutable_buffers_1 buf_; - - // Keep DNConnection alive. - std::shared_ptr<DataNodeConnection> shared_conn_; -}; - -struct BlockReaderImpl::ReadPadding : continuation::Continuation -{ - ReadPadding(BlockReaderImpl *parent) : parent_(parent), - padding_(parent->chunk_padding_bytes_), - bytes_transferred_(std::make_shared<size_t>(0)), - read_data_(new ReadData(parent, bytes_transferred_, asio::buffer(padding_))), - shared_conn_(parent->dn_) {} - - virtual void Run(const Next &next) override { - LOG_TRACE(kBlockReader, << "BlockReaderImpl::ReadPadding::Run(" - << FMT_CONT_AND_PARENT_ADDR << ") called"); - - if (parent_->state_ != kReadPadding || !parent_->chunk_padding_bytes_) { - next(Status::OK()); - return; - } - - std::shared_ptr<DataNodeConnection> keep_conn_alive_ = shared_conn_; - - auto h = [next, this, keep_conn_alive_](const Status &stat) { - Status status = stat; - if (status.ok()) { - assert(reinterpret_cast<const int &>(*bytes_transferred_) == parent_->chunk_padding_bytes_); - parent_->chunk_padding_bytes_ = 0; - parent_->state_ = kReadData; - } - if(parent_->event_handlers_) { - event_response event_resp = parent_->event_handlers_->call(FILE_DN_READ_EVENT, "", "", 0); -#ifndef LIBHDFSPP_SIMULATE_ERROR_DISABLED - if (status.ok() && event_resp.response_type() == event_response::kTest_Error) { - status = Status::Error("Test error"); - } -#endif - } - next(status); - }; - read_data_->Run(h); - } - -private: - BlockReaderImpl *parent_; - std::vector<char> padding_; - std::shared_ptr<size_t> bytes_transferred_; - std::shared_ptr<continuation::Continuation> read_data_; - ReadPadding(const ReadPadding &) = delete; - ReadPadding &operator=(const ReadPadding &) = delete; - - // Keep DNConnection alive. - std::shared_ptr<DataNodeConnection> shared_conn_; -}; - - -struct BlockReaderImpl::AckRead : continuation::Continuation -{ - AckRead(BlockReaderImpl *parent) : parent_(parent), shared_conn_(parent->dn_) {} - - virtual void Run(const Next &next) override { - LOG_TRACE(kBlockReader, << "BlockReaderImpl::AckRead::Run(" << FMT_CONT_AND_PARENT_ADDR << ") called"); - - if (parent_->bytes_to_read_ > 0) { - next(Status::OK()); - return; - } - - auto m = continuation::Pipeline<hadoop::hdfs::ClientReadStatusProto>::Create(parent_->cancel_state_); - - m->state().set_status(parent_->options_.verify_checksum - ? hadoop::hdfs::Status::CHECKSUM_OK - : hadoop::hdfs::Status::SUCCESS); - - m->Push(continuation::WriteDelimitedPBMessage(parent_->dn_, &m->state())); - - std::shared_ptr<DataNodeConnection> keep_conn_alive_ = shared_conn_; - - m->Run([this, next, keep_conn_alive_](const Status &stat, const hadoop::hdfs::ClientReadStatusProto &) - { - Status status = stat; - if (status.ok()) { - parent_->state_ = BlockReaderImpl::kFinished; - } - if(parent_->event_handlers_) { - event_response event_resp = parent_->event_handlers_->call(FILE_DN_READ_EVENT, "", "", 0); -#ifndef LIBHDFSPP_SIMULATE_ERROR_DISABLED - if (status.ok() && event_resp.response_type() == event_response::kTest_Error) { - status = Status::Error("Test error"); - } -#endif - } - next(status); - }); - } - -private: - BlockReaderImpl *parent_; - - // Keep DNConnection alive. - std::shared_ptr<DataNodeConnection> shared_conn_; -}; - -void BlockReaderImpl::AsyncReadPacket(const MutableBuffers &buffers, - const std::function<void(const Status &, size_t bytes_transferred)> &handler) -{ - assert(state_ != kOpen && "Not connected"); - - LOG_TRACE(kBlockReader, << "BlockReaderImpl::AsyncReadPacket called"); - - struct State { - std::shared_ptr<size_t> bytes_transferred; - }; - - auto m = continuation::Pipeline<State>::Create(cancel_state_); - m->state().bytes_transferred = std::make_shared<size_t>(0); - - // Note: some of these continuations have nested pipelines. - m->Push(new ReadPacketHeader(this)) - .Push(new ReadChecksum(this)) - .Push(new ReadPadding(this)) - .Push(new ReadData( - this, m->state().bytes_transferred, buffers)) - .Push(new AckRead(this)); - - auto self = this->shared_from_this(); - m->Run([self, handler](const Status &status, const State &state) { - handler(status, *state.bytes_transferred); - }); -} - - -size_t BlockReaderImpl::ReadPacket(const MutableBuffers &buffers, Status *status) -{ - LOG_TRACE(kBlockReader, << "BlockReaderImpl::ReadPacket called"); - - size_t transferred = 0; - auto done = std::make_shared<std::promise<void>>(); - auto future = done->get_future(); - AsyncReadPacket(buffers, - [status, &transferred, done](const Status &stat, size_t t) { - *status = stat; - transferred = t; - done->set_value(); - }); - future.wait(); - return transferred; -} - - -struct BlockReaderImpl::RequestBlockContinuation : continuation::Continuation -{ - RequestBlockContinuation(BlockReader *reader, const std::string &client_name, - const hadoop::hdfs::ExtendedBlockProto *block, uint64_t length, uint64_t offset) - : reader_(reader), client_name_(client_name), length_(length), offset_(offset) - { - block_.CheckTypeAndMergeFrom(*block); - } - - virtual void Run(const Next &next) override { - LOG_TRACE(kBlockReader, << "BlockReaderImpl::RequestBlockContinuation::Run(" - << FMT_CONT_AND_READER_ADDR << ") called"); - - reader_->AsyncRequestBlock(client_name_, &block_, length_, offset_, next); - } - -private: - BlockReader *reader_; - const std::string client_name_; - hadoop::hdfs::ExtendedBlockProto block_; - uint64_t length_; - uint64_t offset_; -}; - -struct BlockReaderImpl::ReadBlockContinuation : continuation::Continuation -{ - ReadBlockContinuation(BlockReader *reader, MutableBuffers buffer, size_t *transferred) - : reader_(reader), buffer_(buffer), buffer_size_(asio::buffer_size(buffer)), transferred_(transferred) {} - - virtual void Run(const Next &next) override { - LOG_TRACE(kBlockReader, << "BlockReaderImpl::ReadBlockContinuation::Run(" - << FMT_CONT_AND_READER_ADDR << ") called"); - *transferred_ = 0; - next_ = next; - OnReadData(Status::OK(), 0); - } - -private: - BlockReader *reader_; - const MutableBuffers buffer_; - const size_t buffer_size_; - size_t *transferred_; - std::function<void(const Status &)> next_; - - void OnReadData(const Status &status, size_t transferred) { - using std::placeholders::_1; - using std::placeholders::_2; - *transferred_ += transferred; - if (!status.ok()) { - next_(status); - } else if (*transferred_ >= buffer_size_) { - next_(status); - } else { - reader_->AsyncReadPacket( - asio::buffer(buffer_ + *transferred_, buffer_size_ - *transferred_), - std::bind(&ReadBlockContinuation::OnReadData, this, _1, _2)); - } - } -}; - -void BlockReaderImpl::AsyncReadBlock( - const std::string & client_name, - const hadoop::hdfs::LocatedBlockProto &block, - size_t offset, - const MutableBuffers &buffers, - const std::function<void(const Status &, size_t)> handler) -{ - LOG_TRACE(kBlockReader, << "BlockReaderImpl::AsyncReadBlock(" - << FMT_THIS_ADDR << ") called"); - - auto m = continuation::Pipeline<size_t>::Create(cancel_state_); - size_t * bytesTransferred = &m->state(); - - size_t size = asio::buffer_size(buffers); - - m->Push(new RequestBlockContinuation(this, client_name, &block.b(), size, offset)) - .Push(new ReadBlockContinuation(this, buffers, bytesTransferred)); - - m->Run([handler] (const Status &status, const size_t totalBytesTransferred) { - handler(status, totalBytesTransferred); - }); -} - -void BlockReaderImpl::CancelOperation() { - LOG_TRACE(kBlockReader, << "BlockReaderImpl::CancelOperation(" - << FMT_THIS_ADDR << ") called"); - /* just forward cancel to DNConnection */ - dn_->Cancel(); -} - -} http://git-wip-us.apache.org/repos/asf/hadoop/blob/7a542fb3/hadoop-hdfs-project/hadoop-hdfs-native-client/src/main/native/libhdfspp/lib/reader/block_reader.h ---------------------------------------------------------------------- diff --git a/hadoop-hdfs-project/hadoop-hdfs-native-client/src/main/native/libhdfspp/lib/reader/block_reader.h b/hadoop-hdfs-project/hadoop-hdfs-native-client/src/main/native/libhdfspp/lib/reader/block_reader.h deleted file mode 100644 index b5cbdf5..0000000 --- a/hadoop-hdfs-project/hadoop-hdfs-native-client/src/main/native/libhdfspp/lib/reader/block_reader.h +++ /dev/null @@ -1,159 +0,0 @@ -/** - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -#ifndef BLOCK_READER_H_ -#define BLOCK_READER_H_ - -#include "hdfspp/status.h" -#include "common/async_stream.h" -#include "common/cancel_tracker.h" -#include "common/new_delete.h" -#include "datatransfer.pb.h" -#include "connection/datanodeconnection.h" - -#include <memory> - -namespace hdfs { - -struct CacheStrategy { - bool drop_behind_specified; - bool drop_behind; - bool read_ahead_specified; - unsigned long long read_ahead; - CacheStrategy() - : drop_behind_specified(false), drop_behind(false), - read_ahead_specified(false), read_ahead(false) {} -}; - -enum DropBehindStrategy { - kUnspecified = 0, - kEnableDropBehind = 1, - kDisableDropBehind = 2, -}; - -enum EncryptionScheme { - kNone = 0, - kAESCTRNoPadding = 1, -}; - -struct BlockReaderOptions { - bool verify_checksum; - CacheStrategy cache_strategy; - EncryptionScheme encryption_scheme; - - BlockReaderOptions() - : verify_checksum(true), encryption_scheme(EncryptionScheme::kNone) {} -}; - -/** - * Handles the operational state of request and reading a block (or portion of - * a block) from a DataNode. - * - * Threading model: not thread-safe. - * Lifecycle: should be created, used for a single read, then freed. - */ -class BlockReader { -public: - MEMCHECKED_CLASS(BlockReader) - virtual void AsyncReadBlock( - const std::string & client_name, - const hadoop::hdfs::LocatedBlockProto &block, size_t offset, - const MutableBuffers &buffers, - const std::function<void(const Status &, size_t)> handler) = 0; - - virtual void AsyncReadPacket( - const MutableBuffers &buffers, - const std::function<void(const Status &, size_t bytes_transferred)> &handler) = 0; - - virtual void AsyncRequestBlock( - const std::string &client_name, - const hadoop::hdfs::ExtendedBlockProto *block, - uint64_t length, - uint64_t offset, - const std::function<void(Status)> &handler) = 0; - - virtual void CancelOperation() = 0; -}; - -class BlockReaderImpl - : public BlockReader, public std::enable_shared_from_this<BlockReaderImpl> { -public: - explicit BlockReaderImpl(const BlockReaderOptions &options, std::shared_ptr<DataNodeConnection> dn, - CancelHandle cancel_state, std::shared_ptr<LibhdfsEvents> event_handlers=nullptr) - : dn_(dn), state_(kOpen), options_(options), - chunk_padding_bytes_(0), cancel_state_(cancel_state), event_handlers_(event_handlers.get()) {} - - virtual void AsyncReadPacket( - const MutableBuffers &buffers, - const std::function<void(const Status &, size_t bytes_transferred)> &handler) override; - - virtual void AsyncRequestBlock( - const std::string &client_name, - const hadoop::hdfs::ExtendedBlockProto *block, - uint64_t length, - uint64_t offset, - const std::function<void(Status)> &handler) override; - - virtual void AsyncReadBlock( - const std::string & client_name, - const hadoop::hdfs::LocatedBlockProto &block, size_t offset, - const MutableBuffers &buffers, - const std::function<void(const Status &, size_t)> handler) override; - - virtual void CancelOperation() override; - - size_t ReadPacket(const MutableBuffers &buffers, Status *status); - - Status RequestBlock( - const std::string &client_name, - const hadoop::hdfs::ExtendedBlockProto *block, - uint64_t length, - uint64_t offset); - -private: - struct RequestBlockContinuation; - struct ReadBlockContinuation; - - struct ReadPacketHeader; - struct ReadChecksum; - struct ReadPadding; - struct ReadData; - struct AckRead; - enum State { - kOpen, - kReadPacketHeader, - kReadChecksum, - kReadPadding, - kReadData, - kFinished, - }; - - std::shared_ptr<DataNodeConnection> dn_; - hadoop::hdfs::PacketHeaderProto header_; - State state_; - BlockReaderOptions options_; - size_t packet_len_; - int packet_data_read_bytes_; - int chunk_padding_bytes_; - long long bytes_to_read_; - std::vector<char> checksum_; - CancelHandle cancel_state_; - LibhdfsEvents* event_handlers_; -}; -} - -#endif --------------------------------------------------------------------- To unsubscribe, e-mail: common-commits-unsubscr...@hadoop.apache.org For additional commands, e-mail: common-commits-h...@hadoop.apache.org