http://git-wip-us.apache.org/repos/asf/hadoop/blob/e559ce04/hadoop-hdfs-project/hadoop-hdfs/src/contrib/libhdfs3/src/client/InputStream.cc ---------------------------------------------------------------------- diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/contrib/libhdfs3/src/client/InputStream.cc b/hadoop-hdfs-project/hadoop-hdfs/src/contrib/libhdfs3/src/client/InputStream.cc new file mode 100644 index 0000000..3168fe4 --- /dev/null +++ b/hadoop-hdfs-project/hadoop-hdfs/src/contrib/libhdfs3/src/client/InputStream.cc @@ -0,0 +1,123 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +#include "FileSystemImpl.h" +#include "InputStream.h" +#include "InputStreamImpl.h" +#include "StatusInternal.h" + +using namespace hdfs::internal; + +namespace hdfs { + +InputStream::InputStream() { + impl = new internal::InputStreamImpl; +} + +InputStream::~InputStream() { + delete impl; +} + +Status InputStream::open(FileSystem &fs, const std::string &path, + bool verifyChecksum) { + if (!fs.impl) { + THROW(HdfsIOException, "FileSystem: not connected."); + } + + try { + impl->open(fs.impl, path.c_str(), verifyChecksum); + } catch (...) { + return lastError = CreateStatusFromException(current_exception()); + } + + return lastError = Status::OK(); +} + +int32_t InputStream::read(char *buf, int32_t size) { + int32_t retval = -1; + + try { + retval = impl->read(buf, size); + lastError = Status::OK(); + } catch (...) { + lastError = CreateStatusFromException(current_exception()); + } + + return retval; +} + +Status InputStream::readFully(char *buf, int64_t size) { + try { + impl->readFully(buf, size); + } catch (...) { + return lastError = CreateStatusFromException(current_exception()); + } + + return lastError = Status::OK(); +} + +int64_t InputStream::available() { + int64_t retval = -1; + + try { + retval = impl->available(); + lastError = Status::OK(); + } catch (...) { + lastError = CreateStatusFromException(current_exception()); + } + + return retval; +} + +Status InputStream::seek(int64_t pos) { + try { + impl->seek(pos); + } catch (...) { + return lastError = CreateStatusFromException(current_exception()); + } + + return lastError = Status::OK(); +} + +int64_t InputStream::tell() { + int64_t retval = -1; + + try { + retval = impl->tell(); + lastError = Status::OK(); + } catch (...) { + lastError = CreateStatusFromException(current_exception()); + } + + return retval; +} + +Status InputStream::close() { + try { + impl->close(); + } catch (...) { + return lastError = CreateStatusFromException(current_exception()); + } + + return lastError = Status::OK(); +} + +Status InputStream::getLastError() { + return lastError; +} +}
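For orientation before the headers below: InputStream is now a thin wrapper that converts the exceptions thrown by InputStreamImpl into Status return values, latching the most recent failure in lastError. A minimal usage sketch against this public API (illustrative only, not part of the patch; it assumes an hdfs::FileSystem connected elsewhere and an ok() predicate on Status, neither of which is shown in this diff):

#include "FileSystem.h"
#include "InputStream.h"
#include "Status.h"

#include <string>
#include <vector>

// Read up to 4 KiB from the head of a file, checking Status values instead
// of catching exceptions. hdfs::Status::ok() is assumed here; read() reports
// failure by returning -1 and latching the error for getLastError().
static int32_t readHead(hdfs::FileSystem &fs, const std::string &path,
                        std::vector<char> &out) {
    hdfs::InputStream in;
    hdfs::Status s = in.open(fs, path, true /* verifyChecksum */);
    if (!s.ok()) {
        return -1;
    }
    out.resize(4096);
    int32_t n = in.read(&out[0], static_cast<int32_t>(out.size()));
    if (n < 0) {
        s = in.getLastError();  // inspect or log the latched error
        out.clear();
    } else {
        out.resize(n);
    }
    in.close();
    return n;
}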
http://git-wip-us.apache.org/repos/asf/hadoop/blob/e559ce04/hadoop-hdfs-project/hadoop-hdfs/src/contrib/libhdfs3/src/client/InputStream.h ---------------------------------------------------------------------- diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/contrib/libhdfs3/src/client/InputStream.h b/hadoop-hdfs-project/hadoop-hdfs/src/contrib/libhdfs3/src/client/InputStream.h index ddd9434..bd7a5dc 100644 --- a/hadoop-hdfs-project/hadoop-hdfs/src/contrib/libhdfs3/src/client/InputStream.h +++ b/hadoop-hdfs-project/hadoop-hdfs/src/contrib/libhdfs3/src/client/InputStream.h @@ -19,20 +19,28 @@ #ifndef _HDFS_LIBHDFS3_CLIENT_INPUTSTREAM_H_ #define _HDFS_LIBHDFS3_CLIENT_INPUTSTREAM_H_ -#include "FileSystem.h" +#include "Status.h" namespace hdfs { namespace internal { -class InputStreamInter; +class InputStreamImpl; } +class FileSystem; + /** * An input stream used to read data from HDFS. */ class InputStream { public: + /** + * Construct an instance. + */ InputStream(); + /** + * Destroy this instance. + */ ~InputStream(); /** @@ -40,51 +48,66 @@ public: * @param fs hdfs file system. * @param path the file to be read. * @param verifyChecksum verify the checksum. + * @return the result status of this operation */ - void open(FileSystem & fs, const char * path, bool verifyChecksum = true); + Status open(FileSystem &fs, const std::string &path, + bool verifyChecksum = true); /** * To read data from hdfs. * @param buf the buffer to be filled. * @param size buffer size. - * @return return the number of bytes filled in the buffer, it may less than size. + * @return the number of bytes filled in the buffer; it may be less than + * size, -1 on error. */ - int32_t read(char * buf, int32_t size); + int32_t read(char *buf, int32_t size); /** * To read data from hdfs, blocking until the given number of bytes have been read. * @param buf the buffer to be filled. * @param size the number of bytes to be read. + * @return the result status of this operation */ - void readFully(char * buf, int64_t size); + Status readFully(char *buf, int64_t size); /** * Get how many bytes can be read without blocking. - * @return The number of bytes can be read without blocking. + * @return The number of bytes that can be read without blocking, -1 on error. */ int64_t available(); /** * To move the file pointer to the given position. * @param pos the given position. + * @return the result status of this operation */ - void seek(int64_t pos); + Status seek(int64_t pos); /** * To get the current file pointer position. - * @return the position of current file point. + * @return the position of the current file pointer, -1 on error. */ int64_t tell(); /** * Close the stream. + * @return the result status of this operation */ - void close(); + Status close(); + + /** + * Get the error status of the last operation. + * @return the error status of the last operation.
+ */ + Status getLastError(); private: - Internal::InputStreamInter * impl; -}; + InputStream(const InputStream &other); + InputStream &operator=(InputStream &other); + internal::InputStreamImpl *impl; + Status lastError; +}; } #endif /* _HDFS_LIBHDFS3_CLIENT_INPUTSTREAM_H_ */ http://git-wip-us.apache.org/repos/asf/hadoop/blob/e559ce04/hadoop-hdfs-project/hadoop-hdfs/src/contrib/libhdfs3/src/client/InputStreamImpl.cc ---------------------------------------------------------------------- diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/contrib/libhdfs3/src/client/InputStreamImpl.cc b/hadoop-hdfs-project/hadoop-hdfs/src/contrib/libhdfs3/src/client/InputStreamImpl.cc new file mode 100644 index 0000000..ca4c645 --- /dev/null +++ b/hadoop-hdfs-project/hadoop-hdfs/src/contrib/libhdfs3/src/client/InputStreamImpl.cc @@ -0,0 +1,919 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +#include "Exception.h" +#include "ExceptionInternal.h" +#include "FileSystemImpl.h" +#include "InputStreamImpl.h" +#include "LocalBlockReader.h" +#include "Logger.h" +#include "RemoteBlockReader.h" +#include "Thread.h" +#include "UnorderedMap.h" +#include "server/Datanode.h" + +#include <algorithm> +#include <ifaddrs.h> +#include <inttypes.h> +#include <iostream> +#include <sys/socket.h> +#include <sys/types.h> +#include <unistd.h> + +namespace hdfs { +namespace internal { + +mutex InputStreamImpl::MutLocalBlockInforCache; +unordered_map<uint32_t, shared_ptr<LocalBlockInforCacheType>> + InputStreamImpl::LocalBlockInforCache; + +unordered_set<std::string> BuildLocalAddrSet() { + unordered_set<std::string> set; + struct ifaddrs *ifAddr = NULL; + struct ifaddrs *pifAddr = NULL; + struct sockaddr *addr; + + if (getifaddrs(&ifAddr)) { + THROW(HdfsNetworkException, + "InputStreamImpl: cannot get local network interface: %s", + GetSystemErrorInfo(errno)); + } + + try { + std::vector<char> host; + const char *pHost; + host.resize(INET6_ADDRSTRLEN + 1); + + for (pifAddr = ifAddr; pifAddr != NULL; pifAddr = pifAddr->ifa_next) { + addr = pifAddr->ifa_addr; + memset(&host[0], 0, INET6_ADDRSTRLEN + 1); + + if (addr->sa_family == AF_INET) { + pHost = inet_ntop( + addr->sa_family, + &(reinterpret_cast<struct sockaddr_in *>(addr))->sin_addr, + &host[0], INET6_ADDRSTRLEN); + } else if (addr->sa_family == AF_INET6) { + pHost = inet_ntop( + addr->sa_family, + &(reinterpret_cast<struct sockaddr_in6 *>(addr))->sin6_addr, + &host[0], INET6_ADDRSTRLEN); + } else { + continue; + } + + if (NULL == pHost) { + THROW(HdfsNetworkException, + "InputStreamImpl: cannot get convert network address " + "to textual form: %s", + GetSystemErrorInfo(errno)); + } + + set.insert(pHost); + } + + /* + * add hostname. 
+ */ + long hostlen = sysconf(_SC_HOST_NAME_MAX); + host.resize(hostlen + 1); + + if (gethostname(&host[0], host.size())) { + THROW(HdfsNetworkException, + "InputStreamImpl: cannot get hostname: %s", + GetSystemErrorInfo(errno)); + } + + set.insert(&host[0]); + } catch (...) { + if (ifAddr != NULL) { + freeifaddrs(ifAddr); + } + + throw; + } + + if (ifAddr != NULL) { + freeifaddrs(ifAddr); + } + + return set; +} + +InputStreamImpl::InputStreamImpl() + : closed(true), + localRead(true), + readFromUnderConstructedBlock(false), + verify(true), + maxGetBlockInfoRetry(3), + cursor(0), + endOfCurBlock(0), + lastBlockBeingWrittenLength(0), + prefetchSize(0) { +#ifdef MOCK + stub = NULL; +#endif +} + +InputStreamImpl::~InputStreamImpl() { +} + +void InputStreamImpl::checkStatus() { + if (closed) { + THROW(HdfsIOException, "InputStreamImpl: stream is not opened."); + } + + if (lastError != exception_ptr()) { + rethrow_exception(lastError); + } +} + +int64_t InputStreamImpl::readBlockLength(const LocatedBlock &b) { + const std::vector<DatanodeInfo> &nodes = b.getLocations(); + int replicaNotFoundCount = nodes.size(); + + for (size_t i = 0; i < nodes.size(); ++i) { + try { + int64_t n = 0; + shared_ptr<Datanode> dn; + RpcAuth a = auth; + a.getUser().addToken(b.getToken()); +#ifdef MOCK + + if (stub) { + dn = stub->getDatanode(); + } else { + dn = shared_ptr<Datanode>( + new DatanodeImpl(nodes[i].getIpAddr().c_str(), + nodes[i].getIpcPort(), *conf, a)); + } + +#else + dn = shared_ptr<Datanode>(new DatanodeImpl( + nodes[i].getIpAddr().c_str(), nodes[i].getIpcPort(), *conf, a)); +#endif + n = dn->getReplicaVisibleLength(b); + + if (n >= 0) { + return n; + } + } catch (const ReplicaNotFoundException &e) { + LOG(LOG_ERROR, + "InputStreamImpl: failed to get block " + "visible length for Block: %s file %s from Datanode: %s\n%s", + b.toString().c_str(), path.c_str(), + nodes[i].formatAddress().c_str(), GetExceptionDetail(e)); + LOG(INFO, + "InputStreamImpl: retry get block visible length for Block: " + "%s file %s from other datanode", + b.toString().c_str(), path.c_str()); + --replicaNotFoundCount; + } catch (const HdfsIOException &e) { + LOG(LOG_ERROR, + "InputStreamImpl: failed to get block visible length for " + "Block: %s file %s from Datanode: %s\n%s", + b.toString().c_str(), path.c_str(), + nodes[i].formatAddress().c_str(), GetExceptionDetail(e)); + LOG(INFO, + "InputStreamImpl: retry get block visible length for Block: " + "%s file %s from other datanode", + b.toString().c_str(), path.c_str()); + } + } + + // Namenode told us about these locations, but none know about the replica + // means that we hit the race between pipeline creation start and end. + // we require all 3 because some other exception could have happened + // on a DN that has it. 
we want to report that error + if (replicaNotFoundCount == 0) { + return 0; + } + + return -1; +} + +/** + * Getting blocks locations'information from namenode + */ +void InputStreamImpl::updateBlockInfos() { + int retry = maxGetBlockInfoRetry; + + for (int i = 0; i < retry; ++i) { + try { + if (!lbs) { + lbs = shared_ptr<LocatedBlocks>(new LocatedBlocks); + } + + filesystem->getBlockLocations(path, cursor, prefetchSize, *lbs); + + if (lbs->isLastBlockComplete()) { + lastBlockBeingWrittenLength = 0; + } else { + shared_ptr<LocatedBlock> last = lbs->getLastBlock(); + + if (!last) { + lastBlockBeingWrittenLength = 0; + } else { + lastBlockBeingWrittenLength = readBlockLength(*last); + + if (lastBlockBeingWrittenLength == -1) { + if (i + 1 >= retry) { + THROW(HdfsIOException, + "InputStreamImpl: failed " + "to get block visible length for Block: " + "%s from all Datanode.", + last->toString().c_str()); + } else { + LOG(LOG_ERROR, + "InputStreamImpl: failed to get block visible " + "length for Block: %s file %s from all " + "Datanode.", + last->toString().c_str(), path.c_str()); + + try { + sleep_for(milliseconds(4000)); + } catch (...) { + } + + continue; + } + } + + last->setNumBytes(lastBlockBeingWrittenLength); + } + } + + return; + } catch (const HdfsRpcException &e) { + LOG(LOG_ERROR, + "InputStreamImpl: failed to get block information " + "for file %s, %s", + path.c_str(), GetExceptionDetail(e)); + + if (i + 1 >= retry) { + throw; + } + } + + LOG(INFO, + "InputStreamImpl: retry to get block information for " + "file: %s, already tried %d time(s).", + path.c_str(), i + 1); + } +} + +int64_t InputStreamImpl::getFileLength() { + int64_t length = lbs->getFileLength(); + + if (!lbs->isLastBlockComplete()) { + length += lastBlockBeingWrittenLength; + } + + return length; +} + +void InputStreamImpl::seekToBlock(const LocatedBlock &lb) { + if (cursor >= lbs->getFileLength()) { + assert(!lbs->isLastBlockComplete()); + readFromUnderConstructedBlock = true; + } else { + readFromUnderConstructedBlock = false; + } + + assert(cursor >= lb.getOffset() && + cursor < lb.getOffset() + lb.getNumBytes()); + curBlock = shared_ptr<LocatedBlock>(new LocatedBlock(lb)); + int64_t blockSize = curBlock->getNumBytes(); + assert(blockSize > 0); + endOfCurBlock = blockSize + curBlock->getOffset(); + failedNodes.clear(); + blockReader.reset(); +} + +bool InputStreamImpl::choseBestNode() { + const std::vector<DatanodeInfo> &nodes = curBlock->getLocations(); + + for (size_t i = 0; i < nodes.size(); ++i) { + if (std::binary_search(failedNodes.begin(), failedNodes.end(), + nodes[i])) { + continue; + } + + curNode = nodes[i]; + return true; + } + + return false; +} + +bool InputStreamImpl::isLocalNode() { + static const unordered_set<std::string> LocalAddrSet = BuildLocalAddrSet(); + bool retval = LocalAddrSet.find(curNode.getIpAddr()) != LocalAddrSet.end(); + return retval; +} + +BlockLocalPathInfo InputStreamImpl::getBlockLocalPathInfo( + LocalBlockInforCacheType &cache, const LocatedBlock &b) { + BlockLocalPathInfo retval; + + try { + if (!cache.find(LocalBlockInforCacheKey(b.getBlockId(), b.getPoolId()), + retval)) { + RpcAuth a = auth; + /* + * only kerberos based authentication is allowed, do not add token + */ + shared_ptr<Datanode> dn = shared_ptr<Datanode>(new DatanodeImpl( + curNode.getIpAddr().c_str(), curNode.getIpcPort(), *conf, a)); + dn->getBlockLocalPathInfo(b, b.getToken(), retval); + cache.insert(LocalBlockInforCacheKey(b.getBlockId(), b.getPoolId()), + retval); + } + } catch (const HdfsIOException &e) { 
+ throw; + } catch (const HdfsException &e) { + NESTED_THROW( + HdfsIOException, + "InputStreamImpl: Failed to get block local path information."); + } + + return retval; +} + +void InputStreamImpl::invalidCacheEntry(LocalBlockInforCacheType &cache, + const LocatedBlock &b) { + cache.erase(LocalBlockInforCacheKey(b.getBlockId(), b.getPoolId())); +} + +LocalBlockInforCacheType &InputStreamImpl::getBlockLocalPathInfoCache( + uint32_t port) { + lock_guard<mutex> lock(MutLocalBlockInforCache); + unordered_map<uint32_t, shared_ptr<LocalBlockInforCacheType>>::iterator it; + it = LocalBlockInforCache.find(port); + + if (it == LocalBlockInforCache.end()) { + shared_ptr<LocalBlockInforCacheType> retval; + retval = + shared_ptr<LocalBlockInforCacheType>(new LocalBlockInforCacheType( + conf->getMaxLocalBlockInfoCacheSize())); + LocalBlockInforCache[port] = retval; + return *retval; + } else { + return *(it->second); + } +} + +void InputStreamImpl::setupBlockReader(bool temporaryDisableLocalRead) { + bool lastReadFromLocal = false; + exception_ptr lastException; + + while (true) { + if (!choseBestNode()) { + try { + if (lastException) { + rethrow_exception(lastException); + } + } catch (...) { + NESTED_THROW( + HdfsIOException, + "InputStreamImpl: all nodes have been tried and no valid " + "replica can be read for Block: %s.", + curBlock->toString().c_str()); + } + + THROW(HdfsIOException, + "InputStreamImpl: all nodes have been tried and no valid " + "replica can be read for Block: %s.", + curBlock->toString().c_str()); + } + + try { + int64_t offset, len; + offset = cursor - curBlock->getOffset(); + assert(offset >= 0); + len = curBlock->getNumBytes() - offset; + assert(len > 0); + + if (auth.getProtocol() == AuthProtocol::NONE && + !temporaryDisableLocalRead && !lastReadFromLocal && + !readFromUnderConstructedBlock && localRead && isLocalNode()) { + lastReadFromLocal = true; + LocalBlockInforCacheType &cache = + getBlockLocalPathInfoCache(curNode.getXferPort()); + BlockLocalPathInfo info = + getBlockLocalPathInfo(cache, *curBlock); + assert(curBlock->getBlockId() == info.getBlock().getBlockId() && + curBlock->getPoolId() == info.getBlock().getPoolId()); + LOG(DEBUG2, + "%p setup local block reader for file %s from " + "local block %s, block offset %" PRId64 + ", read block " + "length %" PRId64 " end of Block %" PRId64 + ", local " + "block file path %s", + this, path.c_str(), curBlock->toString().c_str(), offset, + len, offset + len, info.getLocalBlockPath()); + + if (0 != access(info.getLocalMetaPath(), R_OK)) { + invalidCacheEntry(cache, *curBlock); + continue; + } + + try { + blockReader = shared_ptr<BlockReader>( + new LocalBlockReader(info, *curBlock, offset, verify, + *conf, localReaderBuffer)); + } catch (...) 
{ + invalidCacheEntry(cache, *curBlock); + throw; + } + } else { + const char *clientName; + LOG(DEBUG2, + "%p setup remote block reader for file %s from " + "remote block %s, block offset %" PRId64 + "" + ", read block length %" PRId64 " end of block %" PRId64 + ", from node %s", + this, path.c_str(), curBlock->toString().c_str(), offset, + len, offset + len, curNode.formatAddress().c_str()); + clientName = filesystem->getClientName(); + lastReadFromLocal = false; + blockReader = shared_ptr<BlockReader>(new RemoteBlockReader( + *curBlock, curNode, offset, len, curBlock->getToken(), + clientName, verify, *conf)); + } + + break; + } catch (const HdfsIOException &e) { + lastException = current_exception(); + + if (lastReadFromLocal) { + LOG(LOG_ERROR, + "cannot setup block reader for Block: %s file %s " + "on Datanode: %s.\n%s\n" + "retry the same node but disable reading from local block", + curBlock->toString().c_str(), path.c_str(), + curNode.formatAddress().c_str(), GetExceptionDetail(e)); + /* + * do not add the node into failedNodes since we will retry the + * same node but disable local block reading + */ + } else { + LOG(LOG_ERROR, + "cannot setup block reader for Block: %s file %s on " + "Datanode: %s.\n%s\nretry another node", + curBlock->toString().c_str(), path.c_str(), + curNode.formatAddress().c_str(), GetExceptionDetail(e)); + failedNodes.push_back(curNode); + std::sort(failedNodes.begin(), failedNodes.end()); + } + } + } +} + +void InputStreamImpl::open(shared_ptr<FileSystemImpl> fs, const char *path, + bool verifyChecksum) { + if (NULL == path || 0 == strlen(path)) { + THROW(InvalidParameter, "path is invalid."); + } + + try { + openInternal(fs, path, verifyChecksum); + } catch (...) { + close(); + throw; + } +} + +void InputStreamImpl::openInternal(shared_ptr<FileSystemImpl> fs, + const char *path, bool verifyChecksum) { + try { + filesystem = fs; + verify = verifyChecksum; + this->path = fs->getStandardPath(path); + LOG(DEBUG2, "%p, open file %s for read, verifyChecksum is %s", this, + this->path.c_str(), (verifyChecksum ? "true" : "false")); + conf = shared_ptr<SessionConfig>(new SessionConfig(fs->getConf())); + this->auth = RpcAuth(fs->getUserInfo(), + RpcAuth::ParseMethod(conf->getRpcAuthMethod())); + prefetchSize = conf->getDefaultBlockSize() * conf->getPrefetchSize(); + localRead = conf->isReadFromLocal(); + maxGetBlockInfoRetry = conf->getMaxGetBlockInfoRetry(); + updateBlockInfos(); + closed = false; + } catch (const HdfsCanceled &e) { + throw; + } catch (const FileNotFoundException &e) { + throw; + } catch (const HdfsException &e) { + NESTED_THROW(HdfsIOException, "InputStreamImpl: cannot open file: %s.", + this->path.c_str()); + } +} + +int32_t InputStreamImpl::read(char *buf, int32_t size) { + checkStatus(); + + try { + int64_t previous = cursor; + int32_t done = readInternal(buf, size); + LOG(DEBUG3, "%p read file %s size is %d, offset %" PRId64 + " done %d, " + "next pos %" PRId64, + this, path.c_str(), size, previous, done, cursor); + return done; + } catch (const HdfsEndOfStream &e) { + throw; + } catch (...) { + lastError = current_exception(); + throw; + } +}
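For reference: readOneBlock(), which follows, reports a recoverable failure by returning -1 once every replica of the current block has been tried, and readInternal() reacts by refreshing the block locations and retrying a bounded number of times. That contract, reduced to a standalone skeleton (illustrative only; these names are hypothetical and not part of the patch):

#include <functional>

// Skeleton of the retry contract between readInternal() and readOneBlock():
// a negative return from tryRead means "all replicas failed; refresh block
// metadata and try again", bounded by maxRetry refreshes.
static int readWithMetadataRefresh(const std::function<int()> &tryRead,
                                   const std::function<void()> &refresh,
                                   int maxRetry) {
    for (int attempt = 0;; ++attempt) {
        int n = tryRead();    // readOneBlock() analogue
        if (n >= 0) {
            return n;         // success, possibly a short read
        }
        if (attempt >= maxRetry) {
            return -1;        // give up after maxRetry metadata refreshes
        }
        refresh();            // updateBlockInfos() analogue
    }
}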
+ +int32_t InputStreamImpl::readOneBlock(char *buf, int32_t size, + bool shouldUpdateMetadataOnFailure) { + bool temporaryDisableLocalRead = false; + + while (true) { + try { + /* + * Set up the block reader here and handle failure. + */ + if (!blockReader) { + setupBlockReader(temporaryDisableLocalRead); + temporaryDisableLocalRead = false; + } + } catch (const HdfsInvalidBlockToken &e) { + LOG(LOG_ERROR, + "InputStreamImpl: failed to read Block: %s file %s, \n%s, " + "retry after updating block information.", + curBlock->toString().c_str(), path.c_str(), + GetExceptionDetail(e)); + return -1; + } catch (const HdfsIOException &e) { + /* + * In setupBlockReader, we have tried all the replicas. + * We now update block information once, and try again. + */ + if (shouldUpdateMetadataOnFailure) { + LOG(LOG_ERROR, + "InputStreamImpl: failed to read Block: %s file %s, \n%s, " + "retry after updating block information.", + curBlock->toString().c_str(), path.c_str(), + GetExceptionDetail(e)); + return -1; + } else { + /* + * We have updated block information and failed again. + */ + throw; + } + } + + /* + * Block reader has been setup, read from block reader. + */ + try { + int32_t todo = size; + todo = todo < endOfCurBlock - cursor + ? todo + : static_cast<int32_t>(endOfCurBlock - cursor); + assert(blockReader); + todo = blockReader->read(buf, todo); + cursor += todo; + /* + * Exit the loop and function from here if success. + */ + return todo; + } catch (const HdfsIOException &e) { + /* + * Failed to read from the current block reader; + * add the current datanode to the invalid node list and try again. + */ + LOG(LOG_ERROR, + "InputStreamImpl: failed to read Block: %s file %s from " + "Datanode: %s, \n%s, " + "retry read again from another Datanode.", + curBlock->toString().c_str(), path.c_str(), + curNode.formatAddress().c_str(), GetExceptionDetail(e)); + + if (conf->doesNotRetryAnotherNode()) { + throw; + } + } catch (const ChecksumException &e) { + LOG(LOG_ERROR, + "InputStreamImpl: failed to read Block: %s file %s " + "from Datanode: %s, \n%s, retry read again from " + "another Datanode.", + curBlock->toString().c_str(), path.c_str(), + curNode.formatAddress().c_str(), GetExceptionDetail(e)); + } + + /* + * Successfully created the block reader but failed to read. + * Disable the local block reader and try the same node again. + */ + if (!blockReader || + dynamic_cast<LocalBlockReader *>(blockReader.get())) { + temporaryDisableLocalRead = true; + } else { + /* + * Remote block reader failed to read, try another node. + */ + LOG(INFO, + "InputStreamImpl: Add invalid datanode %s to failed " + "datanodes and try another datanode again for file %s.", + curNode.formatAddress().c_str(), path.c_str()); + failedNodes.push_back(curNode); + std::sort(failedNodes.begin(), failedNodes.end()); + } + + blockReader.reset(); + } +} + +/** + * To read data from hdfs. + * @param buf the buffer to be filled. + * @param size buffer size. + * @return the number of bytes filled in the buffer; it may be less than + * size. + */ +int32_t InputStreamImpl::readInternal(char *buf, int32_t size) { + int updateMetadataOnFailure = conf->getMaxReadBlockRetry(); + + try { + do { + const LocatedBlock *lb = NULL; + + /* + * Check if we have got the block information we need. + */ + if (!lbs || cursor >= getFileLength() || + (cursor >= endOfCurBlock && !(lb = lbs->findBlock(cursor)))) { + /* + * Get block information from the namenode. + * Do RPC failover work in updateBlockInfos. + */ + updateBlockInfos(); + + /* + * We already have the up-to-date block information, + * Check if we reach the end of file.
+ */ + if (cursor >= getFileLength()) { + THROW(HdfsEndOfStream, + "InputStreamImpl: read over EOF, current position: " + "%" PRId64 ", read size: %d, from file: %s", + cursor, size, path.c_str()); + } + } + + /* + * If we reach the end of the block or the block information has just + * been updated, + * seek to the right block to read. + */ + if (cursor >= endOfCurBlock) { + lb = lbs->findBlock(cursor); + + if (!lb) { + THROW(HdfsIOException, + "InputStreamImpl: cannot find block information at " + "position: %" PRId64 " for file: %s", + cursor, path.c_str()); + } + + /* + * Seek to the right block and set up all needed variables, + * but do not set up the block reader; set it up later. + */ + seekToBlock(*lb); + } + + int32_t retval = + readOneBlock(buf, size, updateMetadataOnFailure > 0); + + /* + * Now we have tried all replicas and failed. + * We will update metadata once and try again. + */ + if (retval < 0) { + lbs.reset(); + endOfCurBlock = 0; + --updateMetadataOnFailure; + + try { + sleep_for(seconds(1)); + } catch (...) { + } + + continue; + } + + return retval; + } while (true); + } catch (const HdfsCanceled &e) { + throw; + } catch (const HdfsEndOfStream &e) { + throw; + } catch (const HdfsException &e) { + /* + * wrap the underlying error and rethrow. + */ + NESTED_THROW(HdfsIOException, + "InputStreamImpl: cannot read file: %s, from " + "position %" PRId64 ", size: %d.", + path.c_str(), cursor, size); + } +} + +/** + * To read data from hdfs, blocking until the given number of bytes have been read. + * @param buf the buffer to be filled. + * @param size the number of bytes to be read. + */ +void InputStreamImpl::readFully(char *buf, int64_t size) { + LOG(DEBUG3, "readFully file %s size is %" PRId64 ", offset %" PRId64, + path.c_str(), size, cursor); + checkStatus(); + + try { + return readFullyInternal(buf, size); + } catch (const HdfsEndOfStream &e) { + throw; + } catch (...) { + lastError = current_exception(); + throw; + } +} + +void InputStreamImpl::readFullyInternal(char *buf, int64_t size) { + int32_t done; + int64_t pos = cursor, todo = size; + + try { + while (todo > 0) { + done = todo < std::numeric_limits<int32_t>::max() + ? static_cast<int32_t>(todo) + : std::numeric_limits<int32_t>::max(); + done = readInternal(buf + (size - todo), done); + todo -= done; + } + } catch (const HdfsCanceled &e) { + throw; + } catch (const HdfsEndOfStream &e) { + THROW(HdfsEndOfStream, + "InputStreamImpl: read over EOF, current position: %" PRId64 + ", read size: %" PRId64 ", from file: %s", + pos, size, path.c_str()); + } catch (const HdfsException &e) { + NESTED_THROW(HdfsIOException, + "InputStreamImpl: cannot read fully from file: %s, " + "from position %" PRId64 ", size: %" PRId64 ".", + path.c_str(), pos, size); + } +} + +int64_t InputStreamImpl::available() { + checkStatus(); + + try { + if (blockReader) { + return blockReader->available(); + } + } catch (...) { + lastError = current_exception(); + throw; + } + + return 0; +} + +/** + * To move the file pointer to the given position. + * @param pos the given position. + */ +void InputStreamImpl::seek(int64_t pos) { + LOG(DEBUG2, "%p seek file %s to %" PRId64 ", offset %" PRId64, this, + path.c_str(), pos, cursor); + checkStatus(); + + try { + seekInternal(pos); + } catch (...)
{ + lastError = current_exception(); + throw; + } +} + +void InputStreamImpl::seekInternal(int64_t pos) { + if (cursor == pos) { + return; + } + + if (!lbs || pos > getFileLength()) { + updateBlockInfos(); + + if (pos > getFileLength()) { + THROW(HdfsEndOfStream, + "InputStreamImpl: seek over EOF, current position: %" PRId64 + ", seek target: %" PRId64 ", in file: %s", + cursor, pos, path.c_str()); + } + } + + try { + if (blockReader && pos > cursor && pos < endOfCurBlock) { + blockReader->skip(pos - cursor); + cursor = pos; + return; + } + } catch (const HdfsIOException &e) { + LOG(LOG_ERROR, "InputStreamImpl: failed to skip %" PRId64 + " bytes in current block reader for file %s\n%s", + pos - cursor, path.c_str(), GetExceptionDetail(e)); + LOG(INFO, "InputStreamImpl: retry to seek to position %" PRId64 + " for file %s", + pos, path.c_str()); + } catch (const ChecksumException &e) { + LOG(LOG_ERROR, "InputStreamImpl: failed to skip %" PRId64 + " bytes in current block reader for file %s\n%s", + pos - cursor, path.c_str(), GetExceptionDetail(e)); + LOG(INFO, "InputStreamImpl: retry to seek to position %" PRId64 + " for file %s", + pos, path.c_str()); + } + + /** + * The seek target exceeds the current block, or skipping within the + * current block reader failed. + * Reset the current block reader and set the cursor to the target + * position to seek. + */ + endOfCurBlock = 0; + blockReader.reset(); + cursor = pos; +} + +/** + * To get the current file pointer position. + * @return the position of the current file pointer. + */ +int64_t InputStreamImpl::tell() { + checkStatus(); + LOG(DEBUG2, "tell file %s at %" PRId64, path.c_str(), cursor); + return cursor; +} + +/** + * Close the stream. + */ +void InputStreamImpl::close() { + LOG(DEBUG2, "%p close file %s for read", this, path.c_str()); + closed = true; + localRead = true; + readFromUnderConstructedBlock = false; + verify = true; + filesystem.reset(); + cursor = 0; + endOfCurBlock = 0; + lastBlockBeingWrittenLength = 0; + prefetchSize = 0; + blockReader.reset(); + curBlock.reset(); + lbs.reset(); + conf.reset(); + failedNodes.clear(); + path.clear(); + localReaderBuffer.resize(0); + lastError = exception_ptr(); +} + +std::string InputStreamImpl::toString() { + if (path.empty()) { + return std::string("InputStream (not opened)"); + } else { + return std::string("InputStream for path ") + path; + } +} +} +}
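One pattern used throughout InputStreamImpl above deserves a note: the first fatal error is captured as a std::exception_ptr in lastError, and checkStatus() rethrows it on every later call, so a stream that has failed keeps failing deterministically until close() clears the latch. A self-contained illustration of the mechanism in standard C++11 (not part of the patch):

#include <exception>
#include <iostream>
#include <stdexcept>

// Minimal model of the lastError/checkStatus() idiom: capture the first
// failure with std::current_exception() and rethrow it on every later call.
class Latched {
public:
    void work(bool fail) {
        check();  // rethrows a previously latched error, if any
        try {
            if (fail) throw std::runtime_error("first failure");
        } catch (...) {
            lastError = std::current_exception();  // latch the error
            throw;
        }
    }

private:
    void check() {
        if (lastError != std::exception_ptr()) {
            std::rethrow_exception(lastError);
        }
    }
    std::exception_ptr lastError;
};

int main() {
    Latched s;
    try { s.work(true); } catch (const std::exception &e) {
        std::cout << "caught: " << e.what() << "\n";
    }
    try { s.work(false); } catch (const std::exception &e) {
        std::cout << "still failing: " << e.what() << "\n";  // latched error
    }
    return 0;
}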
http://git-wip-us.apache.org/repos/asf/hadoop/blob/e559ce04/hadoop-hdfs-project/hadoop-hdfs/src/contrib/libhdfs3/src/client/InputStreamImpl.h ---------------------------------------------------------------------- diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/contrib/libhdfs3/src/client/InputStreamImpl.h b/hadoop-hdfs-project/hadoop-hdfs/src/contrib/libhdfs3/src/client/InputStreamImpl.h index 8723344..5202a33 100644 --- a/hadoop-hdfs-project/hadoop-hdfs/src/contrib/libhdfs3/src/client/InputStreamImpl.h +++ b/hadoop-hdfs-project/hadoop-hdfs/src/contrib/libhdfs3/src/client/InputStreamImpl.h @@ -19,13 +19,10 @@ #ifndef _HDFS_LIBHDFS3_CLIENT_INPUTSTREAMIMPL_H_ #define _HDFS_LIBHDFS3_CLIENT_INPUTSTREAMIMPL_H_ -#include "platform.h" - #include "BlockReader.h" #include "ExceptionInternal.h" #include "FileSystem.h" #include "Hash.h" -#include "InputStreamInter.h" #include "LruMap.h" #include "SessionConfig.h" #include "SharedPtr.h" @@ -45,14 +42,16 @@ namespace hdfs { namespace internal { typedef std::pair<int64_t, std::string> LocalBlockInforCacheKey; -typedef LruMap<LocalBlockInforCacheKey, BlockLocalPathInfo> LocalBlockInforCacheType; +typedef LruMap<LocalBlockInforCacheKey, BlockLocalPathInfo> + LocalBlockInforCacheType; /** * An input stream used to read data from HDFS. */ -class InputStreamImpl: public InputStreamInter { +class InputStreamImpl { public: InputStreamImpl(); + ~InputStreamImpl(); /** @@ -61,22 +60,24 @@ public: * @param path the file to be read. * @param verifyChecksum verify the checksum. */ - void open(shared_ptr<FileSystemInter> fs, const char * path, bool verifyChecksum); + void open(shared_ptr<FileSystemImpl> fs, const char *path, + bool verifyChecksum); /** * To read data from hdfs. * @param buf the buffer to be filled. * @param size buffer size. - * @return return the number of bytes filled in the buffer, it may less than size. + * @return the number of bytes filled in the buffer; it may be less than + * size. */ - int32_t read(char * buf, int32_t size); + int32_t read(char *buf, int32_t size); /** * To read data from hdfs, blocking until the given number of bytes have been read. * @param buf the buffer to be filled. * @param size the number of bytes to be read. */ - void readFully(char * buf, int64_t size); + void readFully(char *buf, int64_t size); int64_t available(); @@ -100,23 +101,24 @@ public: std::string toString(); private: - BlockLocalPathInfo getBlockLocalPathInfo(LocalBlockInforCacheType & cache, - const LocatedBlock & b); + BlockLocalPathInfo getBlockLocalPathInfo(LocalBlockInforCacheType &cache, + const LocatedBlock &b); bool choseBestNode(); bool isLocalNode(); - int32_t readInternal(char * buf, int32_t size); - int32_t readOneBlock(char * buf, int32_t size, bool shouldUpdateMetadataOnFailure); + int32_t readInternal(char *buf, int32_t size); + int32_t readOneBlock(char *buf, int32_t size, + bool shouldUpdateMetadataOnFailure); int64_t getFileLength(); - int64_t readBlockLength(const LocatedBlock & b); - LocalBlockInforCacheType & getBlockLocalPathInfoCache(uint32_t port); + int64_t readBlockLength(const LocatedBlock &b); + LocalBlockInforCacheType &getBlockLocalPathInfoCache(uint32_t port); void checkStatus(); - void invalidCacheEntry(LocalBlockInforCacheType & cache, - const LocatedBlock & b); - void openInternal(shared_ptr<FileSystemInter> fs, const char * path, + void invalidCacheEntry(LocalBlockInforCacheType &cache, + const LocatedBlock &b); + void openInternal(shared_ptr<FileSystemImpl> fs, const char *path, bool verifyChecksum); - void readFullyInternal(char * buf, int64_t size); + void readFullyInternal(char *buf, int64_t size); void seekInternal(int64_t pos); - void seekToBlock(const LocatedBlock & lb); + void seekToBlock(const LocatedBlock &lb); void setupBlockReader(bool temporaryDisableLocalRead); void updateBlockInfos(); @@ -135,7 +137,7 @@ private: int64_t prefetchSize; RpcAuth auth; shared_ptr<BlockReader> blockReader; - shared_ptr<FileSystemInter> filesystem; + shared_ptr<FileSystemImpl> filesystem; shared_ptr<LocatedBlock> curBlock; shared_ptr<LocatedBlocks> lbs; shared_ptr<SessionConfig> conf; @@ -144,28 +146,31 @@ private: std::vector<char> localReaderBuffer; static mutex MutLocalBlockInforCache; - static unordered_map<uint32_t, shared_ptr<LocalBlockInforCacheType> > LocalBlockInforCache; -#ifdef MOCK + static unordered_map<uint32_t, shared_ptr<LocalBlockInforCacheType>> + LocalBlockInforCache; + private: - hdfs::mock::TestDatanodeStub * stub; + InputStreamImpl(const InputStreamImpl &other); + InputStreamImpl &operator=(const InputStreamImpl &other); + +#ifdef MOCK + hdfs::mock::TestDatanodeStub *stub; #endif }; - } } #ifdef NEED_BOOST namespace boost { -template<> +template <>
struct hash<hdfs::internal::LocalBlockInforCacheKey> { std::size_t operator()( - const hdfs::internal::LocalBlockInforCacheKey & key) const { + const hdfs::internal::LocalBlockInforCacheKey &key) const { size_t values[] = {hdfs::internal::Int64Hasher(key.first), - hdfs::internal::StringHasher(key.second) - }; - return hdfs::internal::CombineHasher(values, - sizeof(values) / sizeof(values[0])); + hdfs::internal::StringHasher(key.second)}; + return hdfs::internal::CombineHasher( + values, sizeof(values) / sizeof(values[0])); } }; } @@ -173,15 +178,14 @@ struct hash<hdfs::internal::LocalBlockInforCacheKey> { #else namespace std { -template<> +template <> struct hash<hdfs::internal::LocalBlockInforCacheKey> { std::size_t operator()( - const hdfs::internal::LocalBlockInforCacheKey & key) const { - size_t values[] = { hdfs::internal::Int64Hasher(key.first), - hdfs::internal::StringHasher(key.second) - }; - return hdfs::internal::CombineHasher(values, - sizeof(values) / sizeof(values[0])); + const hdfs::internal::LocalBlockInforCacheKey &key) const { + size_t values[] = {hdfs::internal::Int64Hasher(key.first), + hdfs::internal::StringHasher(key.second)}; + return hdfs::internal::CombineHasher( + values, sizeof(values) / sizeof(values[0])); } }; } http://git-wip-us.apache.org/repos/asf/hadoop/blob/e559ce04/hadoop-hdfs-project/hadoop-hdfs/src/contrib/libhdfs3/src/client/LocalBlockReader.h ---------------------------------------------------------------------- diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/contrib/libhdfs3/src/client/LocalBlockReader.h b/hadoop-hdfs-project/hadoop-hdfs/src/contrib/libhdfs3/src/client/LocalBlockReader.h index 6118403..d26745d 100644 --- a/hadoop-hdfs-project/hadoop-hdfs/src/contrib/libhdfs3/src/client/LocalBlockReader.h +++ b/hadoop-hdfs-project/hadoop-hdfs/src/contrib/libhdfs3/src/client/LocalBlockReader.h @@ -32,11 +32,11 @@ namespace hdfs { namespace internal { -class LocalBlockReader: public BlockReader { +class LocalBlockReader : public BlockReader { public: - LocalBlockReader(const BlockLocalPathInfo & info, - const ExtendedBlock & block, int64_t offset, bool verify, - SessionConfig & conf, std::vector<char> & buffer); + LocalBlockReader(const BlockLocalPathInfo &info, const ExtendedBlock &block, + int64_t offset, bool verify, SessionConfig &conf, + std::vector<char> &buffer); ~LocalBlockReader(); @@ -55,7 +55,7 @@ public: * @return return the number of bytes filled in the buffer, * it may less than size. Return 0 if reach the end of block. */ - virtual int32_t read(char * buf, int32_t size); + virtual int32_t read(char *buf, int32_t size); /** * Move the cursor forward len bytes. @@ -64,34 +64,36 @@ public: virtual void skip(int64_t len); private: + LocalBlockReader(const LocalBlockReader &other); + LocalBlockReader &operator=(const LocalBlockReader &other); + /** * Fill buffer and verify checksum. * @param bufferSize The size of buffer. */ void readAndVerify(int32_t bufferSize); - int32_t readInternal(char * buf, int32_t len); + int32_t readInternal(char *buf, int32_t len); private: - bool verify; //verify checksum or not. + bool verify; // verify checksum or not. const char *pbuffer; const char *pMetaBuffer; const ExtendedBlock █ int checksumSize; int chunkSize; int localBufferSize; - int position; //point in buffer. - int size; //data size in buffer. - int64_t cursor; //point in block. - int64_t length; //data size of block. + int position; // point in buffer. + int size; // data size in buffer. + int64_t cursor; // point in block. 
+ int64_t length; // data size of block. shared_ptr<Checksum> checksum; shared_ptr<FileWrapper> dataFd; shared_ptr<FileWrapper> metaFd; std::string dataFilePath; std::string metaFilePath; - std::vector<char> & buffer; + std::vector<char> &buffer; std::vector<char> metaBuffer; }; - } } http://git-wip-us.apache.org/repos/asf/hadoop/blob/e559ce04/hadoop-hdfs-project/hadoop-hdfs/src/contrib/libhdfs3/src/client/Packet.h ---------------------------------------------------------------------- diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/contrib/libhdfs3/src/client/Packet.h b/hadoop-hdfs-project/hadoop-hdfs/src/contrib/libhdfs3/src/client/Packet.h index 7598344..dd69b8b 100644 --- a/hadoop-hdfs-project/hadoop-hdfs/src/contrib/libhdfs3/src/client/Packet.h +++ b/hadoop-hdfs-project/hadoop-hdfs/src/contrib/libhdfs3/src/client/Packet.h @@ -29,11 +29,10 @@ namespace internal { class ConstPacketBuffer { public: - ConstPacketBuffer(const char * buf, int size) : - buffer(buf), size(size) { + ConstPacketBuffer(const char *buf, int size) : buffer(buf), size(size) { } - const char * getBuffer() const { + const char *getBuffer() const { return buffer; } @@ -42,7 +41,7 @@ public: } private: - const char * buffer; + const char *buffer; const int size; }; @@ -65,13 +64,15 @@ public: /** * create a new packet */ - Packet(int pktSize, int chunksPerPkt, int64_t offsetInBlock, int64_t seqno, int checksumSize); + Packet(int pktSize, int chunksPerPkt, int64_t offsetInBlock, int64_t seqno, + int checksumSize); - void reset(int pktSize, int chunksPerPkt, int64_t offsetInBlock, int64_t seqno, int checksumSize); + void reset(int pktSize, int chunksPerPkt, int64_t offsetInBlock, + int64_t seqno, int checksumSize); void addChecksum(uint32_t checksum); - void addData(const char * buf, int size); + void addData(const char *buf, int size); void setSyncFlag(bool sync); @@ -102,21 +103,20 @@ public: } private: - bool lastPacketInBlock; // is this the last packet in block - bool syncBlock; // sync block to disk? + bool lastPacketInBlock; // is this the last packet in block + bool syncBlock; // sync block to disk? 
int checksumPos; int checksumSize; int checksumStart; int dataPos; int dataStart; int headerStart; - int maxChunks; // max chunks in packet - int numChunks; // number of chunks currently in packet - int64_t offsetInBlock; // offset in block - int64_t seqno; // sequence number of packet in block + int maxChunks; // max chunks in packet + int numChunks; // number of chunks currently in packet + int64_t offsetInBlock; // offset in block + int64_t seqno; // sequence number of packet in block std::vector<char> buffer; }; - } } #endif /* _HDFS_LIBHDFS3_CLIENT_PACKET_H_ */ http://git-wip-us.apache.org/repos/asf/hadoop/blob/e559ce04/hadoop-hdfs-project/hadoop-hdfs/src/contrib/libhdfs3/src/client/PacketHeader.h ---------------------------------------------------------------------- diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/contrib/libhdfs3/src/client/PacketHeader.h b/hadoop-hdfs-project/hadoop-hdfs/src/contrib/libhdfs3/src/client/PacketHeader.h index f8447b8..6d46a7e 100644 --- a/hadoop-hdfs-project/hadoop-hdfs/src/contrib/libhdfs3/src/client/PacketHeader.h +++ b/hadoop-hdfs-project/hadoop-hdfs/src/contrib/libhdfs3/src/client/PacketHeader.h @@ -28,6 +28,8 @@ namespace internal { class PacketHeader { public: + static int GetPkgHeaderSize(); + static int CalcPkgHeaderSize(); PacketHeader(); PacketHeader(int packetLen, int64_t offsetInBlock, int64_t seqno, bool lastPacketInBlock, int dataLen); @@ -37,24 +39,19 @@ public: int getPacketLen(); int64_t getOffsetInBlock(); int64_t getSeqno(); - void readFields(const char * buf, size_t size); + void readFields(const char *buf, size_t size); + /** * Write the header into the buffer. * This requires that PKT_HEADER_LEN bytes are available. */ - void writeInBuffer(char * buf, size_t size); - -public: - static int GetPkgHeaderSize(); - static int CalcPkgHeaderSize(); + void writeInBuffer(char *buf, size_t size); private: static int PkgHeaderSize; -private: int32_t packetLen; PacketHeaderProto proto; }; - } } http://git-wip-us.apache.org/repos/asf/hadoop/blob/e559ce04/hadoop-hdfs-project/hadoop-hdfs/src/contrib/libhdfs3/src/client/RemoteBlockReader.h ---------------------------------------------------------------------- diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/contrib/libhdfs3/src/client/RemoteBlockReader.h b/hadoop-hdfs-project/hadoop-hdfs/src/contrib/libhdfs3/src/client/RemoteBlockReader.h index 548118b..2a29acb 100644 --- a/hadoop-hdfs-project/hadoop-hdfs/src/contrib/libhdfs3/src/client/RemoteBlockReader.h +++ b/hadoop-hdfs-project/hadoop-hdfs/src/contrib/libhdfs3/src/client/RemoteBlockReader.h @@ -35,7 +35,7 @@ namespace hdfs { namespace internal { -class RemoteBlockReader: public BlockReader { +class RemoteBlockReader : public BlockReader { public: RemoteBlockReader(const ExtendedBlock &eb, DatanodeInfo &datanode, int64_t start, int64_t len, const Token &token, @@ -65,6 +65,8 @@ public: virtual void skip(int64_t len); private: + RemoteBlockReader(const RemoteBlockReader &other); + RemoteBlockReader &operator=(RemoteBlockReader &other); bool readTrailingEmptyPacket(); shared_ptr<PacketHeader> readPacketHeader(); void checkResponse(); @@ -72,20 +74,19 @@ private: void sendStatus(); void verifyChecksum(int chunks); -private: - bool verify; //verify checksum or not. + bool verify; // verify checksum or not. DatanodeInfo &datanode; const ExtendedBlock &binfo; int checksumSize; int chunkSize; int connTimeout; - int position; //point in buffer. + int position; // point in buffer. int readTimeout; - int size; //data size in buffer. 
+ int size; // data size in buffer. int writeTimeout; - int64_t cursor; //point in block. - int64_t endOffset; //offset in block requested to read to. - int64_t lastSeqNo; //segno of the last chunk received + int64_t cursor; // point in block. + int64_t endOffset; // offset in block requested to read to. + int64_t lastSeqNo; // seqno of the last chunk received shared_ptr<BufferedSocketReader> in; shared_ptr<Checksum> checksum; shared_ptr<DataTransferProtocol> sender; @@ -93,7 +94,6 @@ private: shared_ptr<Socket> sock; std::vector<char> buffer; }; - } } http://git-wip-us.apache.org/repos/asf/hadoop/blob/e559ce04/hadoop-hdfs-project/hadoop-hdfs/src/contrib/libhdfs3/src/client/TokenInternal.h ---------------------------------------------------------------------- diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/contrib/libhdfs3/src/client/TokenInternal.h b/hadoop-hdfs-project/hadoop-hdfs/src/contrib/libhdfs3/src/client/TokenInternal.h new file mode 100644 index 0000000..e00baa8 --- /dev/null +++ b/hadoop-hdfs-project/hadoop-hdfs/src/contrib/libhdfs3/src/client/TokenInternal.h @@ -0,0 +1,27 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +#ifndef _HDFS_LIBHDFS3_CLIENT_TOKENINTERNAL_H_ +#define _HDFS_LIBHDFS3_CLIENT_TOKENINTERNAL_H_ + +#include "Hash.h" +#include "Token.h" + +HDFS_HASH_DEFINE(::hdfs::internal::Token); + +#endif /* _HDFS_LIBHDFS3_CLIENT_TOKENINTERNAL_H_ */ http://git-wip-us.apache.org/repos/asf/hadoop/blob/e559ce04/hadoop-hdfs-project/hadoop-hdfs/src/contrib/libhdfs3/src/common/Config.cc ---------------------------------------------------------------------- diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/contrib/libhdfs3/src/common/Config.cc b/hadoop-hdfs-project/hadoop-hdfs/src/contrib/libhdfs3/src/common/Config.cc new file mode 100644 index 0000000..3e1e841 --- /dev/null +++ b/hadoop-hdfs-project/hadoop-hdfs/src/contrib/libhdfs3/src/common/Config.cc @@ -0,0 +1,213 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License.
+ */ + +#include "Config.h" +#include "ConfigImpl.h" +#include "XmlConfigParser.h" +#include "StatusInternal.h" + +using namespace hdfs::internal; + +namespace hdfs { + +Config Config::CreateFromXmlFile(const std::string &path) { + return Config(new ConfigImpl(XmlConfigParser(path.c_str()).getKeyValue())); +} + +Config::Config() : impl(new ConfigImpl) { +} + +Config::Config(const Config &other) { + impl = new ConfigImpl(*other.impl); +} + +Config::Config(ConfigImpl *impl) : impl(impl) { +} + +Config &Config::operator=(const Config &other) { + if (this == &other) { + return *this; + } + + ConfigImpl *temp = impl; + impl = new ConfigImpl(*other.impl); + delete temp; + return *this; +} + +bool Config::operator==(const Config &other) const { + if (this == &other) { + return true; + } + + return *impl == *other.impl; +} + +Config::~Config() { + delete impl; +} + +Status Config::getString(const std::string &key, std::string *output) const { + CHECK_PARAMETER(NULL != output, EINVAL, "invalid parameter \"output\""); + + try { + *output = impl->getString(key.c_str()); + } catch (...) { + return CreateStatusFromException(current_exception()); + } + + return Status::OK(); +} + +Status Config::getString(const std::string &key, const std::string &def, + std::string *output) const { + CHECK_PARAMETER(NULL != output, EINVAL, "invalid parameter \"output\""); + + try { + *output = impl->getString(key.c_str(), def.c_str()); + } catch (...) { + return CreateStatusFromException(current_exception()); + } + + return Status::OK(); +} + +Status Config::getInt64(const std::string &key, int64_t *output) const { + CHECK_PARAMETER(NULL != output, EINVAL, "invalid parameter \"output\""); + + try { + *output = impl->getInt64(key.c_str()); + } catch (...) { + return CreateStatusFromException(current_exception()); + } + + return Status::OK(); +} + +Status Config::getInt64(const std::string &key, int64_t def, + int64_t *output) const { + CHECK_PARAMETER(NULL != output, EINVAL, "invalid parameter \"output\""); + + try { + *output = impl->getInt64(key.c_str(), def); + } catch (...) { + return CreateStatusFromException(current_exception()); + } + + return Status::OK(); +} + +Status Config::getInt32(const std::string &key, int32_t *output) const { + CHECK_PARAMETER(NULL != output, EINVAL, "invalid parameter \"output\""); + + try { + *output = impl->getInt32(key.c_str()); + } catch (...) { + return CreateStatusFromException(current_exception()); + } + + return Status::OK(); +} + +Status Config::getInt32(const std::string &key, int32_t def, + int32_t *output) const { + CHECK_PARAMETER(NULL != output, EINVAL, "invalid parameter \"output\""); + + try { + *output = impl->getInt32(key.c_str(), def); + } catch (...) { + return CreateStatusFromException(current_exception()); + } + + return Status::OK(); +} + +Status Config::getDouble(const std::string &key, double *output) const { + CHECK_PARAMETER(NULL != output, EINVAL, "invalid parameter \"output\""); + + try { + *output = impl->getDouble(key.c_str()); + } catch (...) { + return CreateStatusFromException(current_exception()); + } + + return Status::OK(); +} + +Status Config::getDouble(const std::string &key, double def, + double *output) const { + CHECK_PARAMETER(NULL != output, EINVAL, "invalid parameter \"output\""); + + try { + *output = impl->getDouble(key.c_str(), def); + } catch (...)
{ + return CreateStatusFromException(current_exception()); + } + + return Status::OK(); +} + +Status Config::getBool(const std::string &key, bool *output) const { + CHECK_PARAMETER(NULL != output, EINVAL, "invalid parameter \"output\""); + + try { + *output = impl->getBool(key.c_str()); + } catch (...) { + return CreateStatusFromException(current_exception()); + } + + return Status::OK(); +} + +Status Config::getBool(const std::string &key, bool def, + bool *output) const { + CHECK_PARAMETER(NULL != output, EINVAL, "invalid parameter \"output\""); + + try { + *output = impl->getBool(key.c_str(), def); + } catch (...) { + return CreateStatusFromException(current_exception()); + } + + return Status::OK(); +} + +void Config::set(const std::string &key, const std::string &value) { + impl->set(key.c_str(), value); +} + +void Config::set(const std::string &key, int32_t value) { + impl->set(key.c_str(), value); +} + +void Config::set(const std::string &key, int64_t value) { + impl->set(key.c_str(), value); +} + +void Config::set(const std::string &key, double value) { + impl->set(key.c_str(), value); +} + +void Config::set(const std::string &key, bool value) { + impl->set(key.c_str(), value); +} + +size_t Config::hash_value() const { + return impl->hash_value(); +} +}
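For reference, client-side use of the Status-returning Config getters just defined (illustrative only, not part of the patch; the file name and key are placeholders, and an ok() predicate on Status is assumed but not shown in this diff):

#include "Config.h"
#include "Status.h"

#include <iostream>

int main() {
    // Load an XML configuration file and read one typed key with a default.
    hdfs::Config conf = hdfs::Config::CreateFromXmlFile("hdfs-client.xml");

    int32_t prefetch = 0;
    hdfs::Status s = conf.getInt32("dfs.prefetchsize", 10, &prefetch);
    if (!s.ok()) {  // Status::ok() is assumed, by analogy with Status::OK()
        std::cerr << "malformed configure item" << std::endl;
        return 1;
    }
    std::cout << "prefetch = " << prefetch << std::endl;
    return 0;
}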
http://git-wip-us.apache.org/repos/asf/hadoop/blob/e559ce04/hadoop-hdfs-project/hadoop-hdfs/src/contrib/libhdfs3/src/common/Config.h ---------------------------------------------------------------------- diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/contrib/libhdfs3/src/common/Config.h b/hadoop-hdfs-project/hadoop-hdfs/src/contrib/libhdfs3/src/common/Config.h new file mode 100644 index 0000000..4359e3f --- /dev/null +++ b/hadoop-hdfs-project/hadoop-hdfs/src/contrib/libhdfs3/src/common/Config.h @@ -0,0 +1,218 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +#ifndef _HDFS_LIBHDFS3_COMMON_CONFIG_H_ +#define _HDFS_LIBHDFS3_COMMON_CONFIG_H_ + +#include "Status.h" + +#include <stdint.h> +#include <string> +#include <map> + +namespace hdfs { + +class FileSystem; +class NamenodeInfo; + +namespace internal { +class ConfigImpl; +class FileSystemImpl; +} + +/** + * A configure file parser. + */ +class Config { +public: + /** + * Create an instance from an XML file + * @param path the path of the configure file. + */ + static Config CreateFromXmlFile(const std::string &path); + + /** + * Construct an empty Config instance. + */ + Config(); + + /** + * Copy constructor + */ + Config(const Config &other); + + /** + * Assignment operator. + */ + Config &operator=(const Config &other); + + /** + * Operator equal + */ + bool operator==(const Config &other) const; + + /** + * Destroy this instance + */ + ~Config(); + + /** + * Get a string with given configure key. + * @param key The key of the configure item. + * @param output the pointer of the output parameter. + * @return the result status of this operation + */ + Status getString(const std::string &key, std::string *output) const; + + /** + * Get a string with given configure key. + * Return the default value def if key is not found. + * @param key The key of the configure item. + * @param def The default value. + * @param output the pointer of the output parameter. + * @return the result status of this operation + */ + Status getString(const std::string &key, const std::string &def, + std::string *output) const; + + /** + * Get a 64 bit integer with given configure key. + * @param key The key of the configure item. + * @param output the pointer of the output parameter. + * @return the result status of this operation + */ + Status getInt64(const std::string &key, int64_t *output) const; + + /** + * Get a 64 bit integer with given configure key. + * Return the default value def if key is not found. + * @param key The key of the configure item. + * @param def The default value. + * @param output the pointer of the output parameter. + * @return the result status of this operation + */ + Status getInt64(const std::string &key, int64_t def, + int64_t *output) const; + + /** + * Get a 32 bit integer with given configure key. + * @param key The key of the configure item. + * @param output the pointer of the output parameter. + * @return the result status of this operation + */ + Status getInt32(const std::string &key, int32_t *output) const; + + /** + * Get a 32 bit integer with given configure key. + * Return the default value def if key is not found. + * @param key The key of the configure item. + * @param def The default value. + * @param output the pointer of the output parameter. + * @return the result status of this operation + */ + Status getInt32(const std::string &key, int32_t def, + int32_t *output) const; + + /** + * Get a double with given configure key. + * @param key The key of the configure item. + * @param output the pointer of the output parameter. + * @return the result status of this operation + */ + Status getDouble(const std::string &key, double *output) const; + + /** + * Get a double with given configure key. + * Return the default value def if key is not found. + * @param key The key of the configure item. + * @param def The default value. + * @param output the pointer of the output parameter. + * @return the result status of this operation + */ + Status getDouble(const std::string &key, double def, + double *output) const; + + /** + * Get a boolean with given configure key. + * @param key The key of the configure item. + * @param output the pointer of the output parameter. + * @return the result status of this operation + */ + Status getBool(const std::string &key, bool *output) const; + + /** + * Get a boolean with given configure key. + * Return the default value def if key is not found. + * @param key The key of the configure item. + * @param def The default value. + * @param output the pointer of the output parameter. + * @return the result status of this operation + */ + Status getBool(const std::string &key, bool def, bool *output) const; + + /** + * Set a configure item + * @param key The key will set. + * @param value The value will be set to. + */ + void set(const std::string &key, const std::string &value); + + /** + * Set a configure item + * @param key The key will set. + * @param value The value will be set to.
+     */
+    void set(const std::string &key, int32_t value);
+
+    /**
+     * Set a configuration item.
+     * @param key The key to set.
+     * @param value The value to set.
+     */
+    void set(const std::string &key, int64_t value);
+
+    /**
+     * Set a configuration item.
+     * @param key The key to set.
+     * @param value The value to set.
+     */
+    void set(const std::string &key, double value);
+
+    /**
+     * Set a configuration item.
+     * @param key The key to set.
+     * @param value The value to set.
+     */
+    void set(const std::string &key, bool value);
+
+    /**
+     * Get the hash value of this object.
+     * @return The hash value.
+     */
+    size_t hash_value() const;
+
+private:
+    Config(hdfs::internal::ConfigImpl *impl);
+    hdfs::internal::ConfigImpl *impl;
+    friend class hdfs::FileSystem;
+    friend class hdfs::internal::FileSystemImpl;
+    friend class hdfs::NamenodeInfo;
+};
+}
+
+#endif /* _HDFS_LIBHDFS3_COMMON_CONFIG_H_ */
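A minimal caller-side sketch of the header above. The key name and include path are illustrative, and since this patch does not show Status's query interface, the success check is left as a hypothetical comment:

    #include "Config.h"   // illustrative include path

    using hdfs::Config;
    using hdfs::Status;

    int main() {
        Config conf;                      // an empty configuration
        conf.set("dfs.replication", 3);   // illustrative key, int32_t overload

        int32_t replication = 0;
        // With the default-value overload, a missing key yields the
        // supplied default rather than an error.
        Status s = conf.getInt32("dfs.replication", 3, &replication);

        // if (s.isOk()) { ... }   // hypothetical accessor; Status's
        //                         // query interface is not in this patch.
        return 0;
    }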
http://git-wip-us.apache.org/repos/asf/hadoop/blob/e559ce04/hadoop-hdfs-project/hadoop-hdfs/src/contrib/libhdfs3/src/common/ConfigImpl.cc
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/contrib/libhdfs3/src/common/ConfigImpl.cc b/hadoop-hdfs-project/hadoop-hdfs/src/contrib/libhdfs3/src/common/ConfigImpl.cc
new file mode 100644
index 0000000..7bfb3ec
--- /dev/null
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/contrib/libhdfs3/src/common/ConfigImpl.cc
@@ -0,0 +1,291 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+#include "Exception.h"
+#include "ExceptionInternal.h"
+#include "Hash.h"
+#include "ConfigImpl.h"
+
+#include <cassert>
+#include <errno.h>
+#include <fstream>
+#include <limits>
+#include <stdlib.h>
+#include <string.h>
+#include <unistd.h>
+#include <vector>
+
+using std::map;
+using std::string;
+using std::vector;
+
+namespace hdfs {
+namespace internal {
+
+typedef map<string, string>::const_iterator Iterator;
+typedef map<string, string> Map;
+
+static int32_t StrToInt32(const char *str) {
+    long retval;
+    char *end = NULL;
+    errno = 0;
+    retval = strtol(str, &end, 0);
+
+    if (EINVAL == errno || str == end || 0 != *end) {
+        THROW(HdfsBadNumFoumat, "Invalid int32_t value: %s", str);
+    }
+
+    if (ERANGE == errno || retval > std::numeric_limits<int32_t>::max() ||
+        retval < std::numeric_limits<int32_t>::min()) {
+        THROW(HdfsBadNumFoumat, "Underflow/Overflow int32_t value: %s", str);
+    }
+
+    return retval;
+}
+
+static int64_t StrToInt64(const char *str) {
+    long long retval;
+    char *end = NULL;
+    errno = 0;
+    retval = strtoll(str, &end, 0);
+
+    if (EINVAL == errno || str == end || 0 != *end) {
+        THROW(HdfsBadNumFoumat, "Invalid int64_t value: %s", str);
+    }
+
+    if (ERANGE == errno || retval > std::numeric_limits<int64_t>::max() ||
+        retval < std::numeric_limits<int64_t>::min()) {
+        THROW(HdfsBadNumFoumat, "Underflow/Overflow int64_t value: %s", str);
+    }
+
+    return retval;
+}
+
+static bool StrToBool(const char *str) {
+    bool retval = false;
+
+    if (!strcasecmp(str, "true") || !strcmp(str, "1")) {
+        retval = true;
+    } else if (!strcasecmp(str, "false") || !strcmp(str, "0")) {
+        retval = false;
+    } else {
+        THROW(HdfsBadBoolFoumat, "Invalid bool value: %s", str);
+    }
+
+    return retval;
+}
+
+static double StrToDouble(const char *str) {
+    double retval;
+    char *end = NULL;
+    errno = 0;
+    retval = strtod(str, &end);
+
+    if (EINVAL == errno || str == end || 0 != *end) {
+        THROW(HdfsBadNumFoumat, "Invalid double value: %s", str);
+    }
+
+    if (ERANGE == errno || retval > std::numeric_limits<double>::max() ||
+        retval < -std::numeric_limits<double>::max()) {
+        THROW(HdfsBadNumFoumat, "Underflow/Overflow double value: %s", str);
+    }
+
+    return retval;
+}
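The StrTo* helpers above lean on the C library's documented error reporting: strtol/strtoll/strtod set errno to ERANGE on overflow, and the end pointer is the only reliable way to reject empty input or trailing garbage, since the functions themselves happily return a partial parse. A standalone sketch of the same validation order:

    #include <errno.h>
    #include <stdio.h>
    #include <stdlib.h>

    int main() {
        const char *str = "123abc";
        char *end = NULL;
        errno = 0;
        long v = strtol(str, &end, 0);

        if (end == str || *end != 0) {
            // strtol returned 123 but stopped at 'a'; the trailing
            // garbage makes the whole value invalid.
            printf("invalid number: %s\n", str);
        } else if (ERANGE == errno) {
            printf("out of range: %s\n", str);
        } else {
            printf("parsed %ld\n", v);
        }

        return 0;
    }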
+ConfigImpl::ConfigImpl(const Map &kv) : kv(kv) {
+}
+
+const char *ConfigImpl::getString(const std::string &key) const {
+    Iterator it = kv.find(key);
+
+    if (kv.end() == it) {
+        THROW(HdfsConfigNotFound, "ConfigImpl key: %s not found", key.c_str());
+    }
+
+    return it->second.c_str();
+}
+
+const char *ConfigImpl::getString(const std::string &key,
+                                  const std::string &def) const {
+    Iterator it = kv.find(key);
+
+    if (kv.end() == it) {
+        return def.c_str();
+    } else {
+        return it->second.c_str();
+    }
+}
+
+int64_t ConfigImpl::getInt64(const std::string &key) const {
+    int64_t retval;
+    Iterator it = kv.find(key);
+
+    if (kv.end() == it) {
+        THROW(HdfsConfigNotFound, "ConfigImpl key: %s not found", key.c_str());
+    }
+
+    try {
+        retval = StrToInt64(it->second.c_str());
+    } catch (const HdfsBadNumFoumat &e) {
+        NESTED_THROW(HdfsConfigNotFound,
+                     "ConfigImpl key: %s has an invalid value", key.c_str());
+    }
+
+    return retval;
+}
+
+int64_t ConfigImpl::getInt64(const std::string &key, int64_t def) const {
+    int64_t retval;
+    Iterator it = kv.find(key);
+
+    if (kv.end() == it) {
+        return def;
+    }
+
+    try {
+        retval = StrToInt64(it->second.c_str());
+    } catch (const HdfsBadNumFoumat &e) {
+        NESTED_THROW(HdfsConfigNotFound,
+                     "ConfigImpl key: %s has an invalid value", key.c_str());
+    }
+
+    return retval;
+}
+
+int32_t ConfigImpl::getInt32(const std::string &key) const {
+    int32_t retval;
+    Iterator it = kv.find(key);
+
+    if (kv.end() == it) {
+        THROW(HdfsConfigNotFound, "ConfigImpl key: %s not found", key.c_str());
+    }
+
+    try {
+        retval = StrToInt32(it->second.c_str());
+    } catch (const HdfsBadNumFoumat &e) {
+        NESTED_THROW(HdfsConfigNotFound,
+                     "ConfigImpl key: %s has an invalid value", key.c_str());
+    }
+
+    return retval;
+}
+
+int32_t ConfigImpl::getInt32(const std::string &key, int32_t def) const {
+    int32_t retval;
+    Iterator it = kv.find(key);
+
+    if (kv.end() == it) {
+        return def;
+    }
+
+    try {
+        retval = StrToInt32(it->second.c_str());
+    } catch (const HdfsBadNumFoumat &e) {
+        NESTED_THROW(HdfsConfigNotFound,
+                     "ConfigImpl key: %s has an invalid value", key.c_str());
+    }
+
+    return retval;
+}
+
+double ConfigImpl::getDouble(const std::string &key) const {
+    double retval;
+    Iterator it = kv.find(key);
+
+    if (kv.end() == it) {
+        THROW(HdfsConfigNotFound, "ConfigImpl key: %s not found", key.c_str());
+    }
+
+    try {
+        retval = StrToDouble(it->second.c_str());
+    } catch (const HdfsBadNumFoumat &e) {
+        NESTED_THROW(HdfsConfigNotFound,
+                     "ConfigImpl key: %s has an invalid value", key.c_str());
+    }
+
+    return retval;
+}
+
+double ConfigImpl::getDouble(const std::string &key, double def) const {
+    double retval;
+    Iterator it = kv.find(key);
+
+    if (kv.end() == it) {
+        return def;
+    }
+
+    try {
+        retval = StrToDouble(it->second.c_str());
+    } catch (const HdfsBadNumFoumat &e) {
+        NESTED_THROW(HdfsConfigNotFound,
+                     "ConfigImpl key: %s has an invalid value", key.c_str());
+    }
+
+    return retval;
+}
+
+bool ConfigImpl::getBool(const std::string &key) const {
+    bool retval;
+    Iterator it = kv.find(key);
+
+    if (kv.end() == it) {
+        THROW(HdfsConfigNotFound, "ConfigImpl key: %s not found", key.c_str());
+    }
+
+    try {
+        retval = StrToBool(it->second.c_str());
+    } catch (const HdfsBadBoolFoumat &e) {
+        NESTED_THROW(HdfsConfigNotFound,
+                     "ConfigImpl key: %s has an invalid value", key.c_str());
+    }
+
+    return retval;
+}
+
+bool ConfigImpl::getBool(const std::string &key, bool def) const {
+    bool retval;
+    Iterator it = kv.find(key);
+
+    if (kv.end() == it) {
+        return def;
+    }
+
+    try {
+        retval = StrToBool(it->second.c_str());
+    } catch (const HdfsBadBoolFoumat &e) {
+        NESTED_THROW(HdfsConfigNotFound,
+                     "ConfigImpl key: %s has an invalid value", key.c_str());
+    }
+
+    return retval;
+}
+
+size_t ConfigImpl::hash_value() const {
+    vector<size_t> values;
+    map<string, string>::const_iterator s, e;
+    e = kv.end();
+
+    for (s = kv.begin(); s != e; ++s) {
+        values.push_back(StringHasher(s->first));
+        values.push_back(StringHasher(s->second));
+    }
+
+    // An empty config must not dereference values[0].
+    return values.empty() ? 0 : CombineHasher(&values[0], values.size());
+}
+}
+}
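hash_value walks the map in iteration order, and because std::map keeps its keys sorted, two configurations holding the same pairs hash identically regardless of insertion order. The sketch below mimics that property with std::hash and the usual hash-combining recipe; StringHasher and CombineHasher live in Hash.h and are not shown in this patch, so this is illustrative only:

    #include <functional>
    #include <map>
    #include <string>

    static size_t hashConfig(const std::map<std::string, std::string> &kv) {
        std::hash<std::string> hasher;
        size_t seed = 0;

        // std::map iterates in key order, so equal maps combine the same
        // sequence of hashes no matter how the pairs were inserted.
        std::map<std::string, std::string>::const_iterator it;
        for (it = kv.begin(); it != kv.end(); ++it) {
            seed ^= hasher(it->first) + 0x9e3779b9 + (seed << 6) + (seed >> 2);
            seed ^= hasher(it->second) + 0x9e3779b9 + (seed << 6) + (seed >> 2);
        }

        return seed;
    }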
http://git-wip-us.apache.org/repos/asf/hadoop/blob/e559ce04/hadoop-hdfs-project/hadoop-hdfs/src/contrib/libhdfs3/src/common/ConfigImpl.h
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/contrib/libhdfs3/src/common/ConfigImpl.h b/hadoop-hdfs-project/hadoop-hdfs/src/contrib/libhdfs3/src/common/ConfigImpl.h
new file mode 100644
index 0000000..5d058dd
--- /dev/null
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/contrib/libhdfs3/src/common/ConfigImpl.h
@@ -0,0 +1,165 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+#ifndef _HDFS_LIBHDFS3_COMMON_XMLCONFIGIMPL_H_
+#define _HDFS_LIBHDFS3_COMMON_XMLCONFIGIMPL_H_
+
+#include <stdint.h>
+#include <string>
+#include <sstream>
+#include <map>
+
+namespace hdfs {
+namespace internal {
+
+/**
+ * A configuration file parser.
+ */
+class ConfigImpl {
+public:
+    /**
+     * Construct an empty ConfigImpl instance.
+     */
+    ConfigImpl() {
+    }
+
+    /**
+     * Construct a ConfigImpl instance from existing key/value pairs.
+     */
+    ConfigImpl(const std::map<std::string, std::string> &kv);
+
+    /**
+     * Equality operator.
+     */
+    bool operator==(const ConfigImpl &other) const {
+        if (this == &other) {
+            return true;
+        }
+
+        return this->kv == other.kv;
+    }
+
+    /**
+     * Get a string with the given configuration key.
+     * @param key The key of the configuration item.
+     * @return The value of the configuration item.
+     * @throw HdfsConfigNotFound
+     */
+    const char *getString(const std::string &key) const;
+
+    /**
+     * Get a string with the given configuration key.
+     * Return the default value def if the key is not found.
+     * @param key The key of the configuration item.
+     * @param def The default value.
+     * @return The value of the configuration item.
+     */
+    const char *getString(const std::string &key,
+                          const std::string &def) const;
+
+    /**
+     * Get a 64 bit integer with the given configuration key.
+     * @param key The key of the configuration item.
+     * @return The value of the configuration item.
+     * @throw HdfsConfigNotFound
+     */
+    int64_t getInt64(const std::string &key) const;
+
+    /**
+     * Get a 64 bit integer with the given configuration key.
+     * Return the default value def if the key is not found.
+     * @param key The key of the configuration item.
+     * @param def The default value.
+     * @return The value of the configuration item.
+     */
+    int64_t getInt64(const std::string &key, int64_t def) const;
+
+    /**
+     * Get a 32 bit integer with the given configuration key.
+     * @param key The key of the configuration item.
+     * @return The value of the configuration item.
+     * @throw HdfsConfigNotFound
+     */
+    int32_t getInt32(const std::string &key) const;
+
+    /**
+     * Get a 32 bit integer with the given configuration key.
+     * Return the default value def if the key is not found.
+     * @param key The key of the configuration item.
+     * @param def The default value.
+     * @return The value of the configuration item.
+     */
+    int32_t getInt32(const std::string &key, int32_t def) const;
+
+    /**
+     * Get a double with the given configuration key.
+     * @param key The key of the configuration item.
+     * @return The value of the configuration item.
+     * @throw HdfsConfigNotFound
+     */
+    double getDouble(const std::string &key) const;
+
+    /**
+     * Get a double with the given configuration key.
+     * Return the default value def if the key is not found.
+     * @param key The key of the configuration item.
+     * @param def The default value.
+     * @return The value of the configuration item.
+     */
+    double getDouble(const std::string &key, double def) const;
+
+    /**
+     * Get a boolean with the given configuration key.
+     * @param key The key of the configuration item.
+     * @return The value of the configuration item.
+     * @throw HdfsConfigNotFound
+     */
+    bool getBool(const std::string &key) const;
+
+    /**
+     * Get a boolean with the given configuration key.
+     * Return the default value def if the key is not found.
+     * @param key The key of the configuration item.
+     * @param def The default value.
+     * @return The value of the configuration item.
+     */
+    bool getBool(const std::string &key, bool def) const;
+
+    /**
+     * Set a configuration item.
+     * @param key The key to set.
+     * @param value The value to set.
+     */
+    template <typename T>
+    void set(const std::string &key, T const &value) {
+        std::stringstream ss;
+        ss << value;
+        kv[key] = ss.str();
+    }
+
+    /**
+     * Get the hash value of this object.
+     * @return The hash value.
+     */
+    size_t hash_value() const;
+
+private:
+    std::string path;
+    std::map<std::string, std::string> kv;
+};
+}
+}
+
+#endif /* _HDFS_LIBHDFS3_COMMON_XMLCONFIGIMPL_H_ */
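One subtlety of the template set() above: values are serialized with operator<< on a stringstream and must later round-trip through the StrTo* parsers in ConfigImpl.cc. That works for the types Config exposes: without std::boolalpha a bool streams as "1" or "0", both of which StrToBool accepts, and integers stream in forms strtol/strtoll parse. Doubles, however, stream at the default six significant digits, so a set() followed by getDouble() may lose precision. A quick check of the bool round trip:

    #include <assert.h>
    #include <sstream>
    #include <string>

    int main() {
        std::stringstream ss;
        ss << true;                // default formatting, no std::boolalpha

        std::string stored = ss.str();
        assert(stored == "1");     // StrToBool treats "1" as true
        return 0;
    }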
