HDFS-7014. Implement input streams and file system functionality (zhwangzw via cmccabe)
Project: http://git-wip-us.apache.org/repos/asf/hadoop/repo Commit: http://git-wip-us.apache.org/repos/asf/hadoop/commit/e559ce04 Tree: http://git-wip-us.apache.org/repos/asf/hadoop/tree/e559ce04 Diff: http://git-wip-us.apache.org/repos/asf/hadoop/diff/e559ce04 Branch: refs/heads/HDFS-6994 Commit: e559ce0483c01d90967035e6a312e7ec1f2fdff9 Parents: 4d28f73 Author: Colin Patrick Mccabe <[email protected]> Authored: Fri Oct 24 16:20:55 2014 -0700 Committer: Colin Patrick Mccabe <[email protected]> Committed: Fri Oct 24 16:20:55 2014 -0700 ---------------------------------------------------------------------- .../src/contrib/libhdfs3/src/CMakeLists.txt | 13 +- .../libhdfs3/src/GenerateProtobufs.cmake | 2 + .../contrib/libhdfs3/src/client/BlockLocation.h | 87 ++ .../contrib/libhdfs3/src/client/BlockReader.h | 10 +- .../libhdfs3/src/client/DirectoryIterator.cc | 87 ++ .../libhdfs3/src/client/DirectoryIterator.h | 55 ++ .../contrib/libhdfs3/src/client/FileSystem.cc | 518 +++++++++++ .../contrib/libhdfs3/src/client/FileSystem.h | 277 ++++++ .../libhdfs3/src/client/FileSystemImpl.cc | 760 +++++++++++++++ .../libhdfs3/src/client/FileSystemImpl.h | 478 ++++++++++ .../libhdfs3/src/client/FileSystemKey.cc | 89 ++ .../contrib/libhdfs3/src/client/FileSystemKey.h | 97 ++ .../libhdfs3/src/client/FileSystemStats.h | 77 ++ .../contrib/libhdfs3/src/client/InputStream.cc | 123 +++ .../contrib/libhdfs3/src/client/InputStream.h | 47 +- .../libhdfs3/src/client/InputStreamImpl.cc | 919 +++++++++++++++++++ .../libhdfs3/src/client/InputStreamImpl.h | 80 +- .../libhdfs3/src/client/LocalBlockReader.h | 28 +- .../src/contrib/libhdfs3/src/client/Packet.h | 28 +- .../contrib/libhdfs3/src/client/PacketHeader.h | 13 +- .../libhdfs3/src/client/RemoteBlockReader.h | 18 +- .../contrib/libhdfs3/src/client/TokenInternal.h | 27 + .../src/contrib/libhdfs3/src/common/Config.cc | 213 +++++ .../src/contrib/libhdfs3/src/common/Config.h | 218 +++++ .../contrib/libhdfs3/src/common/ConfigImpl.cc | 291 ++++++ 
.../contrib/libhdfs3/src/common/ConfigImpl.h | 165 ++++ .../libhdfs3/src/common/SessionConfig.cc | 163 ++-- .../contrib/libhdfs3/src/common/SessionConfig.h | 20 +- .../src/contrib/libhdfs3/src/common/Status.cc | 33 + .../src/contrib/libhdfs3/src/common/Status.h | 60 ++ .../libhdfs3/src/common/StatusInternal.h | 137 +++ .../contrib/libhdfs3/src/common/UnorderedMap.h | 6 +- .../contrib/libhdfs3/src/common/XmlConfig.cc | 395 -------- .../src/contrib/libhdfs3/src/common/XmlConfig.h | 182 ---- .../libhdfs3/src/common/XmlConfigParser.cc | 154 ++++ .../libhdfs3/src/common/XmlConfigParser.h | 69 ++ .../libhdfs3/src/server/LocatedBlocks.cc | 19 +- .../contrib/libhdfs3/src/server/LocatedBlocks.h | 37 +- .../src/contrib/libhdfs3/src/server/Namenode.h | 210 +++-- .../contrib/libhdfs3/src/server/NamenodeImpl.h | 146 +-- .../contrib/libhdfs3/src/server/NamenodeInfo.cc | 50 +- .../contrib/libhdfs3/src/server/NamenodeInfo.h | 9 +- .../contrib/libhdfs3/src/server/NamenodeProxy.h | 68 +- 43 files changed, 5438 insertions(+), 1040 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/hadoop/blob/e559ce04/hadoop-hdfs-project/hadoop-hdfs/src/contrib/libhdfs3/src/CMakeLists.txt ---------------------------------------------------------------------- diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/contrib/libhdfs3/src/CMakeLists.txt b/hadoop-hdfs-project/hadoop-hdfs/src/contrib/libhdfs3/src/CMakeLists.txt index 5ca3716..4ee279a 100644 --- a/hadoop-hdfs-project/hadoop-hdfs/src/contrib/libhdfs3/src/CMakeLists.txt +++ b/hadoop-hdfs-project/hadoop-hdfs/src/contrib/libhdfs3/src/CMakeLists.txt @@ -48,7 +48,18 @@ SET(libhdfs3_PROTO_FILES ${libhdfs3_PROTO_FILES} PARENT_SCOPE) INCLUDE(GenerateProtobufs.cmake) INCLUDE_DIRECTORIES("${CMAKE_BINARY_DIR}") -SET(HEADER client/hdfs.h) +SET(HEADER + client/BlockLocation.h + client/DirectoryIterator.h + client/FileStatus.h + client/FileSystem.h + client/FileSystemStats.h + 
client/InputStream.h + client/Permission.h + common/Config.h + common/SharedPtr.h + common/Status.h +) ADD_LIBRARY(libhdfs3-static STATIC ${LIBHDFS3_SOURCES} ${LIBHDFS3_PROTO_SOURCES} ${LIBHDFS3_PROTO_HEADERS}) ADD_LIBRARY(libhdfs3-shared SHARED ${LIBHDFS3_SOURCES} ${LIBHDFS3_PROTO_SOURCES} ${LIBHDFS3_PROTO_HEADERS}) http://git-wip-us.apache.org/repos/asf/hadoop/blob/e559ce04/hadoop-hdfs-project/hadoop-hdfs/src/contrib/libhdfs3/src/GenerateProtobufs.cmake ---------------------------------------------------------------------- diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/contrib/libhdfs3/src/GenerateProtobufs.cmake b/hadoop-hdfs-project/hadoop-hdfs/src/contrib/libhdfs3/src/GenerateProtobufs.cmake index 50efc2c..8cad7b9 100644 --- a/hadoop-hdfs-project/hadoop-hdfs/src/contrib/libhdfs3/src/GenerateProtobufs.cmake +++ b/hadoop-hdfs-project/hadoop-hdfs/src/contrib/libhdfs3/src/GenerateProtobufs.cmake @@ -56,6 +56,8 @@ COPY_IF_CHANGED("${CMAKE_BINARY_DIR}/hdfs_pb" ${R}/hadoop-hdfs-project/hadoop-hdfs/src/main/proto/NamenodeProtocol.proto ${R}/hadoop-hdfs-project/hadoop-hdfs/src/main/proto/ClientNamenodeProtocol.proto ${R}/hadoop-hdfs-project/hadoop-hdfs/src/main/proto/xattr.proto + ${R}/hadoop-hdfs-project/hadoop-hdfs/src/main/proto/encryption.proto + ${R}/hadoop-hdfs-project/hadoop-hdfs/src/main/proto/inotify.proto ) AUTO_SOURCES(PB_SOURCES "*.proto" "RECURSE" "${CMAKE_BINARY_DIR}") http://git-wip-us.apache.org/repos/asf/hadoop/blob/e559ce04/hadoop-hdfs-project/hadoop-hdfs/src/contrib/libhdfs3/src/client/BlockLocation.h ---------------------------------------------------------------------- diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/contrib/libhdfs3/src/client/BlockLocation.h b/hadoop-hdfs-project/hadoop-hdfs/src/contrib/libhdfs3/src/client/BlockLocation.h new file mode 100644 index 0000000..6e7f3e3 --- /dev/null +++ b/hadoop-hdfs-project/hadoop-hdfs/src/contrib/libhdfs3/src/client/BlockLocation.h @@ -0,0 +1,87 @@ +/** + * Licensed to the Apache Software 
Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +#ifndef _HDFS_LIBHDFS3_CLIENT_BLOCKLOCATION_H_ +#define _HDFS_LIBHDFS3_CLIENT_BLOCKLOCATION_H_ + +#include <string> +#include <vector> + +namespace hdfs { + +class BlockLocation { +public: + bool isCorrupt() const { + return corrupt; + } + + void setCorrupt(bool corrupt) { + this->corrupt = corrupt; + } + + const std::vector<std::string> &getHosts() const { + return hosts; + } + + void setHosts(const std::vector<std::string> &hosts) { + this->hosts = hosts; + } + + int64_t getLength() const { + return length; + } + + void setLength(int64_t length) { + this->length = length; + } + + const std::vector<std::string> &getNames() const { + return names; + } + + void setNames(const std::vector<std::string> &names) { + this->names = names; + } + + int64_t getOffset() const { + return offset; + } + + void setOffset(int64_t offset) { + this->offset = offset; + } + + const std::vector<std::string> &getTopologyPaths() const { + return topologyPaths; + } + + void setTopologyPaths(const std::vector<std::string> &topologyPaths) { + this->topologyPaths = topologyPaths; + } + +private: + bool corrupt; + int64_t length; + int64_t offset; // Offset of the block in the file + std::vector<std::string> hosts; // Datanode 
hostnames + std::vector<std::string> names; // Datanode IP:xferPort for getting block + std::vector<std::string> topologyPaths; // Full path name in network topo +}; +} + +#endif /* _HDFS_LIBHDFS3_CLIENT_BLOCKLOCATION_H_ */ http://git-wip-us.apache.org/repos/asf/hadoop/blob/e559ce04/hadoop-hdfs-project/hadoop-hdfs/src/contrib/libhdfs3/src/client/BlockReader.h ---------------------------------------------------------------------- diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/contrib/libhdfs3/src/client/BlockReader.h b/hadoop-hdfs-project/hadoop-hdfs/src/contrib/libhdfs3/src/client/BlockReader.h index fe583cf..dda61be 100644 --- a/hadoop-hdfs-project/hadoop-hdfs/src/contrib/libhdfs3/src/client/BlockReader.h +++ b/hadoop-hdfs-project/hadoop-hdfs/src/contrib/libhdfs3/src/client/BlockReader.h @@ -26,6 +26,9 @@ namespace internal { class BlockReader { public: + BlockReader() { + } + virtual ~BlockReader() { } @@ -42,15 +45,18 @@ public: * @return return the number of bytes filled in the buffer, * it may less than size. Return 0 if reach the end of block. */ - virtual int32_t read(char * buf, int32_t size) = 0; + virtual int32_t read(char *buf, int32_t size) = 0; /** * Move the cursor forward len bytes. * @param len The number of bytes to skip. 
*/ virtual void skip(int64_t len) = 0; -}; +private: + BlockReader(const BlockReader &other); + BlockReader &operator=(const BlockReader &other); +}; } } http://git-wip-us.apache.org/repos/asf/hadoop/blob/e559ce04/hadoop-hdfs-project/hadoop-hdfs/src/contrib/libhdfs3/src/client/DirectoryIterator.cc ---------------------------------------------------------------------- diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/contrib/libhdfs3/src/client/DirectoryIterator.cc b/hadoop-hdfs-project/hadoop-hdfs/src/contrib/libhdfs3/src/client/DirectoryIterator.cc new file mode 100644 index 0000000..a1a59c1 --- /dev/null +++ b/hadoop-hdfs-project/hadoop-hdfs/src/contrib/libhdfs3/src/client/DirectoryIterator.cc @@ -0,0 +1,87 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +#include "DirectoryIterator.h" +#include "FileStatus.h" +#include "Exception.h" +#include "ExceptionInternal.h" +#include "FileSystemImpl.h" +#include "StatusInternal.h" + +using namespace hdfs::internal; + +namespace hdfs { + +DirectoryIterator::DirectoryIterator() + : needLocations(false), hasNextItem(false), filesystem(NULL), next(0) { +} + +DirectoryIterator::DirectoryIterator(hdfs::internal::FileSystemImpl *const fs, + const std::string &path, + bool needLocations) + : needLocations(needLocations), + hasNextItem(false), + filesystem(fs), + next(0), + path(path) { +} + +bool DirectoryIterator::getListing() { + bool more; + + if (NULL == filesystem) { + return false; + } + + next = 0; + lists.clear(); + more = filesystem->getListing(path, startAfter, needLocations, lists); + + if (!lists.empty()) { + startAfter = lists.back().getPath(); + } + + return hasNextItem = (more || !lists.empty()); +} + +bool DirectoryIterator::hasNext() { + return hasNextItem; +} + +Status DirectoryIterator::getNext(FileStatus *output) { + CHECK_PARAMETER(NULL != output, EINVAL, "invalid parameter \"output\""); + + try { + if (next >= lists.size()) { + if (!getListing()) { + return Status(EOVERFLOW, "End of the dir flow"); + } + } + + *output = lists[next++]; + + if (next >= lists.size()) { + getListing(); + } + } catch (...) 
{ + return CreateStatusFromException(current_exception()); + } + + return Status::OK(); +} +} http://git-wip-us.apache.org/repos/asf/hadoop/blob/e559ce04/hadoop-hdfs-project/hadoop-hdfs/src/contrib/libhdfs3/src/client/DirectoryIterator.h ---------------------------------------------------------------------- diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/contrib/libhdfs3/src/client/DirectoryIterator.h b/hadoop-hdfs-project/hadoop-hdfs/src/contrib/libhdfs3/src/client/DirectoryIterator.h new file mode 100644 index 0000000..47708db --- /dev/null +++ b/hadoop-hdfs-project/hadoop-hdfs/src/contrib/libhdfs3/src/client/DirectoryIterator.h @@ -0,0 +1,55 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +#ifndef _HDFS_LIBHFDS3_CLIENT_DIRECTORY_ITERATOR_H_ +#define _HDFS_LIBHFDS3_CLIENT_DIRECTORY_ITERATOR_H_ + +#include "FileStatus.h" +#include "Status.h" + +#include <vector> + +namespace hdfs { +namespace internal { +class FileSystemImpl; +} + +class DirectoryIterator { +public: + DirectoryIterator(); + bool hasNext(); + Status getNext(FileStatus *output); + +private: + DirectoryIterator(hdfs::internal::FileSystemImpl *const fs, + const std::string &path, bool needLocations); + bool getListing(); + + bool needLocations; + bool hasNextItem; + hdfs::internal::FileSystemImpl *filesystem; + size_t next; + std::string path; + std::string startAfter; + std::vector<FileStatus> lists; + + friend hdfs::internal::FileSystemImpl; +}; +} + +#endif /* _HDFS_LIBHFDS3_CLIENT_DIRECTORY_ITERATOR_H_ */ http://git-wip-us.apache.org/repos/asf/hadoop/blob/e559ce04/hadoop-hdfs-project/hadoop-hdfs/src/contrib/libhdfs3/src/client/FileSystem.cc ---------------------------------------------------------------------- diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/contrib/libhdfs3/src/client/FileSystem.cc b/hadoop-hdfs-project/hadoop-hdfs/src/contrib/libhdfs3/src/client/FileSystem.cc new file mode 100644 index 0000000..bf88f9f --- /dev/null +++ b/hadoop-hdfs-project/hadoop-hdfs/src/contrib/libhdfs3/src/client/FileSystem.cc @@ -0,0 +1,518 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. 
You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +#include "DirectoryIterator.h" +#include "Exception.h" +#include "ExceptionInternal.h" +#include "FileSystem.h" +#include "FileSystemImpl.h" +#include "FileSystemKey.h" +#include "Hash.h" +#include "SessionConfig.h" +#include "StatusInternal.h" +#include "Thread.h" +#include "UnorderedMap.h" +#include "WritableUtils.h" + +#include <algorithm> +#include <string> +#include <krb5/krb5.h> + +using namespace hdfs::internal; + +namespace hdfs { + +namespace internal { + +static std::string ExtractPrincipalFromTicketCache( + const std::string &cachePath) { + krb5_context cxt = NULL; + krb5_ccache ccache = NULL; + krb5_principal principal = NULL; + krb5_error_code ec = 0; + std::string errmsg, retval; + char *priName = NULL; + + if (!cachePath.empty()) { + if (0 != setenv("KRB5CCNAME", cachePath.c_str(), 1)) { + THROW(HdfsIOException, "Cannot set env parameter \"KRB5CCNAME\""); + } + } + + do { + if (0 != (ec = krb5_init_context(&cxt))) { + break; + } + + if (0 != (ec = krb5_cc_default(cxt, &ccache))) { + break; + } + + if (0 != (ec = krb5_cc_get_principal(cxt, ccache, &principal))) { + break; + } + + if (0 != (ec = krb5_unparse_name(cxt, principal, &priName))) { + break; + } + } while (0); + + if (!ec) { + retval = priName; + } else { + if (cxt) { + errmsg = krb5_get_error_message(cxt, ec); + } else { + errmsg = "Cannot initialize kerberos context"; + } + } + + if (priName != NULL) { + krb5_free_unparsed_name(cxt, priName); + } + + if (principal != NULL) { + krb5_free_principal(cxt, principal); + } + + if (ccache != NULL) { + krb5_cc_close(cxt, 
ccache); + } + + if (cxt != NULL) { + krb5_free_context(cxt); + } + + if (!errmsg.empty()) { + THROW(HdfsIOException, + "FileSystem: Filed to extract principal from ticket cache: %s", + errmsg.c_str()); + } + + return retval; +} + +static std::string ExtractPrincipalFromToken(const Token &token) { + std::string realUser, owner; + std::string identifier = token.getIdentifier(); + WritableUtils cin(&identifier[0], identifier.size()); + char version; + + try { + version = cin.readByte(); + + if (version != 0) { + THROW(HdfsIOException, "Unknown version of delegation token"); + } + + owner = cin.ReadText(); + cin.ReadText(); + realUser = cin.ReadText(); + return realUser.empty() ? owner : realUser; + } catch (const std::range_error &e) { + } + + THROW(HdfsIOException, "Cannot extract principal from token"); +} +} + +FileSystem::FileSystem(const Config &conf) : conf(conf) { +} + +FileSystem::~FileSystem() { + impl.reset(); +} + +Status FileSystem::connect() { + try { + internal::SessionConfig sconf(*conf.impl); + return connect(sconf.getDefaultUri().c_str(), NULL, NULL); + } catch (...) { + return CreateStatusFromException(current_exception()); + } + + return Status::OK(); +} + +Status FileSystem::connect(const std::string &uri) { + try { + connect(uri, NULL, NULL); + } catch (...) 
{ + return CreateStatusFromException(current_exception()); + } + + return Status::OK(); +} + +static shared_ptr<FileSystemImpl> ConnectInternal(const std::string &uri, + const std::string &principal, + const Token *token, + Config &conf) { + if (uri.empty()) { + THROW(InvalidParameter, "Invalid HDFS uri."); + } + + FileSystemKey key(uri, principal.c_str()); + + if (token) { + key.addToken(*token); + } + + return shared_ptr<FileSystemImpl>(new FileSystemImpl(key, conf)); +} + +Status FileSystem::connect(const std::string &uri, const std::string &username, + const std::string &token) { + AuthMethod auth; + std::string principal; + + THROW(HdfsIOException, "FileSystem: already connected."); + + try { + SessionConfig sconf(*conf.impl); + auth = RpcAuth::ParseMethod(sconf.getRpcAuthMethod()); + + if (!token.empty() && auth != AuthMethod::SIMPLE) { + Token t; + t.fromString(token); + principal = ExtractPrincipalFromToken(t); + impl = ConnectInternal(uri, principal, &t, conf); + impl->connect(); + return Status::OK(); + } else if (!username.empty()) { + principal = username; + } + + if (auth == AuthMethod::KERBEROS) { + principal = + ExtractPrincipalFromTicketCache(sconf.getKerberosCachePath()); + } + + impl = ConnectInternal(uri, principal, NULL, conf); + impl->connect(); + } catch (...) { + impl.reset(); + return CreateStatusFromException(current_exception()); + } + + return Status::OK(); +} + +void FileSystem::disconnect() { + impl.reset(); +} + +Status FileSystem::getDefaultReplication(int *output) const { + CHECK_PARAMETER(impl, EIO, "FileSystem: not connected."); + CHECK_PARAMETER(NULL != output, EINVAL, "invalid parameter \"output\""); + + try { + *output = impl->getDefaultReplication(); + } catch (...) 
{ + return CreateStatusFromException(current_exception()); + } + + return Status::OK(); +} + +Status FileSystem::getDefaultBlockSize(int64_t *output) const { + CHECK_PARAMETER(impl, EIO, "FileSystem: not connected."); + CHECK_PARAMETER(NULL != output, EINVAL, "invalid parameter \"output\""); + + try { + *output = impl->getDefaultBlockSize(); + } catch (...) { + return CreateStatusFromException(current_exception()); + } + + return Status::OK(); +} + +Status FileSystem::getHomeDirectory(std::string *output) const { + CHECK_PARAMETER(impl, EIO, "FileSystem: not connected."); + CHECK_PARAMETER(NULL != output, EINVAL, "invalid parameter \"output\""); + + try { + *output = impl->getHomeDirectory(); + } catch (...) { + return CreateStatusFromException(current_exception()); + } + + return Status::OK(); +} + +Status FileSystem::deletePath(const std::string &path, bool recursive) { + CHECK_PARAMETER(impl, EIO, "FileSystem: not connected."); + + try { + if (false == impl->deletePath(path.c_str(), recursive)) { + return Status(EIO); + } + } catch (...) { + return CreateStatusFromException(current_exception()); + } + + return Status::OK(); +} + +Status FileSystem::mkdir(const std::string &path, + const Permission &permission) { + CHECK_PARAMETER(impl, EIO, "FileSystem: not connected."); + + try { + if (false == impl->mkdir(path.c_str(), permission)) { + return Status(EIO); + } + } catch (...) { + return CreateStatusFromException(current_exception()); + } + + return Status::OK(); +} + +Status FileSystem::mkdirs(const std::string &path, + const Permission &permission) { + CHECK_PARAMETER(impl, EIO, "FileSystem: not connected."); + + try { + if (false == impl->mkdirs(path.c_str(), permission)) { + return Status(EIO); + } + } catch (...) 
{ + return CreateStatusFromException(current_exception()); + } + + return Status::OK(); +} + +Status FileSystem::getFileStatus(const std::string &path, + FileStatus *output) const { + CHECK_PARAMETER(impl, EIO, "FileSystem: not connected."); + CHECK_PARAMETER(NULL != output, EINVAL, "invalid parameter \"output\""); + + try { + *output = impl->getFileStatus(path.c_str()); + } catch (...) { + return CreateStatusFromException(current_exception()); + } + + return Status::OK(); +} + +Status FileSystem::getFileBlockLocations(const std::string &path, int64_t start, + int64_t len, + std::vector<BlockLocation> *output) { + CHECK_PARAMETER(impl, EIO, "FileSystem: not connected."); + CHECK_PARAMETER(NULL != output, EINVAL, "invalid parameter \"output\""); + + try { + *output = impl->getFileBlockLocations(path.c_str(), start, len); + } catch (...) { + return CreateStatusFromException(current_exception()); + } + + return Status::OK(); +} + +Status FileSystem::listDirectory(const std::string &path, + DirectoryIterator *output) { + CHECK_PARAMETER(impl, EIO, "FileSystem: not connected."); + CHECK_PARAMETER(NULL != output, EINVAL, "invalid parameter \"output\""); + + try { + *output = impl->listDirectory(path.c_str(), false); + } catch (...) { + return CreateStatusFromException(current_exception()); + } + + return Status::OK(); +} + +Status FileSystem::setOwner(const std::string &path, + const std::string &username, + const std::string &groupname) { + CHECK_PARAMETER(impl, EIO, "FileSystem: not connected."); + + try { + impl->setOwner(path.c_str(), username.c_str(), groupname.c_str()); + } catch (...) { + return CreateStatusFromException(current_exception()); + } + + return Status::OK(); +} + +Status FileSystem::setTimes(const std::string &path, int64_t mtime, + int64_t atime) { + CHECK_PARAMETER(impl, EIO, "FileSystem: not connected."); + + try { + impl->setTimes(path.c_str(), mtime, atime); + } catch (...) 
{ + return CreateStatusFromException(current_exception()); + } + + return Status::OK(); +} + +Status FileSystem::setPermission(const std::string &path, + const Permission &permission) { + CHECK_PARAMETER(impl, EIO, "FileSystem: not connected."); + + try { + impl->setPermission(path.c_str(), permission); + } catch (...) { + return CreateStatusFromException(current_exception()); + } + + return Status::OK(); +} + +Status FileSystem::setReplication(const std::string &path, short replication) { + CHECK_PARAMETER(impl, EIO, "FileSystem: not connected."); + + try { + if (false == impl->setReplication(path.c_str(), replication)) { + return Status(EIO); + } + } catch (...) { + return CreateStatusFromException(current_exception()); + } + + return Status::OK(); +} + +Status FileSystem::rename(const std::string &src, const std::string &dst) { + CHECK_PARAMETER(impl, EIO, "FileSystem: not connected."); + + try { + if (false == impl->rename(src.c_str(), dst.c_str())) { + return Status(EIO); + } + } catch (...) { + return CreateStatusFromException(current_exception()); + } + + return Status::OK(); +} + +Status FileSystem::setWorkingDirectory(const std::string &path) { + CHECK_PARAMETER(impl, EIO, "FileSystem: not connected."); + + try { + impl->setWorkingDirectory(path.c_str()); + } catch (...) { + return CreateStatusFromException(current_exception()); + } + + return Status::OK(); +} + +Status FileSystem::getWorkingDirectory(std::string *output) const { + CHECK_PARAMETER(impl, EIO, "FileSystem: not connected."); + CHECK_PARAMETER(NULL != output, EINVAL, "invalid parameter \"output\""); + + try { + *output = impl->getWorkingDirectory(); + } catch (...) { + return CreateStatusFromException(current_exception()); + } + + return Status::OK(); +} + +Status FileSystem::exist(const std::string &path) const { + CHECK_PARAMETER(impl, EIO, "FileSystem: not connected."); + + try { + if (false == impl->exist(path.c_str())) { + return Status(ENOENT); + } + } catch (...) 
{ + return CreateStatusFromException(current_exception()); + } + + return Status::OK(); +} + +Status FileSystem::getStats(FileSystemStats *output) const { + CHECK_PARAMETER(impl, EIO, "FileSystem: not connected."); + CHECK_PARAMETER(NULL != output, EINVAL, "invalid parameter \"output\""); + + try { + *output = impl->getFsStats(); + } catch (...) { + return CreateStatusFromException(current_exception()); + } + + return Status::OK(); +} + +Status FileSystem::getDelegationToken(const std::string &renewer, + std::string *output) { + CHECK_PARAMETER(impl, EIO, "FileSystem: not connected."); + CHECK_PARAMETER(NULL != output, EINVAL, "invalid parameter \"output\""); + + try { + *output = impl->getDelegationToken(renewer.c_str()); + } catch (...) { + return CreateStatusFromException(current_exception()); + } + + return Status::OK(); +} + +Status FileSystem::getDelegationToken(std::string *output) { + CHECK_PARAMETER(impl, EIO, "FileSystem: not connected."); + CHECK_PARAMETER(NULL != output, EINVAL, "invalid parameter \"output\""); + + try { + *output = impl->getDelegationToken(); + } catch (...) { + return CreateStatusFromException(current_exception()); + } + + return Status::OK(); +} + +Status FileSystem::renewDelegationToken(const std::string &token, + int64_t *output) { + CHECK_PARAMETER(impl, EIO, "FileSystem: not connected."); + CHECK_PARAMETER(NULL != output, EINVAL, "invalid parameter \"output\""); + + try { + *output = impl->renewDelegationToken(token.c_str()); + } catch (...) { + return CreateStatusFromException(current_exception()); + } + + return Status::OK(); +} + +Status FileSystem::cancelDelegationToken(const std::string &token) { + CHECK_PARAMETER(impl, EIO, "FileSystem: not connected."); + + try { + impl->cancelDelegationToken(token.c_str()); + } catch (...) 
{ + return CreateStatusFromException(current_exception()); + } + + return Status::OK(); +} +} http://git-wip-us.apache.org/repos/asf/hadoop/blob/e559ce04/hadoop-hdfs-project/hadoop-hdfs/src/contrib/libhdfs3/src/client/FileSystem.h ---------------------------------------------------------------------- diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/contrib/libhdfs3/src/client/FileSystem.h b/hadoop-hdfs-project/hadoop-hdfs/src/contrib/libhdfs3/src/client/FileSystem.h new file mode 100644 index 0000000..c40e753 --- /dev/null +++ b/hadoop-hdfs-project/hadoop-hdfs/src/contrib/libhdfs3/src/client/FileSystem.h @@ -0,0 +1,277 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +#ifndef _HDFS_LIBHDFS3_CLIENT_FILESYSTEM_H_ +#define _HDFS_LIBHDFS3_CLIENT_FILESYSTEM_H_ + +#include "BlockLocation.h" +#include "Config.h" +#include "DirectoryIterator.h" +#include "FileStatus.h" +#include "FileSystemStats.h" +#include "Permission.h" +#include "SharedPtr.h" +#include "Status.h" +#include "Token.h" + +#include <vector> + +namespace hdfs { +namespace internal { +struct FileSystemImpl; +} + +class FileSystem { +public: + /** + * Construct a FileSystem + * @param conf hdfs configuration + */ + FileSystem(const Config &conf); + + /** + * Destroy a HdfsFileSystem instance + */ + ~FileSystem(); + + /** + * Connect to default hdfs. + * @return the result status of this operation + */ + Status connect(); + + /** + * Connect to hdfs + * @param uri hdfs connection uri, hdfs://host:port + * @return the result status of this operation + */ + Status connect(const std::string &uri); + + /** + * Connect to hdfs with user or token + * username and token cannot be set at the same time + * @param uri connection uri. + * @param username user used to connect to hdfs + * @param token token used to connect to hdfs + * @return the result status of this operation + */ + Status connect(const std::string &uri, const std::string &username, + const std::string &token); + + /** + * disconnect from hdfs + */ + void disconnect(); + + /** + * To get default number of replication. + * @param output the pointer of the output parameter. + * @return the result status of this operation + */ + Status getDefaultReplication(int *output) const; + + /** + * To get the default number of block size. + * @param output the pointer of the output parameter. + * @return the result status of this operation + */ + Status getDefaultBlockSize(int64_t *output) const; + + /** + * To get the home directory. + * @param output the pointer of the output parameter. 
+ * @return the result status of this operation + */ + Status getHomeDirectory(std::string *output) const; + + /** + * To delete a file or directory. + * @param path the path to be deleted. + * @param recursive if path is a directory, delete the contents recursively. + * @return the result status of this operation + */ + Status deletePath(const std::string &path, bool recursive); + + /** + * To create a directory which given permission. + * @param path the directory path which is to be created. + * @param permission directory permission. + * @return the result status of this operation + */ + Status mkdir(const std::string &path, const Permission &permission); + + /** + * To create a directory which given permission. + * If parent path does not exits, create it. + * @param path the directory path which is to be created. + * @param permission directory permission. + * @return the result status of this operation + */ + Status mkdirs(const std::string &path, const Permission &permission); + + /** + * To get path information. + * @param path the path which information is to be returned. + * @param output the pointer of the output parameter. + * @return the result status of this operation + */ + Status getFileStatus(const std::string &path, FileStatus *output) const; + + /** + * Return an array containing hostnames, offset and size of + * portions of the given file. + * + * This call is most helpful with DFS, where it returns + * hostnames of machines that contain the given file. + * + * The FileSystem will simply return an elt containing 'localhost'. + * + * @param path path is used to identify an FS since an FS could have + * another FS that it could be delegating the call to + * @param start offset into the given file + * @param len length for which to get locations for + * @param output the pointer of the output parameter. 
+ * @return the result status of this operation + */ + Status getFileBlockLocations(const std::string &path, int64_t start, + int64_t len, + std::vector<BlockLocation> *output); + + /** + * list the contents of a directory. + * @param path The directory path. + * @param output the pointer of the output parameter. + * @return the result status of this operation + */ + Status listDirectory(const std::string &path, DirectoryIterator *output); + + /** + * To set the owner and the group of the path. + * username and groupname cannot be empty at the same time. + * @param path the path which owner of group is to be changed. + * @param username new user name. + * @param groupname new group. + * @return the result status of this operation + */ + Status setOwner(const std::string &path, const std::string &username, + const std::string &groupname); + + /** + * To set the access time or modification time of a path. + * @param path the path which access time or modification time is to be + * changed. + * @param mtime new modification time. + * @param atime new access time. + * @return the result status of this operation + */ + Status setTimes(const std::string &path, int64_t mtime, int64_t atime); + + /** + * To set the permission of a path. + * @param path the path which permission is to be changed. + * @param permission new permission. + * @return the result status of this operation + */ + Status setPermission(const std::string &path, const Permission &permission); + + /** + * To set the number of replication. + * @param path the path which number of replication is to be changed. + * @param replication new number of replication. + * @return return true if success. + * @return the result status of this operation + */ + Status setReplication(const std::string &path, short replication); + + /** + * To rename a path. + * @param src old path. + * @param dst new path. 
+ * @return the result status of this operation + */ + Status rename(const std::string &src, const std::string &dst); + + /** + * To set working directory. + * @param path new working directory. + * @return the result status of this operation + */ + Status setWorkingDirectory(const std::string &path); + + /** + * To get working directory. + * @param output the pointer of the output parameter. + * @return the result status of this operation + */ + Status getWorkingDirectory(std::string *output) const; + + /** + * To test if the path exist. + * @param path the path which is to be tested. + * @return the result status of this operation + */ + Status exist(const std::string &path) const; + + /** + * To get the file system status. + * @param output the pointer of the output parameter. + * @return the result status of this operation + */ + Status getStats(FileSystemStats *output) const; + + /** + * Get a valid Delegation Token. + * @param renewer the designated renewer for the token + * @param output the pointer of the output parameter. + * @return the result status of this operation + */ + Status getDelegationToken(const std::string &renewer, std::string *output); + + /** + * Get a valid Delegation Token using the default user as renewer. + * @param output the pointer of the output parameter. + * @return the result status of this operation + */ + Status getDelegationToken(std::string *output); + + /** + * Renew an existing delegation token. + * @param token delegation token obtained earlier + * @param output the pointer of the output parameter. + * @return the result status of this operation + */ + Status renewDelegationToken(const std::string &token, int64_t *output); + + /** + * Cancel an existing delegation token. 
+ * @param token delegation token + * @return the result status of this operation + */ + Status cancelDelegationToken(const std::string &token); + +private: + FileSystem(const FileSystem &other); + FileSystem &operator=(const FileSystem &other); + Config conf; + hdfs::internal::shared_ptr<internal::FileSystemImpl> impl; + + friend class InputStream; + friend class OutputStream; +}; +} +#endif /* _HDFS_LIBHDFS3_CLIENT_FILESYSTEM_H_ */ http://git-wip-us.apache.org/repos/asf/hadoop/blob/e559ce04/hadoop-hdfs-project/hadoop-hdfs/src/contrib/libhdfs3/src/client/FileSystemImpl.cc ---------------------------------------------------------------------- diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/contrib/libhdfs3/src/client/FileSystemImpl.cc b/hadoop-hdfs-project/hadoop-hdfs/src/contrib/libhdfs3/src/client/FileSystemImpl.cc new file mode 100644 index 0000000..9bf2943 --- /dev/null +++ b/hadoop-hdfs-project/hadoop-hdfs/src/contrib/libhdfs3/src/client/FileSystemImpl.cc @@ -0,0 +1,760 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +#include "Atomic.h" +#include "BlockLocation.h" +#include "DirectoryIterator.h" +#include "Exception.h" +#include "ExceptionInternal.h" +#include "FileStatus.h" +#include "FileSystemImpl.h" +#include "FileSystemStats.h" +#include "InputStream.h" +#include "Logger.h" +#include "StringUtil.h" +#include "server/LocatedBlocks.h" +#include "server/NamenodeInfo.h" +#include "server/NamenodeProxy.h" + +#include <cstring> +#include <deque> +#include <inttypes.h> +#include <libxml/uri.h> +#include <strings.h> + +using std::string; +using std::deque; +using std::vector; + +namespace hdfs { +namespace internal { + +static const std::string GetAbsPath(const std::string &prefix, + const std::string &path) { + if (path.empty()) { + return prefix; + } + + if ('/' == path[0]) { + return path; + } else { + return prefix + "/" + path; + } +} + +/* + * Return the canonical absolute name of file NAME. + * A canonical name does not contain any `.', `..' components nor any repeated + * path separators ('/') + */ +static const std::string CanonicalizePath(const std::string &path) { + int skip = 0; + string retval; + vector<string> components = StringSplit(path, "/"); + deque<string> tmp; + vector<string>::reverse_iterator s = components.rbegin(); + + while (s != components.rend()) { + if (s->empty() || *s == ".") { + ++s; + } else if (*s == "..") { + ++skip; + ++s; + } else { + if (skip <= 0) { + tmp.push_front(*s); + } else { + --skip; + } + + ++s; + } + } + + for (size_t i = 0; i < tmp.size(); ++i) { + retval += "/"; + retval += tmp[i]; + } + + return retval.empty() ? 
"/" : retval; +} + +FileSystemImpl::FileSystemImpl(const FileSystemKey &key, const Config &c) + : conf(c), + key(key), + openedOutputStream(0), + nn(NULL), + sconf(*c.impl), + user(key.getUser()) { + static atomic<uint32_t> count(0); + std::stringstream ss; + srand((unsigned int)time(NULL)); + ss << "libhdfs3_client_random_" << rand() << "_count_" << ++count << "_pid_" + << getpid() << "_tid_" << pthread_self(); + clientName = ss.str(); + workingDir = std::string("/user/") + user.getEffectiveUser(); +#ifdef MOCK + stub = NULL; +#endif + // set log level + RootLogger.setLogSeverity(sconf.getLogSeverity()); +} + +/** + * Destroy a FileSystemBase instance + */ +FileSystemImpl::~FileSystemImpl() { + try { + disconnect(); + } catch (...) { + } +} + +const std::string FileSystemImpl::getStandardPath(const char *path) { + std::string base; + { + lock_guard<mutex> lock(mutWorkingDir); + base = workingDir; + } + return CanonicalizePath(GetAbsPath(base, path)); +} + +const char *FileSystemImpl::getClientName() { + return clientName.c_str(); +} + +void FileSystemImpl::connect() { + std::string host, port, uri; + std::vector<NamenodeInfo> namenodeInfos; + + if (nn) { + THROW(HdfsIOException, "FileSystemImpl: already connected."); + } + + host = key.getHost(); + port = key.getPort(); + uri += key.getScheme() + "://" + host; + + if (port.empty()) { + try { + Status status = NamenodeInfo::GetHANamenodeInfo(key.getHost(), conf, + &namenodeInfos); + } catch (const HdfsConfigNotFound &e) { + NESTED_THROW(InvalidParameter, + "Cannot parse URI: %s, missing port or invalid HA " + "configuration", + uri.c_str()); + } + + tokenService = "ha-hdfs:"; + tokenService += host; + } else { + std::stringstream ss; + ss << host << ":" << port; + namenodeInfos.resize(1); + namenodeInfos[0].setRpcAddr(ss.str()); + tokenService = namenodeInfos[0].getRpcAddr(); + } + +#ifdef MOCK + nn = stub->getNamenode(); +#else + nn = new NamenodeProxy( + namenodeInfos, tokenService, sconf, + RpcAuth(user, 
RpcAuth::ParseMethod(sconf.getRpcAuthMethod()))); +#endif + /* + * To test if the connection is ok + */ + getFsStats(); +} + +/** + * disconnect from hdfs + */ +void FileSystemImpl::disconnect() { + if (nn) { + nn->close(); + delete nn; + } + + nn = NULL; +} + +/** + * To get default number of replication. + * @return the default number of replication. + */ +int FileSystemImpl::getDefaultReplication() const { + return sconf.getDefaultReplica(); +} + +/** + * To get the default number of block size. + * @return the default block size. + */ +int64_t FileSystemImpl::getDefaultBlockSize() const { + return sconf.getDefaultBlockSize(); +} + +/** + * To get the home directory. + * @return home directory. + */ +std::string FileSystemImpl::getHomeDirectory() const { + return std::string("/user/") + user.getEffectiveUser(); +} + +/** + * To delete a file or directory. + * @param path the path to be deleted. + * @param recursive if path is a directory, delete the contents recursively. + * @return return true if success. + */ + +bool FileSystemImpl::deletePath(const char *path, bool recursive) { + if (!nn) { + THROW(HdfsIOException, "FileSystemImpl: not connected."); + } + + if (NULL == path || !strlen(path)) { + THROW(InvalidParameter, "Invalid input: path should not be empty"); + } + + return nn->deleteFile(getStandardPath(path), recursive); +} + +/** + * To create a directory which given permission. + * @param path the directory path which is to be created. + * @param permission directory permission. + * @return return true if success. + */ + +bool FileSystemImpl::mkdir(const char *path, const Permission &permission) { + if (!nn) { + THROW(HdfsIOException, "FileSystemImpl: not connected."); + } + + if (NULL == path || !strlen(path)) { + THROW(InvalidParameter, "Invalid input: path should not be empty"); + } + + return nn->mkdirs(getStandardPath(path), permission, false); +} + +/** + * To create a directory which given permission. 
+ * If parent path does not exits, create it. + * @param path the directory path which is to be created. + * @param permission directory permission. + * @return return true if success. + */ + +bool FileSystemImpl::mkdirs(const char *path, const Permission &permission) { + if (!nn) { + THROW(HdfsIOException, "FileSystemImpl: not connected."); + } + + if (NULL == path || !strlen(path)) { + THROW(InvalidParameter, "Invalid input: path should not be empty"); + } + + return nn->mkdirs(getStandardPath(path), permission, true); +} + +/** + * To get path information. + * @param path the path which information is to be returned. + * @return the path information. + */ +FileStatus FileSystemImpl::getFileStatus(const char *path) { + if (!nn) { + THROW(HdfsIOException, "FileSystemImpl: not connected."); + } + + if (NULL == path || !strlen(path)) { + THROW(InvalidParameter, "Invalid input: path should not be empty"); + } + + return nn->getFileInfo(getStandardPath(path)); +} + +static void Convert(BlockLocation &bl, const LocatedBlock &lb) { + const std::vector<DatanodeInfo> &nodes = lb.getLocations(); + bl.setCorrupt(lb.isCorrupt()); + bl.setLength(lb.getNumBytes()); + bl.setOffset(lb.getOffset()); + std::vector<std::string> hosts(nodes.size()); + std::vector<std::string> names(nodes.size()); + std::vector<std::string> topologyPaths(nodes.size()); + + for (size_t i = 0; i < nodes.size(); ++i) { + hosts[i] = nodes[i].getHostName(); + names[i] = nodes[i].getXferAddr(); + topologyPaths[i] = + nodes[i].getLocation() + '/' + nodes[i].getXferAddr(); + } + + bl.setNames(names); + bl.setHosts(hosts); + bl.setTopologyPaths(topologyPaths); +} + +std::vector<BlockLocation> FileSystemImpl::getFileBlockLocations( + const char *path, int64_t start, int64_t len) { + if (!nn) { + THROW(HdfsIOException, "FileSystemImpl: not connected."); + } + + if (NULL == path || !strlen(path)) { + THROW(InvalidParameter, "Invalid input: path should not be empty"); + } + + if (start < 0) { + 
THROW(InvalidParameter, + "Invalid input: start offset should be positive"); + } + + if (len < 0) { + THROW(InvalidParameter, "Invalid input: length should be positive"); + } + + LocatedBlocks lbs; + nn->getBlockLocations(getStandardPath(path), start, len, lbs); + std::vector<LocatedBlock> blocks = lbs.getBlocks(); + std::vector<BlockLocation> retval(blocks.size()); + + for (size_t i = 0; i < blocks.size(); ++i) { + Convert(retval[i], blocks[i]); + } + + return retval; +} + +/** + * list the contents of a directory. + * @param path the directory path. + * @return return the path informations in the given directory. + */ +DirectoryIterator FileSystemImpl::listDirectory(const char *path, + bool needLocation) { + if (!nn) { + THROW(HdfsIOException, "FileSystemImpl: not connected."); + } + + if (NULL == path || !strlen(path)) { + THROW(InvalidParameter, "Invalid input: path should not be empty"); + } + + return DirectoryIterator(this, getStandardPath(path), needLocation); +} + +/** + * To set the owner and the group of the path. + * username and groupname cannot be empty at the same time. + * @param path the path which owner of group is to be changed. + * @param username new user name. + * @param groupname new group. + */ +void FileSystemImpl::setOwner(const char *path, const char *username, + const char *groupname) { + if (!nn) { + THROW(HdfsIOException, "FileSystemImpl: not connected."); + } + + if (NULL == path || !strlen(path)) { + THROW(InvalidParameter, "Invalid input: path should not be empty"); + } + + if ((NULL == username || !strlen(username)) && + (NULL == groupname || !strlen(groupname))) { + THROW(InvalidParameter, + "Invalid input: username and groupname should not be empty"); + } + + nn->setOwner(getStandardPath(path), username != NULL ? username : "", + groupname != NULL ? groupname : ""); +} + +/** + * To set the access time or modification time of a path. + * @param path the path which access time or modification time is to be changed. 
+ * @param mtime new modification time. + * @param atime new access time. + */ +void FileSystemImpl::setTimes(const char *path, int64_t mtime, int64_t atime) { + if (!nn) { + THROW(HdfsIOException, "FileSystemImpl: not connected."); + } + + if (NULL == path || !strlen(path)) { + THROW(InvalidParameter, "Invalid input: path should not be empty"); + } + + nn->setTimes(getStandardPath(path), mtime, atime); +} + +/** + * To set the permission of a path. + * @param path the path which permission is to be changed. + * @param permission new permission. + */ +void FileSystemImpl::setPermission(const char *path, + const Permission &permission) { + if (!nn) { + THROW(HdfsIOException, "FileSystemImpl: not connected."); + } + + if (NULL == path || !strlen(path)) { + THROW(InvalidParameter, "Invalid input: path should not be empty"); + } + + nn->setPermission(getStandardPath(path), permission); +} + +/** + * To set the number of replication. + * @param path the path which number of replication is to be changed. + * @param replication new number of replication. + * @return return true if success. + */ + +bool FileSystemImpl::setReplication(const char *path, short replication) { + if (!nn) { + THROW(HdfsIOException, "FileSystemImpl: not connected."); + } + + if (NULL == path || !strlen(path)) { + THROW(InvalidParameter, "Invalid input: path should not be empty"); + } + + return nn->setReplication(getStandardPath(path), replication); +} + +/** + * To rename a path. + * @param src old path. + * @param dst new path. + * @return return true if success. 
+ */ + +bool FileSystemImpl::rename(const char *src, const char *dst) { + if (!nn) { + THROW(HdfsIOException, "FileSystemImpl: not connected."); + } + + if (NULL == src || !strlen(src)) { + THROW(InvalidParameter, "Invalid input: src should not be empty"); + } + + if (NULL == dst || !strlen(dst)) { + THROW(InvalidParameter, "Invalid input: dst should not be empty"); + } + + return nn->rename(getStandardPath(src), getStandardPath(dst)); +} + +/** + * To set working directory. + * @param path new working directory. + */ +void FileSystemImpl::setWorkingDirectory(const char *path) { + if (NULL == path) { + THROW(InvalidParameter, "Invalid input: path should not be empty"); + } + + if (!strlen(path) || '/' != path[0]) { + THROW(InvalidParameter, + "Invalid input: path should be an absolute path"); + } + + lock_guard<mutex> lock(mutWorkingDir); + workingDir = path; +} + +/** + * To get working directory. + * @return working directory. + */ +std::string FileSystemImpl::getWorkingDirectory() const { + return workingDir; +} + +/** + * To test if the path exist. + * @param path the path which is to be tested. + * @return return true if the path exist. + */ + +bool FileSystemImpl::exist(const char *path) { + if (!nn) { + THROW(HdfsIOException, "FileSystemImpl: not connected."); + } + + if (NULL == path || !strlen(path)) { + THROW(InvalidParameter, "Invalid input: path should not be empty"); + } + + try { + getFileStatus(path); + } catch (const FileNotFoundException &e) { + return false; + } + + return true; +} + +/** + * To get the file system status. + * @return the file system status. 
+ */ +FileSystemStats FileSystemImpl::getFsStats() { + if (!nn) { + THROW(HdfsIOException, "FileSystemImpl: not connected."); + } + + std::vector<int64_t> retval = nn->getFsStats(); + assert(retval.size() >= 3); + return FileSystemStats(retval[0], retval[1], retval[2]); +} + +static std::string ConstructTempFilePath(const std::string &path, + const std::string clientName) { + std::stringstream ss; + srand((unsigned int)time(NULL)); + static atomic<uint32_t> count(0); + std::vector<std::string> components = StringSplit(path, "/"); + ss << '/'; + + for (size_t i = components.size(); i > 0; --i) { + if (!components[i - 1].empty()) { + components[i - 1].clear(); + break; + } + } + + for (size_t i = 0; i < components.size(); ++i) { + if (!components[i].empty()) { + ss << components[i] << '/'; + } + } + + ss << "._client_" << clientName << "_random_" << rand() << "_count_" + << ++count << "_tid_" << pthread_self() << "_TRUNCATE_TMP"; + return ss.str(); +} + +std::string FileSystemImpl::getDelegationToken(const char *renewer) { + if (!nn) { + THROW(HdfsIOException, "FileSystemImpl: not connected."); + } + + if (NULL == renewer || !strlen(renewer)) { + THROW(InvalidParameter, "Invalid input: renewer should not be empty."); + } + + Token retval = nn->getDelegationToken(renewer); + retval.setService(tokenService); + return retval.toString(); +} + +std::string FileSystemImpl::getDelegationToken() { + return getDelegationToken(key.getUser().getPrincipal().c_str()); +} + +int64_t FileSystemImpl::renewDelegationToken(const std::string &token) { + if (!nn) { + THROW(HdfsIOException, "FileSystemImpl: not connected."); + } + + Token t; + t.fromString(token); + return nn->renewDelegationToken(t); +} + +void FileSystemImpl::cancelDelegationToken(const std::string &token) { + if (!nn) { + THROW(HdfsIOException, "FileSystemImpl: not connected."); + } + + Token t; + t.fromString(token); + nn->cancelDelegationToken(t); +} + +void FileSystemImpl::getBlockLocations(const std::string &src, 
                                       int64_t offset,
                                       int64_t length, LocatedBlocks &lbs) {
    if (!nn) {
        THROW(HdfsIOException, "FileSystemImpl: not connected.");
    }

    nn->getBlockLocations(src, offset, length, lbs);
}

// Create a new file entry in the namespace on behalf of this client.
// Thin wrapper: validation and semantics live in the namenode proxy.
void FileSystemImpl::create(const std::string &src, const Permission &masked,
                            int flag, bool createParent, short replication,
                            int64_t blockSize) {
    if (!nn) {
        THROW(HdfsIOException, "FileSystemImpl: not connected.");
    }

    nn->create(src, masked, clientName, flag, createParent, replication,
               blockSize);
}

// Open an existing file for append; returns the last partial block, if
// any, so the writer can continue filling it.
shared_ptr<LocatedBlock> FileSystemImpl::append(const std::string &src) {
    if (!nn) {
        THROW(HdfsIOException, "FileSystemImpl: not connected.");
    }

    return nn->append(src, clientName);
}

// Give up on a block: any partial writes to it are discarded.
void FileSystemImpl::abandonBlock(const ExtendedBlock &b,
                                  const std::string &src) {
    if (!nn) {
        THROW(HdfsIOException, "FileSystemImpl: not connected.");
    }

    nn->abandonBlock(b, src, clientName);
}

// Allocate a new block for the file being written; also commits the
// previous block on the namenode.
shared_ptr<LocatedBlock> FileSystemImpl::addBlock(
    const std::string &src, const ExtendedBlock *previous,
    const std::vector<DatanodeInfo> &excludeNodes) {
    if (!nn) {
        THROW(HdfsIOException, "FileSystemImpl: not connected.");
    }

    return nn->addBlock(src, clientName, previous, excludeNodes);
}

// Ask the namenode for replacement datanode(s) for an existing write
// pipeline.
shared_ptr<LocatedBlock> FileSystemImpl::getAdditionalDatanode(
    const std::string &src, const ExtendedBlock &blk,
    const std::vector<DatanodeInfo> &existings,
    const std::vector<std::string> &storageIDs,
    const std::vector<DatanodeInfo> &excludes, int numAdditionalNodes) {
    if (!nn) {
        THROW(HdfsIOException, "FileSystemImpl: not connected.");
    }

    return nn->getAdditionalDatanode(src, blk, existings, storageIDs, excludes,
                                     numAdditionalNodes, clientName);
}

// Finish writing a file; returns false when the caller should retry
// (e.g. blocks not yet minimally replicated).
bool FileSystemImpl::complete(const std::string &src,
                              const ExtendedBlock *last) {
    if (!nn) {
        THROW(HdfsIOException, "FileSystemImpl: not connected.");
    }

    return nn->complete(src, clientName, last);
}

/*void FileSystemImpl::reportBadBlocks(const std::vector<LocatedBlock> &blocks)
{
    if (!nn) {
        THROW(HdfsIOException, "FileSystemImpl: not connected.");
    }

    nn->reportBadBlocks(blocks);
}*/

// Persist all metadata of a file currently open for writing.
void FileSystemImpl::fsync(const std::string &src) {
    if (!nn) {
        THROW(HdfsIOException, "FileSystemImpl: not connected.");
    }

    nn->fsync(src, clientName);
}

// Get a new generation stamp (plus access token) for a block under
// construction; used for pipeline recovery and append setup.
shared_ptr<LocatedBlock> FileSystemImpl::updateBlockForPipeline(
    const ExtendedBlock &block) {
    if (!nn) {
        THROW(HdfsIOException, "FileSystemImpl: not connected.");
    }

    return nn->updateBlockForPipeline(block, clientName);
}

// Replace the pipeline of a block under construction with a new block
// (new generation stamp/length) and datanode set.
void FileSystemImpl::updatePipeline(
    const ExtendedBlock &oldBlock, const ExtendedBlock &newBlock,
    const std::vector<DatanodeInfo> &newNodes,
    const std::vector<std::string> &storageIDs) {
    if (!nn) {
        THROW(HdfsIOException, "FileSystemImpl: not connected.");
    }

    nn->updatePipeline(clientName, oldBlock, newBlock, newNodes, storageIDs);
}

// Fetch a page of directory entries starting after |startAfter|;
// returns whether more entries remain (per the namenode contract —
// confirm against Namenode::getListing).
bool FileSystemImpl::getListing(const std::string &src,
                                const std::string &startAfter,
                                bool needLocation,
                                std::vector<FileStatus> &dl) {
    if (!nn) {
        THROW(HdfsIOException, "FileSystemImpl: not connected.");
    }

    return nn->getListing(src, startAfter, needLocation, dl);
}

// Renew this client's lease on the namenode. Returns false when there
// is nothing to renew (no open output streams) or the RPC failed; all
// RPC failures are logged and swallowed deliberately so the renewer
// thread keeps running.
bool FileSystemImpl::renewLease() {
    if (!nn) {
        THROW(HdfsIOException, "FileSystemImpl: not connected.");
    }

    // protected by LeaseRenewer's lock
    if (0 == openedOutputStream) {
        return false;
    }

    try {
        nn->renewLease(clientName);
        return true;
    } catch (const HdfsException &e) {
        LOG(LOG_ERROR,
            "Failed to renew lease for filesystem which client name "
            "is %s, since:\n%s",
            getClientName(), GetExceptionDetail(e));
    } catch (const std::exception &e) {
        LOG(LOG_ERROR,
            "Failed to renew lease for filesystem which client name is "
            "%s, since:\n%s",
            getClientName(), e.what());
    }

    return false;
}

// Count an output stream as opened so the lease renewer keeps the
// lease alive.
void FileSystemImpl::registerOpenedOutputStream() {
    // protected by LeaseRenewer's lock
    ++openedOutputStream;
}

// Count an output stream as closed; returns true when no output
// streams remain open (the lease no longer needs renewal).
bool FileSystemImpl::unregisterOpenedOutputStream() {
    // protected by LeaseRenewer's lock
    if (openedOutputStream > 0) {
        --openedOutputStream;
    }

    return openedOutputStream == 0;
}
}  // namespace internal
}  // namespace hdfs

/**
 * Licensed to the Apache Software Foundation (ASF) under one
 * or more contributor license agreements.  See the NOTICE file
 * distributed with this work for additional information
 * regarding copyright ownership.  The ASF licenses this file
 * to you under the Apache License, Version 2.0 (the
 * "License"); you may not use this file except in compliance
 * with the License.  You may obtain a copy of the License at
 *
 *     http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
+ */ + +#ifndef _HDFS_LIBHDFS3_CLIENT_FILESYSTEMIMPL_H_ +#define _HDFS_LIBHDFS3_CLIENT_FILESYSTEMIMPL_H_ + +#include "BlockLocation.h" +#include "Config.h" +#include "DirectoryIterator.h" +#include "FileStatus.h" +#include "FileSystemKey.h" +#include "FileSystemStats.h" +#include "server/LocatedBlocks.h" +#include "server/Namenode.h" +#include "SessionConfig.h" +#include "UnorderedMap.h" +#include "UserInfo.h" +#ifdef MOCK +#include "NamenodeStub.h" +#endif + +#include <string> +#include <vector> + +namespace hdfs { +class Permission; +} + +namespace hdfs { +namespace internal { + +class FileSystemImpl { +public: + /** + * Construct a FileSystemImpl instance. + * @param key a key which can be uniquely identify a FileSystemImpl + * instance. + * @param c a configuration objecto used to initialize the instance. + */ + FileSystemImpl(const FileSystemKey &key, const Config &c); + + /** + * Destroy a FileSystemBase instance + */ + ~FileSystemImpl(); + + /** + * Format the path to a absolute canonicalized path. + * @param path target path to be hendled. + * @return return a absolute canonicalized path. + */ + const std::string getStandardPath(const char *path); + + /** + * To get the client unique ID. + * @return return the client unique ID. + */ + const char *getClientName(); + + /** + * Connect to hdfs + */ + void connect(); + + /** + * disconnect from hdfs + */ + void disconnect(); + + /** + * To get default number of replication. + * @return the default number of replication. + */ + int getDefaultReplication() const; + + /** + * To get the default block size. + * @return the default block size. + */ + int64_t getDefaultBlockSize() const; + + /** + * To get the home directory. + * @return home directory. + */ + std::string getHomeDirectory() const; + + /** + * To delete a file or directory. + * @param path the path to be deleted. + * @param recursive if path is a directory, delete the contents recursively. + * @return return true if success. 
+ */ + bool deletePath(const char *path, bool recursive); + + /** + * To create a directory with given permission. + * @param path the directory path which is to be created. + * @param permission directory permission. + * @return return true if success. + */ + bool mkdir(const char *path, const Permission &permission); + + /** + * To create a directory which given permission. + * If parent path does not exits, create it. + * @param path the directory path which is to be created. + * @param permission directory permission. + * @return return true if success. + */ + bool mkdirs(const char *path, const Permission &permission); + + /** + * To get path information. + * @param path the path which information is to be returned. + * @return the path information. + */ + FileStatus getFileStatus(const char *path); + + /** + * Return an array containing hostnames, offset and size of + * portions of the given file. + * + * This call is most helpful with DFS, where it returns + * hostnames of machines that contain the given file. + * + * The FileSystem will simply return an elt containing 'localhost'. + * + * @param path path is used to identify an FS since an FS could have + * another FS that it could be delegating the call to + * @param start offset into the given file + * @param len length for which to get locations for + */ + std::vector<BlockLocation> getFileBlockLocations(const char *path, + int64_t start, + int64_t len); + + /** + * list the contents of a directory. + * @param path the directory path. + * @return Return a iterator to visit all elements in this directory. + */ + DirectoryIterator listDirectory(const char *path, bool needLocation); + + /** + * To set the owner and the group of the path. + * username and groupname cannot be empty at the same time. + * @param path the path which owner of group is to be changed. + * @param username new user name. + * @param groupname new group. 
+ */ + void setOwner(const char *path, const char *username, + const char *groupname); + + /** + * To set the access time or modification time of a path. + * @param path the path which access time or modification time is to be + * changed. + * @param mtime new modification time. + * @param atime new access time. + */ + void setTimes(const char *path, int64_t mtime, int64_t atime); + + /** + * To set the permission of a path. + * @param path the path which permission is to be changed. + * @param permission new permission. + */ + void setPermission(const char *path, const Permission &permission); + + /** + * To set the number of replication. + * @param path the path which number of replication is to be changed. + * @param replication new number of replication. + * @return return true if success. + */ + bool setReplication(const char *path, short replication); + + /** + * To rename a path. + * @param src old path. + * @param dst new path. + * @return return true if success. + */ + bool rename(const char *src, const char *dst); + + /** + * To set working directory. + * @param path new working directory. + */ + void setWorkingDirectory(const char *path); + + /** + * To get working directory. + * @return working directory. + */ + std::string getWorkingDirectory() const; + + /** + * To test if the path exist. + * @param path the path which is to be tested. + * @return return true if the path exist. + */ + bool exist(const char *path); + + /** + * To get the file system status. + * @return the file system status. + */ + FileSystemStats getFsStats(); + + /** + * Get a valid Delegation Token. + * + * @param renewer the designated renewer for the token + * @return Token + * @throws IOException + */ + std::string getDelegationToken(const char *renewer); + + /** + * Get a valid Delegation Token using default user as renewer. + * + * @return Token + * @throws IOException + */ + std::string getDelegationToken(); + + /** + * Renew an existing delegation token. 
+ * + * @param token delegation token obtained earlier + * @return the new expiration time + * @throws IOException + */ + int64_t renewDelegationToken(const std::string &token); + + /** + * Cancel an existing delegation token. + * + * @param token delegation token + * @throws IOException + */ + void cancelDelegationToken(const std::string &token); + + /** + * Get locations of the blocks of the specified file within the specified + *range. + * DataNode locations for each block are sorted by + * the proximity to the client. + * + * The client will then have to contact + * one of the indicated DataNodes to obtain the actual data. + * + * @param src file name + * @param offset range start offset + * @param length range length + * @param lbs output the returned blocks + */ + void getBlockLocations(const std::string &src, int64_t offset, + int64_t length, LocatedBlocks &lbs); + + /** + * Create a new file entry in the namespace. + * + * @param src path of the file being created. + * @param masked masked permission. + * @param flag indicates whether the file should be + * overwritten if it already exists or create if it does not exist or + *append. + * @param createParent create missing parent directory if true + * @param replication block replication factor. + * @param blockSize maximum block size. + */ + void create(const std::string &src, const Permission &masked, int flag, + bool createParent, short replication, int64_t blockSize); + + /** + * Append to the end of the file. + * + * @param src path of the file being created. + * @return return the last partial block if any + */ + shared_ptr<LocatedBlock> append(const std::string &src); + + /** + * The client can give up on a block by calling abandonBlock(). + * The client can then either obtain a new block, or complete or abandon the + *file. + * Any partial writes to the block will be discarded. + * + * @param b the block to be abandoned. + * @param src the file which the block belongs to. 
+ */ + void abandonBlock(const ExtendedBlock &b, const std::string &src); + + /** + * A client that wants to write an additional block to the + * indicated filename (which must currently be open for writing) + * should call addBlock(). + * + * addBlock() allocates a new block and datanodes the block data + * should be replicated to. + * + * addBlock() also commits the previous block by reporting + * to the name-node the actual generation stamp and the length + * of the block that the client has transmitted to data-nodes. + * + * @param src the file being created + * @param previous previous block + * @param excludeNodes a list of nodes that should not be allocated for the + *current block. + * @return return the new block. + */ + shared_ptr<LocatedBlock> addBlock( + const std::string &src, const ExtendedBlock *previous, + const std::vector<DatanodeInfo> &excludeNodes); + + /** + * Get a datanode for an existing pipeline. + * + * @param src the file being written + * @param blk the block being written + * @param existings the existing nodes in the pipeline + * @param excludes the excluded nodes + * @param numAdditionalNodes number of additional datanodes + * @return return a new block information which contains new datanode. + */ + shared_ptr<LocatedBlock> getAdditionalDatanode( + const std::string &src, const ExtendedBlock &blk, + const std::vector<DatanodeInfo> &existings, + const std::vector<std::string> &storageIDs, + const std::vector<DatanodeInfo> &excludes, int numAdditionalNodes); + + /** + * The client is done writing data to the given filename, and would + * like to complete it. + * + * The function returns whether the file has been closed successfully. + * If the function returns false, the caller should try again. + * + * close() also commits the last block of file by reporting + * to the name-node the actual generation stamp and the length + * of the block that the client has transmitted to data-nodes. 
+ * + * A call to complete() will not return true until all the file's + * blocks have been replicated the minimum number of times. Thus, + * DataNode failures may cause a client to call complete() several + * times before succeeding. + * + * @param src the file being written. + * @param last last block to be committed. + * @return return false if the client should retry. + */ + bool complete(const std::string &src, const ExtendedBlock *last); + + /** + * The client wants to report corrupted blocks (blocks with specified + * locations on datanodes). + * @param blocks Array of located blocks to report + */ + /*void reportBadBlocks(const std::vector<LocatedBlock> &blocks);*/ + + /** + * Write all metadata for this file into persistent storage. + * The file must be currently open for writing. + * @param src The const std::string &representation of the path + */ + void fsync(const std::string &src); + + /** + * Get a new generation stamp together with an access token for + * a block under construction + * + * This method is called only when a client needs to recover a failed + * pipeline or set up a pipeline for appending to a block. + * + * @param block a block + * @return return a located block with a new generation stamp and an access + *token + */ + shared_ptr<LocatedBlock> updateBlockForPipeline(const ExtendedBlock &block); + + /** + * Update a pipeline for a block under construction + * + * @param oldBlock the old block + * @param newBlock the new block containing new generation stamp and length + * @param newNodes datanodes in the pipeline + */ + void updatePipeline(const ExtendedBlock &oldBlock, + const ExtendedBlock &newBlock, + const std::vector<DatanodeInfo> &newNodes, + const std::vector<std::string> &storageIDs); + + /** + * register the output stream in filespace when it is opened. + */ + void registerOpenedOutputStream(); + + /** + * unregister the output stream from filespace when it is closed. 
+ */ + bool unregisterOpenedOutputStream(); + + /** + * Get the configuration used in filesystem. + * @return return the configuration instance. + */ + const SessionConfig &getConf() const { + return sconf; + } + + /** + * Get the user used in filesystem. + * @return return the user information. + */ + const UserInfo &getUserInfo() const { + return user; + } + + /** + * Get a partial listing of the indicated directory + * + * @param src the directory name + * @param startAfter the name to start listing after encoded in java UTF8 + * @param needLocation if the FileStatus should contain block locations + * @param dl append the returned directories. + * @return return true if there are more items. + */ + bool getListing(const std::string &src, const std::string &startAfter, + bool needLocation, std::vector<FileStatus> &dl); + + /** + * To renew the lease. + * + * @return return false if the filesystem no long needs to renew lease. + */ + bool renewLease(); + +private: + FileSystemImpl(const FileSystemImpl &other); + FileSystemImpl &operator=(const FileSystemImpl &other); + + Config conf; + FileSystemKey key; + int openedOutputStream; + mutex mutWorkingDir; + Namenode *nn; + SessionConfig sconf; + std::string clientName; + std::string tokenService; + std::string workingDir; + UserInfo user; +#ifdef MOCK +private: + Hdfs::Mock::NamenodeStub *stub; +#endif +}; +} +} + +#endif /* _HDFS_LIBHDFS3_CLIENT_FILESYSTEMIMPL_H_ */ http://git-wip-us.apache.org/repos/asf/hadoop/blob/e559ce04/hadoop-hdfs-project/hadoop-hdfs/src/contrib/libhdfs3/src/client/FileSystemKey.cc ---------------------------------------------------------------------- diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/contrib/libhdfs3/src/client/FileSystemKey.cc b/hadoop-hdfs-project/hadoop-hdfs/src/contrib/libhdfs3/src/client/FileSystemKey.cc new file mode 100644 index 0000000..835c37f --- /dev/null +++ b/hadoop-hdfs-project/hadoop-hdfs/src/contrib/libhdfs3/src/client/FileSystemKey.cc @@ -0,0 +1,89 @@ +/** + * 
Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +#include "Exception.h" +#include "ExceptionInternal.h" +#include "FileSystemKey.h" + +#include <algorithm> +#include <libxml/uri.h> +#include <sstream> + +namespace hdfs { +namespace internal { + +FileSystemKey::FileSystemKey(const std::string &uri, const char *u) { + xmlURIPtr uriobj; + std::stringstream ss; + uriobj = xmlParseURI(uri.c_str()); + + try { + if (!uriobj || uriobj->server == NULL || 0 == strlen(uriobj->server)) { + THROW(InvalidParameter, + "Invalid input: uri: %s is not a valid URI type.", + uri.c_str()); + } + + host = uriobj->server; + + if (NULL == uriobj->scheme || 0 == strlen(uriobj->scheme)) { + scheme = "hdfs"; + } else { + scheme = uriobj->scheme; + } + + if (strcasecmp(scheme.c_str(), "hdfs")) { + THROW(InvalidParameter, + "Invalid input: uri is not a valid URI type."); + } + + if (u && strlen(u) > 0) { + user = UserInfo(u); + } else if (NULL == uriobj->user || 0 == strlen(uriobj->user)) { + user = UserInfo::LocalUser(); + } else { + user = UserInfo(uriobj->user); + } + + ss << user.getEffectiveUser(); + + if (uriobj->port == 0) { + ss << "@" << uriobj->server; + } else { + std::stringstream s; + s << uriobj->port; + port = s.str(); + ss << "@" 
<< uriobj->server << ":" << uriobj->port; + } + + authority = ss.str(); + } catch (...) { + if (uriobj) { + xmlFreeURI(uriobj); + } + + throw; + } + + xmlFreeURI(uriobj); + std::transform(authority.begin(), authority.end(), authority.begin(), + tolower); + std::transform(scheme.begin(), scheme.end(), scheme.begin(), tolower); +} +} +} http://git-wip-us.apache.org/repos/asf/hadoop/blob/e559ce04/hadoop-hdfs-project/hadoop-hdfs/src/contrib/libhdfs3/src/client/FileSystemKey.h ---------------------------------------------------------------------- diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/contrib/libhdfs3/src/client/FileSystemKey.h b/hadoop-hdfs-project/hadoop-hdfs/src/contrib/libhdfs3/src/client/FileSystemKey.h new file mode 100644 index 0000000..0a417d7 --- /dev/null +++ b/hadoop-hdfs-project/hadoop-hdfs/src/contrib/libhdfs3/src/client/FileSystemKey.h @@ -0,0 +1,97 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +#ifndef _HDFS_LIBHDFS3_CLIENT_FILESYSTEMKEY_H_ +#define _HDFS_LIBHDFS3_CLIENT_FILESYSTEMKEY_H_ + +#include "Hash.h" +#include "UserInfo.h" + +#include <string> + +namespace hdfs { +namespace internal { + +class FileSystemKey { +public: + FileSystemKey(const std::string &uri, const char *user); + + FileSystemKey(const std::string &auth, const std::string &host, + const std::string &port, const std::string &scheme, + const std::string &user, size_t u) + : authority(auth), host(host), port(port), scheme(scheme), user(user) { + } + + bool operator==(const FileSystemKey &other) const { + return scheme == other.scheme && authority == other.authority; + } + + size_t hash_value() const { + size_t values[] = {StringHasher(scheme), StringHasher(authority)}; + return CombineHasher(values, sizeof(values) / sizeof(values[0])); + } + + const std::string &getHost() const { + return host; + } + + void setHost(const std::string &host) { + this->host = host; + } + + const std::string &getPort() const { + return port; + } + + void setPort(const std::string &port) { + this->port = port; + } + + const std::string &getScheme() const { + return scheme; + } + + void setScheme(const std::string &scheme) { + this->scheme = scheme; + } + + const UserInfo &getUser() const { + return user; + } + + void setUser(const UserInfo &user) { + this->user = user; + } + + void addToken(const Token &token) { + user.addToken(token); + } + +private: + std::string authority; + std::string host; + std::string port; + std::string scheme; + UserInfo user; +}; +} +} + +HDFS_HASH_DEFINE(hdfs::internal::FileSystemKey); + +#endif /* _HDFS_LIBHDFS3_CLIENT_FILESYSTEMKEY_H_ */ http://git-wip-us.apache.org/repos/asf/hadoop/blob/e559ce04/hadoop-hdfs-project/hadoop-hdfs/src/contrib/libhdfs3/src/client/FileSystemStats.h ---------------------------------------------------------------------- diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/contrib/libhdfs3/src/client/FileSystemStats.h 
/**
 * File system capacity statistics. All values are in bytes; a value of -1
 * means the statistic is unknown / has not been retrieved.
 */
class FileSystemStats {
public:
    /**
     * To construct an empty FileSystemStats; every field is -1 (unknown).
     */
    FileSystemStats() : capacity(-1), used(-1), remaining(-1) {
    }

    /**
     * To construct a FileSystemStats with given values.
     * @param capacity the capacity of file system.
     * @param used the space which has been used.
     * @param remaining available space on file system.
     */
    FileSystemStats(int64_t capacity, int64_t used, int64_t remaining)
        : capacity(capacity), used(used), remaining(remaining) {
    }

    /**
     * Return the capacity in bytes of the file system.
     * @return capacity of file system.
     */
    // const-correctness fix: accessors mutate nothing, so they must be
    // callable on a const FileSystemStats.
    int64_t getCapacity() const {
        return capacity;
    }

    /**
     * Return the number of bytes used on the file system.
     * @return return used space.
     */
    int64_t getUsed() const {
        return used;
    }

    /**
     * Return the number of remaining bytes on the file system.
     * @return return available space.
     */
    int64_t getRemaining() const {
        return remaining;
    }

private:
    int64_t capacity;
    int64_t used;
    int64_t remaining;
};
