http://git-wip-us.apache.org/repos/asf/hadoop/blob/76a1e894/hadoop-hdfs-project/hadoop-hdfs-client/src/main/native/libhdfspp/lib/common/continuation/protobuf.h ---------------------------------------------------------------------- diff --git a/hadoop-hdfs-project/hadoop-hdfs-client/src/main/native/libhdfspp/lib/common/continuation/protobuf.h b/hadoop-hdfs-project/hadoop-hdfs-client/src/main/native/libhdfspp/lib/common/continuation/protobuf.h deleted file mode 100644 index d30322c..0000000 --- a/hadoop-hdfs-project/hadoop-hdfs-client/src/main/native/libhdfspp/lib/common/continuation/protobuf.h +++ /dev/null @@ -1,130 +0,0 @@ -/** - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. 
- */
-#ifndef LIBHDFSPP_COMMON_CONTINUATION_PROTOBUF_H_
-#define LIBHDFSPP_COMMON_CONTINUATION_PROTOBUF_H_
-
-#include "common/util.h"
-
-#include <asio/read.hpp>
-
-#include <google/protobuf/message_lite.h>
-#include <google/protobuf/io/coded_stream.h>
-#include <google/protobuf/io/zero_copy_stream_impl_lite.h>
-
-#include <array>
-#include <cassert>
-#include <cstddef>
-#include <cstdint>
-#include <functional>
-#include <memory>
-#include <string>
-
-namespace hdfs {
-namespace continuation {
-
-/**
- * Continuation that reads one varint-length-delimited protobuf message from
- * a stream into a fixed buffer of MaxMessageSize bytes and parses it into
- * the supplied message. The result is reported to the next continuation as
- * a Status.
- *
- * NOTE(review): Continuation, Next and Write() are not declared by any
- * header included here; presumably the including file provides them first.
- **/
-template <class Stream, size_t MaxMessageSize = 512>
-struct ReadDelimitedPBMessageContinuation : public Continuation {
-  ReadDelimitedPBMessageContinuation(Stream *stream,
-                                     ::google::protobuf::MessageLite *msg)
-      : stream_(stream), msg_(msg) {}
-
-  /**
-   * Starts the asynchronous read. CompletionHandler() decides how many more
-   * bytes are needed; once the read finishes the buffer is parsed and
-   * next() receives either the I/O error or the parse result.
-   **/
-  virtual void Run(const Next &next) override {
-    namespace pbio = google::protobuf::io;
-    auto handler = [this, next](const asio::error_code &ec,
-                                size_t transferred) {
-      Status status;
-      if (ec) {
-        status = ToStatus(ec);
-      } else {
-        // Only the bytes that were actually received are valid input.
-        pbio::ArrayInputStream as(buf_.data(), static_cast<int>(transferred));
-        pbio::CodedInputStream is(&as);
-        uint32_t size = 0;
-        // Report failures through Status instead of assert(): asserts are
-        // compiled out of release builds, which would turn a truncated or
-        // corrupt message into an apparent success.
-        if (!is.ReadVarint32(&size)) {
-          status = Status::Error("Malformed length prefix of delimited message");
-        } else {
-          is.PushLimit(size);
-          msg_->Clear();
-          if (!msg_->MergeFromCodedStream(&is)) {
-            status = Status::Error("Unable to parse delimited message");
-          }
-        }
-      }
-      next(status);
-    };
-    asio::async_read(
-        *stream_, asio::buffer(buf_),
-        std::bind(&ReadDelimitedPBMessageContinuation::CompletionHandler, this,
-                  std::placeholders::_1, std::placeholders::_2),
-        handler);
-  }
-
-private:
-  /**
-   * Completion condition for asio::async_read: returns the number of bytes
-   * that still have to be read, or 0 when the read is finished.
-   *
-   * While the varint length prefix is incomplete one more byte is requested
-   * at a time. Once it is complete the remainder of the message is
-   * requested. A message that cannot fit into the buffer stops the read;
-   * the handler in Run() then fails to parse it and reports an error.
-   **/
-  size_t CompletionHandler(const asio::error_code &ec, size_t transferred) {
-    if (ec) {
-      return 0;
-    }
-
-    // A 32-bit varint occupies at most five bytes.
-    static const size_t kMaxVarint32Bytes = 5;
-
-    size_t prefix_len = 0;
-    uint64_t len = 0;
-    for (size_t i = 0;
-         i < transferred && i < kMaxVarint32Bytes && i < buf_.size(); ++i) {
-      const uint8_t b = static_cast<uint8_t>(buf_[i]);
-      // Varints are little-endian: byte i carries bits [7i, 7i + 7).
-      len |= static_cast<uint64_t>(b & 0x7f) << (7 * i);
-      if (b < 0x80) {
-        prefix_len = i + 1;
-        break;
-      }
-    }
-
-    if (!prefix_len) {
-      // Prefix still incomplete; give up once it is longer than any valid
-      // 32-bit varint.
-      return transferred < kMaxVarint32Bytes ? 1 : 0;
-    }
-
-    const uint64_t total = prefix_len + len;
-    if (total > buf_.size()) {
-      // Message is too big for the buffer.
-      return 0;
-    }
-    return total > transferred ? static_cast<size_t>(total - transferred) : 0;
-  }
-
-  Stream *stream_;
-  ::google::protobuf::MessageLite *msg_;
-  std::array<char, MaxMessageSize> buf_;
-};
-
-/**
- * Continuation that serializes a protobuf message with a varint length
- * prefix and writes it to a stream.
- **/
-template <class Stream>
-struct WriteDelimitedPBMessageContinuation : Continuation {
-  WriteDelimitedPBMessageContinuation(Stream *stream,
-                                      const google::protobuf::MessageLite *msg)
-      : stream_(stream), msg_(msg) {}
-
-  virtual void Run(const Next &next) override {
-    namespace pbio = google::protobuf::io;
-    int size = msg_->ByteSize();
-    buf_.clear();
-    buf_.reserve(pbio::CodedOutputStream::VarintSize32(size) + size);
-    bool serialized = false;
-    {
-      // The streams must be destroyed before buf_ is handed to asio: the
-      // CodedOutputStream only returns unused buffer space to the string
-      // when it is destroyed, so until then buf_ may be longer than the
-      // serialized data.
-      pbio::StringOutputStream ss(&buf_);
-      pbio::CodedOutputStream os(&ss);
-      os.WriteVarint32(size);
-      serialized = msg_->SerializeToCodedStream(&os);
-    }
-    if (!serialized) {
-      next(Status::Error("Unable to serialize delimited message"));
-      return;
-    }
-    write_coroutine_ =
-        std::shared_ptr<Continuation>(Write(stream_, asio::buffer(buf_)));
-    write_coroutine_->Run([next](const Status &stat) { next(stat); });
-  }
-
-private:
-  Stream *stream_;
-  const google::protobuf::MessageLite *msg_;
-  // Owns the serialized bytes for the duration of the asynchronous write.
-  std::string buf_;
-  std::shared_ptr<Continuation> write_coroutine_;
-};
-
-/**
- * Creates a heap-allocated read continuation; the raw pointer is returned
- * to the caller.
- **/
-template <class Stream, size_t MaxMessageSize = 512>
-static inline Continuation *
-ReadDelimitedPBMessage(Stream *stream, ::google::protobuf::MessageLite *msg) {
-  return new ReadDelimitedPBMessageContinuation<Stream, MaxMessageSize>(stream,
-                                                                        msg);
-}
-
-/**
- * Creates a heap-allocated write continuation. The message is only read,
- * so it is accepted as a pointer to const.
- **/
-template <class Stream>
-static inline Continuation *
-WriteDelimitedPBMessage(Stream *stream,
-                        const ::google::protobuf::MessageLite *msg) {
-  return new WriteDelimitedPBMessageContinuation<Stream>(stream, msg);
-}
-}
-}
-#endif
http://git-wip-us.apache.org/repos/asf/hadoop/blob/76a1e894/hadoop-hdfs-project/hadoop-hdfs-client/src/main/native/libhdfspp/lib/common/hdfs_public_api.cc ---------------------------------------------------------------------- diff --git a/hadoop-hdfs-project/hadoop-hdfs-client/src/main/native/libhdfspp/lib/common/hdfs_public_api.cc b/hadoop-hdfs-project/hadoop-hdfs-client/src/main/native/libhdfspp/lib/common/hdfs_public_api.cc deleted file mode 100644 index 3192614..0000000 --- a/hadoop-hdfs-project/hadoop-hdfs-client/src/main/native/libhdfspp/lib/common/hdfs_public_api.cc +++ /dev/null @@ -1,29 +0,0 @@ -/** - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. 
- */
-
-#include "hdfs_public_api.h"
-
-namespace hdfs {
-
-// Out-of-line definition of the IoService destructor.
-IoService::~IoService() {}
-
-// Allocates the concrete IoServiceImpl and returns it through the base
-// pointer; the object is created with new, so it is presumably the caller
-// that deletes it.
-IoService *IoService::New() {
-  IoService *service = new IoServiceImpl();
-  return service;
-}
-
-}
http://git-wip-us.apache.org/repos/asf/hadoop/blob/76a1e894/hadoop-hdfs-project/hadoop-hdfs-client/src/main/native/libhdfspp/lib/common/hdfs_public_api.h
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs-client/src/main/native/libhdfspp/lib/common/hdfs_public_api.h b/hadoop-hdfs-project/hadoop-hdfs-client/src/main/native/libhdfspp/lib/common/hdfs_public_api.h
deleted file mode 100644
index 95567c0..0000000
--- a/hadoop-hdfs-project/hadoop-hdfs-client/src/main/native/libhdfspp/lib/common/hdfs_public_api.h
+++ /dev/null
@@ -1,42 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-#ifndef COMMON_HDFS_PUBLIC_API_H_
-#define COMMON_HDFS_PUBLIC_API_H_
-
-#include "libhdfspp/hdfs.h"
-
-#include <asio/io_service.hpp>
-
-namespace hdfs {
-
-/**
- * IoService implementation that wraps an asio::io_service.
- **/
-class IoServiceImpl : public IoService {
- public:
-  // Runs the event loop on the calling thread. The work object is alive for
-  // the whole run() call so the loop does not return merely because no
-  // handlers are pending.
-  virtual void Run() override {
-    asio::io_service::work keep_alive(io_service_);
-    io_service_.run();
-  }
-
-  // Asks the event loop to stop.
-  virtual void Stop() override {
-    io_service_.stop();
-  }
-
-  // Gives access to the wrapped asio::io_service.
-  ::asio::io_service &io_service() {
-    return io_service_;
-  }
-
- private:
-  ::asio::io_service io_service_;
-};
-
-}
-
-#endif
http://git-wip-us.apache.org/repos/asf/hadoop/blob/76a1e894/hadoop-hdfs-project/hadoop-hdfs-client/src/main/native/libhdfspp/lib/common/logging.h
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs-client/src/main/native/libhdfspp/lib/common/logging.h b/hadoop-hdfs-project/hadoop-hdfs-client/src/main/native/libhdfspp/lib/common/logging.h
deleted file mode 100644
index 82bdae0..0000000
--- a/hadoop-hdfs-project/hadoop-hdfs-client/src/main/native/libhdfspp/lib/common/logging.h
+++ /dev/null
@@ -1,61 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-#ifndef LIB_COMMON_LOGGING_H_
-#define LIB_COMMON_LOGGING_H_
-
-#include <cstddef>
-#include <iostream>
-#include <string>
-
-namespace hdfs {
-
-// Severity of a log line; the enumerator values index the name table in
-// the LogMessage constructor.
-enum LogLevel {
-  kDebug,
-  kInfo,
-  kWarning,
-  kError,
-};
-
-// Each macro creates a temporary LogMessage; the line is terminated when
-// the temporary is destroyed at the end of the full expression. The names
-// are fully qualified so the macros also work outside namespace hdfs.
-#define LOG_DEBUG() ::hdfs::LogMessage(::hdfs::kDebug)
-#define LOG_INFO() ::hdfs::LogMessage(::hdfs::kInfo)
-#define LOG_WARN() ::hdfs::LogMessage(::hdfs::kWarning)
-#define LOG_ERROR() ::hdfs::LogMessage(::hdfs::kError)
-
-/**
- * Writes one log line to std::cerr: the constructor prints the "[LEVEL] "
- * prefix, operator<< appends to the line and the destructor ends it.
- **/
-class LogMessage {
- public:
-  LogMessage(const LogLevel &l) {
-    static constexpr const char * kLogLevelMessage[] = {"DEBUG", "INFO", "WARN", "ERROR"};
-    constexpr size_t kLevelCount =
-        sizeof(kLogLevelMessage) / sizeof(kLogLevelMessage[0]);
-    // Guard the lookup: a LogLevel produced by a cast may lie outside the
-    // table.
-    const size_t index = static_cast<size_t>(l);
-    const char *name = index < kLevelCount ? kLogLevelMessage[index] : "UNKNOWN";
-    ::std::cerr << "[" << name << "] ";
-  }
-
-  // Ends the line; std::endl also flushes the stream.
-  ~LogMessage() {
-    ::std::cerr << std::endl;
-  }
-
-  LogMessage& operator<<(const std::string& msg) {
-    ::std::cerr << msg;
-    return *this;
-  }
-  LogMessage& operator<<(int x) {
-    ::std::cerr << x;
-    return *this;
-  }
-};
-
-}
-
-#endif
http://git-wip-us.apache.org/repos/asf/hadoop/blob/76a1e894/hadoop-hdfs-project/hadoop-hdfs-client/src/main/native/libhdfspp/lib/common/options.cc
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs-client/src/main/native/libhdfspp/lib/common/options.cc b/hadoop-hdfs-project/hadoop-hdfs-client/src/main/native/libhdfspp/lib/common/options.cc
deleted file mode 100644
index 529fd0b..0000000
--- a/hadoop-hdfs-project/hadoop-hdfs-client/src/main/native/libhdfspp/lib/common/options.cc
+++ /dev/null
@@ -1,27 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.
You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-#include "libhdfspp/options.h"
-
-namespace hdfs {
-
-// Default options: rpc_timeout starts at 30000 (presumably milliseconds;
-// confirm against libhdfspp/options.h).
-Options::Options() : rpc_timeout(30000) {}
-
-}
http://git-wip-us.apache.org/repos/asf/hadoop/blob/76a1e894/hadoop-hdfs-project/hadoop-hdfs-client/src/main/native/libhdfspp/lib/common/sasl_authenticator.h
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs-client/src/main/native/libhdfspp/lib/common/sasl_authenticator.h b/hadoop-hdfs-project/hadoop-hdfs-client/src/main/native/libhdfspp/lib/common/sasl_authenticator.h
deleted file mode 100644
index 71fee7a..0000000
--- a/hadoop-hdfs-project/hadoop-hdfs-client/src/main/native/libhdfspp/lib/common/sasl_authenticator.h
+++ /dev/null
@@ -1,66 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-#ifndef LIB_COMMON_SASL_AUTHENTICATOR_H_
-#define LIB_COMMON_SASL_AUTHENTICATOR_H_
-
-#include "libhdfspp/status.h"
-
-#include <cstddef>
-#include <string>
-
-namespace hdfs {
-
-class DigestMD5AuthenticatorTest_TestResponse_Test;
-
-/**
- * A specialized implementation of RFC 2831 for the HDFS
- * DataTransferProtocol.
- *
- * The current implementation lacks the following features:
- *   * Encoding the username, realm, and password in ISO-8859-1 when
- *     it is required by the RFC. They are always encoded in UTF-8.
- *   * Checking whether the challenges from the server are
- *     well-formed.
- *   * Specifying authzid, digest-uri and maximum buffer size.
- *   * Supporting QOP other than the auth level.
- **/
-class DigestMD5Authenticator {
-public:
-  /**
-   * Parses the server challenge in payload and, on success, stores the
-   * client's first response in *result.
-   **/
-  Status EvaluateResponse(const std::string &payload, std::string *result);
-  /**
-   * mock_nonce disables generation of a random client nonce so that a test
-   * can supply a fixed one.
-   **/
-  DigestMD5Authenticator(const std::string &username,
-                         const std::string &password, bool mock_nonce = false);
-
-private:
-  Status GenerateFirstResponse(std::string *result);
-  Status GenerateResponseValue(std::string *response_value);
-  Status ParseFirstChallenge(const std::string &payload);
-
-  // Extracts the token that starts at offset off of payload into *tok and
-  // returns the offset to continue from, or std::string::npos at the end.
-  static size_t NextToken(const std::string &payload, size_t off,
-                          std::string *tok);
-  void GenerateCNonce();
-  std::string username_;
-  std::string password_;
-  // Values taken from the server challenge.
-  std::string nonce_;
-  std::string cnonce_;
-  std::string realm_;
-  std::string qop_;
-  // Number of responses generated so far; sent as the "nc" directive.
-  unsigned nonce_count_;
-
-  const bool TEST_mock_cnonce_;
-  friend class DigestMD5AuthenticatorTest_TestResponse_Test;
-};
-}
-
-#endif
http://git-wip-us.apache.org/repos/asf/hadoop/blob/76a1e894/hadoop-hdfs-project/hadoop-hdfs-client/src/main/native/libhdfspp/lib/common/sasl_digest_md5.cc
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs-client/src/main/native/libhdfspp/lib/common/sasl_digest_md5.cc b/hadoop-hdfs-project/hadoop-hdfs-client/src/main/native/libhdfspp/lib/common/sasl_digest_md5.cc
deleted file mode 100644
index 3ca8578..0000000
---
a/hadoop-hdfs-project/hadoop-hdfs-client/src/main/native/libhdfspp/lib/common/sasl_digest_md5.cc
+++ /dev/null
@@ -1,240 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-#include "sasl_authenticator.h"
-
-#include "common/util.h"
-
-#include <openssl/rand.h>
-#include <openssl/md5.h>
-
-#include <iomanip>
-#include <map>
-#include <sstream>
-
-namespace hdfs {
-
-static std::string QuoteString(const std::string &src);
-static std::string GetMD5Digest(const std::string &src);
-static std::string BinaryToHex(const std::string &src);
-
-static const char kDigestUri[] = "hdfs/0";
-static const size_t kMaxBufferSize = 65536;
-
-DigestMD5Authenticator::DigestMD5Authenticator(const std::string &username,
-                                               const std::string &password,
-                                               bool mock_nonce)
-    : username_(username), password_(password), nonce_count_(0),
-      TEST_mock_cnonce_(mock_nonce) {}
-
-/**
- * Parses the server challenge and, if it is acceptable, writes the first
- * client response into *result.
- **/
-Status DigestMD5Authenticator::EvaluateResponse(const std::string &payload,
-                                                std::string *result) {
-  Status status = ParseFirstChallenge(payload);
-  if (status.ok()) {
-    status = GenerateFirstResponse(result);
-  }
-  return status;
-}
-
-/**
- * Extracts the next token of the challenge starting at offset off.
- *
- * A token is a single '=' or ',', or a run of characters from the accepted
- * set, optionally enclosed in double quotes (inside quotes '=' is accepted
- * as well). Whitespace in front of a token is skipped.
- *
- * Returns the offset to continue from, or std::string::npos when the end of
- * the payload has been reached. If the character at the returned position
- * cannot start a token the following call returns that same position with
- * an empty token; callers must treat that as a malformed challenge.
- **/
-size_t DigestMD5Authenticator::NextToken(const std::string &payload, size_t off,
-                                         std::string *tok) {
-  tok->clear();
-
-  // Linear whitespace may separate directives; it carries no meaning.
-  while (off < payload.size() &&
-         (payload[off] == ' ' || payload[off] == '\t' ||
-          payload[off] == '\r' || payload[off] == '\n')) {
-    ++off;
-  }
-
-  if (off >= payload.size()) {
-    return std::string::npos;
-  }
-
-  const char first = payload[off];
-  if (first == '=' || first == ',') {
-    *tok = first;
-    return off + 1;
-  }
-
-  int quote_count = 0;
-  for (; off < payload.size(); ++off) {
-    const char c = payload[off];
-    if (c == '"') {
-      ++quote_count;
-      if (quote_count == 2) {
-        return off + 1;
-      }
-      continue;
-    }
-
-    if (c == '=') {
-      if (quote_count) {
-        tok->push_back(c);
-      } else {
-        break;
-      }
-    } else if (('0' <= c && c <= '9') || ('a' <= c && c <= 'z') ||
-               ('A' <= c && c <= 'Z') || c == '+' || c == '/' || c == '-' ||
-               c == '_' || c == '@') {
-      tok->push_back(c);
-    } else {
-      break;
-    }
-  }
-  return off;
-}
-
-/**
- * Generates a fresh random client nonce unless the authenticator runs in
- * mock mode. On failure of the random generator cnonce_ is left empty.
- **/
-void DigestMD5Authenticator::GenerateCNonce() {
-  if (!TEST_mock_cnonce_) {
-    unsigned char buf[8] = {0,};
-    // RAND_bytes (unlike the deprecated RAND_pseudo_bytes) yields
-    // cryptographically strong bytes and returns 1 only on success.
-    if (RAND_bytes(buf, sizeof(buf)) == 1) {
-      cnonce_ = Base64Encode(
-          std::string(reinterpret_cast<const char *>(buf), sizeof(buf)));
-    } else {
-      cnonce_.clear();
-    }
-  }
-}
-
-/**
- * Parses the "name=value,name=value,..." directives of the first server
- * challenge and stores realm, nonce and qop.
- *
- * Returns an error if the challenge contains a character the tokenizer
- * cannot consume, if algorithm is not "md5-sess", if charset is not
- * "utf-8", or if the nonce is missing.
- **/
-Status DigestMD5Authenticator::ParseFirstChallenge(const std::string &payload) {
-  std::map<std::string, std::string> props;
-  std::string token;
-  enum {
-    kStateLVal,
-    kStateEqual,
-    kStateRVal,
-    kStateCommaOrEnd,
-  };
-
-  int state = kStateLVal;
-
-  std::string lval, rval;
-  size_t off = 0;
-  while (true) {
-    const size_t next = NextToken(payload, off, &token);
-    if (next == std::string::npos) {
-      break;
-    }
-    if (next == off) {
-      // The tokenizer made no progress, i.e. it hit a character it cannot
-      // consume. Continuing would loop forever on server-supplied input.
-      return Status::Error("Invalid challenge");
-    }
-    off = next;
-
-    switch (state) {
-    case kStateLVal:
-      lval = token;
-      state = kStateEqual;
-      break;
-    case kStateEqual:
-      state = kStateRVal;
-      break;
-    case kStateRVal:
-      rval = token;
-      props[lval] = rval;
-      state = kStateCommaOrEnd;
-      break;
-    case kStateCommaOrEnd:
-      state = kStateLVal;
-      break;
-    }
-  }
-
-  if (props["algorithm"] != "md5-sess" || props["charset"] != "utf-8" ||
-      props.find("nonce") == props.end()) {
-    return Status::Error("Invalid challenge");
-  }
-  realm_ = props["realm"];
-  nonce_ = props["nonce"];
-  qop_ = props["qop"];
-  return Status::OK();
-}
-
-/**
- * Builds the digest-response string for the parsed challenge and stores it
- * in *result. Only qop "auth" is supported.
- **/
-Status DigestMD5Authenticator::GenerateFirstResponse(std::string *result) {
-  // TODO: Support auth-int and auth-conf
-  // Handle cipher
-  if (qop_ != "auth") {
-    return Status::Unimplemented();
-  }
-
-  std::stringstream ss;
-  GenerateCNonce();
-  if (!TEST_mock_cnonce_ && cnonce_.empty()) {
-    return Status::Error("Unable to generate a client nonce");
-  }
-  ss << "charset=utf-8,username=\"" << QuoteString(username_) << "\""
-     << ",authzid=\"" << QuoteString(username_) << "\""
-     << ",nonce=\"" << QuoteString(nonce_) << "\""
-     << ",digest-uri=\"" << kDigestUri << "\""
-     << ",maxbuf=" << kMaxBufferSize << ",cnonce=\"" << cnonce_ << "\"";
-
-  if (realm_.size()) {
-    ss << ",realm=\"" << QuoteString(realm_) << "\"";
-  }
-
-  // nc is the nonce count as eight hexadecimal digits.
-  ss << ",nc=" << std::hex << std::setw(8) << std::setfill('0')
-     << ++nonce_count_;
-  std::string response_value;
-  const Status status = GenerateResponseValue(&response_value);
-  if (!status.ok()) {
-    return status;
-  }
-  ss << ",response=" << response_value;
-  *result = ss.str();
-  return result->size() > 4096 ? Status::Error("Response too big")
-                               : Status::OK();
-}
-
-/**
- * Generate the response value specified in S 2.1.2.1 in RFC2831.
- **/
-Status
-DigestMD5Authenticator::GenerateResponseValue(std::string *response_value) {
-  std::stringstream begin_a1, a1_ss;
-  std::string a2;
-
-  if (qop_ == "auth") {
-    a2 = std::string("AUTHENTICATE:") + kDigestUri;
-  } else {
-    a2 = std::string("AUTHENTICATE:") + kDigestUri +
-         ":00000000000000000000000000000000";
-  }
-
-  // A1 = H(username:realm:password):nonce:cnonce:authzid, where the inner
-  // hash is the raw 16-byte digest and authzid is the username.
-  begin_a1 << username_ << ":" << realm_ << ":" << password_;
-  a1_ss << GetMD5Digest(begin_a1.str()) << ":" << nonce_ << ":" << cnonce_
-        << ":" << username_;
-
-  std::stringstream combine_ss;
-  combine_ss << BinaryToHex(GetMD5Digest(a1_ss.str())) << ":" << nonce_ << ":"
-             << std::hex << std::setw(8) << std::setfill('0') << nonce_count_
-             << ":" << cnonce_ << ":" << qop_ << ":"
-             << BinaryToHex(GetMD5Digest(a2));
-  *response_value = BinaryToHex(GetMD5Digest(combine_ss.str()));
-  return Status::OK();
-}
-
-/**
- * Escapes src for use inside a quoted-string: a backslash is inserted in
- * front of every double quote and every backslash.
- **/
-static std::string QuoteString(const std::string &src) {
-  std::string dst;
-  dst.reserve(2 * src.size());
-  for (const char c : src) {
-    if (c == '"' || c == '\\') {
-      dst.push_back('\\');
-    }
-    dst.push_back(c);
-  }
-  return dst;
-}
-
-/**
- * Returns the raw (binary, MD5_DIGEST_LENGTH bytes) MD5 digest of src.
- **/
-static std::string GetMD5Digest(const std::string &src) {
-  MD5_CTX ctx;
-  unsigned char digest[MD5_DIGEST_LENGTH];
-  MD5_Init(&ctx);
-  MD5_Update(&ctx, src.data(), src.size());
-  MD5_Final(digest, &ctx);
-  return std::string(reinterpret_cast<const char *>(digest), sizeof(digest));
-}
-
-/**
- * Returns src as lowercase hexadecimal, two digits per byte.
- **/
-static std::string BinaryToHex(const std::string &src) {
-  std::stringstream ss;
-  ss << std::hex << std::setfill('0');
-  for (size_t i = 0; i < src.size(); ++i) {
-    // Go through unsigned char so bytes >= 0x80 are not sign-extended.
-    const unsigned c = static_cast<unsigned char>(src[i]);
-    ss << std::setw(2) << c;
-  }
-  return ss.str();
-}
-}
http://git-wip-us.apache.org/repos/asf/hadoop/blob/76a1e894/hadoop-hdfs-project/hadoop-hdfs-client/src/main/native/libhdfspp/lib/common/status.cc
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs-client/src/main/native/libhdfspp/lib/common/status.cc b/hadoop-hdfs-project/hadoop-hdfs-client/src/main/native/libhdfspp/lib/common/status.cc
deleted file mode 100644
index 66cfa1c..0000000
--- a/hadoop-hdfs-project/hadoop-hdfs-client/src/main/native/libhdfspp/lib/common/status.cc
+++ /dev/null
@@ -1,66 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-#include "libhdfspp/status.h"
-
-#include <cassert>
-#include <cstdint>
-#include <cstring>
-#include <string>
-
-namespace hdfs {
-
-Status::Status(int code, const char *msg1)
-    : state_(ConstructState(code, msg1, nullptr)) {}
-
-Status::Status(int code, const char *msg1, const char *msg2)
-    : state_(ConstructState(code, msg1, msg2)) {}
-
-/**
- * Builds the heap-allocated state of a non-OK status.
- *
- * Layout: bytes [0, 4) hold the message length, bytes [4, 8) hold the code
- * and the message follows from byte 8 without a terminating NUL. If msg2 is
- * non-empty the message is "msg1: msg2". A null msg1 or msg2 is treated as
- * an empty string.
- **/
-const char *Status::ConstructState(int code, const char *msg1,
-                                   const char *msg2) {
-  assert(code != kOk);
-  const uint32_t len1 = msg1 ? static_cast<uint32_t>(strlen(msg1)) : 0;
-  const uint32_t len2 = msg2 ? static_cast<uint32_t>(strlen(msg2)) : 0;
-  const uint32_t size = len1 + (len2 ? (2 + len2) : 0);
-  const uint32_t code32 = static_cast<uint32_t>(code);
-  // The allocation keeps the two spare bytes of the original layout.
-  char *result = new char[size + 8 + 2];
-  // memcpy is used for the header fields, matching CopyState(), so that no
-  // assumption about the alignment of the buffer is needed.
-  memcpy(result, &size, sizeof(size));
-  memcpy(result + 4, &code32, sizeof(code32));
-  if (len1) {
-    memcpy(result + 8, msg1, len1);
-  }
-  if (len2) {
-    result[8 + len1] = ':';
-    result[9 + len1] = ' ';
-    memcpy(result + 10 + len1, msg2, len2);
-  }
-  return result;
-}
-
-/**
- * Returns "OK" for a status without state, otherwise the stored message.
- **/
-std::string Status::ToString() const {
-  if (!state_) {
-    return "OK";
-  }
-  uint32_t length;
-  memcpy(&length, state_, sizeof(length));
-  return std::string(state_ + 8, length);
-}
-
-/**
- * Returns a newly allocated copy of state: the 8-byte header plus the
- * message whose length is stored in the first four bytes.
- **/
-const char *Status::CopyState(const char *state) {
-  uint32_t size;
-  memcpy(&size, state, sizeof(size));
-  char *result = new char[size + 8];
-  memcpy(result, state, size + 8);
-  return result;
-}
-}
http://git-wip-us.apache.org/repos/asf/hadoop/blob/76a1e894/hadoop-hdfs-project/hadoop-hdfs-client/src/main/native/libhdfspp/lib/common/util.h
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs-client/src/main/native/libhdfspp/lib/common/util.h b/hadoop-hdfs-project/hadoop-hdfs-client/src/main/native/libhdfspp/lib/common/util.h
deleted file mode 100644
index ff9f36c..0000000
--- a/hadoop-hdfs-project/hadoop-hdfs-client/src/main/native/libhdfspp/lib/common/util.h
+++ /dev/null
@@ -1,58 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.
See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-#ifndef LIB_COMMON_UTIL_H_
-#define LIB_COMMON_UTIL_H_
-
-#include "libhdfspp/status.h"
-
-#include <asio/error_code.hpp>
-
-#include <google/protobuf/message_lite.h>
-#include <google/protobuf/io/coded_stream.h>
-
-#include <cstdint>
-#include <string>
-
-namespace hdfs {
-
-/**
- * Converts an asio error code into a Status: OK when no error is set,
- * otherwise a Status carrying the error's value and message.
- **/
-static inline Status ToStatus(const ::asio::error_code &ec) {
-  if (ec) {
-    return Status(ec.value(), ec.message().c_str());
-  } else {
-    return Status::OK();
-  }
-}
-
-/**
- * Returns the number of bytes msg occupies when written with a varint
- * length prefix: the size of the prefix plus the size of the message.
- **/
-static inline int DelimitedPBMessageSize(
-    const ::google::protobuf::MessageLite *msg) {
-  const int size = msg->ByteSize();
-  const int prefix = static_cast<int>(
-      ::google::protobuf::io::CodedOutputStream::VarintSize32(
-          static_cast<uint32_t>(size)));
-  return prefix + size;
-}
-
-/**
- * Reads one varint-length-delimited message from in and parses it into msg.
- *
- * Returns true on success. Returns false if the length prefix cannot be
- * read (msg is then left untouched) or if the message cannot be parsed.
- * The read limit pushed for the message is always popped again.
- **/
-static inline bool ReadDelimitedPBMessage(
-    ::google::protobuf::io::CodedInputStream *in,
-    ::google::protobuf::MessageLite *msg) {
-  uint32_t size = 0;
-  if (!in->ReadVarint32(&size)) {
-    return false;
-  }
-  auto limit = in->PushLimit(size);
-  const bool parsed = msg->ParseFromCodedStream(in);
-  in->PopLimit(limit);
-  return parsed;
-}
-
-std::string Base64Encode(const std::string &src);
-
-}
-
-#endif
http://git-wip-us.apache.org/repos/asf/hadoop/blob/76a1e894/hadoop-hdfs-project/hadoop-hdfs-client/src/main/native/libhdfspp/lib/fs/CMakeLists.txt
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs-client/src/main/native/libhdfspp/lib/fs/CMakeLists.txt
b/hadoop-hdfs-project/hadoop-hdfs-client/src/main/native/libhdfspp/lib/fs/CMakeLists.txt deleted file mode 100644 index f386688..0000000 --- a/hadoop-hdfs-project/hadoop-hdfs-client/src/main/native/libhdfspp/lib/fs/CMakeLists.txt +++ /dev/null @@ -1,2 +0,0 @@ -add_library(fs filesystem.cc inputstream.cc) -add_dependencies(fs proto) http://git-wip-us.apache.org/repos/asf/hadoop/blob/76a1e894/hadoop-hdfs-project/hadoop-hdfs-client/src/main/native/libhdfspp/lib/fs/filesystem.cc ---------------------------------------------------------------------- diff --git a/hadoop-hdfs-project/hadoop-hdfs-client/src/main/native/libhdfspp/lib/fs/filesystem.cc b/hadoop-hdfs-project/hadoop-hdfs-client/src/main/native/libhdfspp/lib/fs/filesystem.cc deleted file mode 100644 index 0b958a8..0000000 --- a/hadoop-hdfs-project/hadoop-hdfs-client/src/main/native/libhdfspp/lib/fs/filesystem.cc +++ /dev/null @@ -1,106 +0,0 @@ -/** - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. 
- */
-
-#include "filesystem.h"
-#include "common/continuation/asio.h"
-#include "common/util.h"
-
-#include <asio/ip/tcp.hpp>
-
-#include <functional>
-#include <iterator>
-#include <limits>
-#include <memory>
-#include <string>
-#include <vector>
-
-namespace hdfs {
-
-static const char kNamenodeProtocol[] =
-    "org.apache.hadoop.hdfs.protocol.ClientProtocol";
-static const int kNamenodeProtocolVersion = 1;
-
-using ::asio::ip::tcp;
-
-FileSystem::~FileSystem() {}
-
-/**
- * Creates a FileSystemImpl and connects it to server:service. The handler
- * receives the connected file system on success; on failure the
- * implementation object is deleted and the handler receives nullptr.
- **/
-void FileSystem::New(
-    IoService *io_service, const Options &options, const std::string &server,
-    const std::string &service,
-    const std::function<void(const Status &, FileSystem *)> &handler) {
-  FileSystemImpl *impl = new FileSystemImpl(io_service, options);
-  impl->Connect(server, service, [impl, handler](const Status &stat) {
-    if (stat.ok()) {
-      handler(stat, impl);
-    } else {
-      delete impl;
-      handler(stat, nullptr);
-    }
-  });
-}
-
-// The io_service argument is downcast to IoServiceImpl, so it must have
-// been created as one (IoService::New() does that).
-FileSystemImpl::FileSystemImpl(IoService *io_service, const Options &options)
-    : io_service_(static_cast<IoServiceImpl *>(io_service)),
-      engine_(&io_service_->io_service(), options,
-              RpcEngine::GetRandomClientName(), kNamenodeProtocol,
-              kNamenodeProtocolVersion),
-      namenode_(&engine_) {}
-
-/**
- * Resolves server:service, connects the RPC engine to the first resolved
- * endpoint, starts the engine on success and reports the outcome to
- * handler.
- **/
-void FileSystemImpl::Connect(const std::string &server,
-                             const std::string &service,
-                             std::function<void(const Status &)> &&handler) {
-  using namespace continuation;
-  typedef std::vector<tcp::endpoint> State;
-  auto m = Pipeline<State>::Create();
-  m->Push(Resolve(&io_service_->io_service(), server, service,
-                  std::back_inserter(m->state())))
-      .Push(Bind([this, m](const Continuation::Next &next) {
-        // front() on an empty vector is undefined behavior; fail cleanly
-        // if resolution succeeded without producing an endpoint.
-        if (m->state().empty()) {
-          next(Status::Error("Name resolution returned no endpoints"));
-          return;
-        }
-        engine_.Connect(m->state().front(), next);
-      }));
-  m->Run([this, handler](const Status &status, const State &) {
-    if (status.ok()) {
-      engine_.Start();
-    }
-    handler(status);
-  });
-}
-
-/**
- * Asks the namenode for the block locations of the whole file at path
- * (offset 0, maximum length) and hands an InputStreamImpl built from them
- * to handler; on failure handler receives nullptr.
- *
- * NOTE(review): the response is used without checking whether it actually
- * carries locations; confirm what the namenode returns for a missing file.
- **/
-void FileSystemImpl::Open(
-    const std::string &path,
-    const std::function<void(const Status &, InputStream *)> &handler) {
-  using ::hadoop::hdfs::GetBlockLocationsRequestProto;
-  using ::hadoop::hdfs::GetBlockLocationsResponseProto;
-
-  struct State {
-    GetBlockLocationsRequestProto req;
-    std::shared_ptr<GetBlockLocationsResponseProto> resp;
-  };
-
-  auto m = continuation::Pipeline<State>::Create();
-  auto &req = m->state().req;
-  req.set_src(path);
-  req.set_offset(0);
-  req.set_length(std::numeric_limits<long long>::max());
-  m->state().resp.reset(new GetBlockLocationsResponseProto());
-
-  State *s = &m->state();
-  m->Push(continuation::Bind(
-      [this, s](const continuation::Continuation::Next &next) {
-        namenode_.GetBlockLocations(&s->req, s->resp, next);
-      }));
-  m->Run([this, handler](const Status &stat, const State &s) {
-    handler(stat, stat.ok() ? new InputStreamImpl(this, &s.resp->locations())
-                            : nullptr);
-  });
-}
-}
http://git-wip-us.apache.org/repos/asf/hadoop/blob/76a1e894/hadoop-hdfs-project/hadoop-hdfs-client/src/main/native/libhdfspp/lib/fs/filesystem.h
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs-client/src/main/native/libhdfspp/lib/fs/filesystem.h b/hadoop-hdfs-project/hadoop-hdfs-client/src/main/native/libhdfspp/lib/fs/filesystem.h
deleted file mode 100644
index 72f80b7..0000000
--- a/hadoop-hdfs-project/hadoop-hdfs-client/src/main/native/libhdfspp/lib/fs/filesystem.h
+++ /dev/null
@@ -1,78 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and - * limitations under the License. - */ -#ifndef LIBHDFSPP_LIB_FS_FILESYSTEM_H_ -#define LIBHDFSPP_LIB_FS_FILESYSTEM_H_ - -#include "common/hdfs_public_api.h" -#include "libhdfspp/hdfs.h" -#include "rpc/rpc_engine.h" -#include "ClientNamenodeProtocol.pb.h" -#include "ClientNamenodeProtocol.hrpc.inl" - -namespace hdfs { - /** - * FileSystem implementation that holds an RpcEngine and the generated - * ClientNamenodeProtocol stub bound to that engine. - */ -class FileSystemImpl : public FileSystem { -public: - FileSystemImpl(IoService *io_service, const Options &options); - /* Connects the RPC engine to server/service; handler receives the status. */ void Connect(const std::string &server, const std::string &service, - std::function<void(const Status &)> &&handler); - /* Looks up the block locations of path and passes an InputStream to handler. */ virtual void Open(const std::string &path, - const std::function<void(const Status &, InputStream *)> - &handler) override; - RpcEngine &rpc_engine() { return engine_; } - -private: - IoServiceImpl *io_service_; - RpcEngine engine_; - ClientNamenodeProtocol namenode_; -}; - /** - * InputStream implementation that serves positional reads using the block - * locations passed to its constructor. - */ -class InputStreamImpl : public InputStream { -public: - InputStreamImpl(FileSystemImpl *fs, - const ::hadoop::hdfs::LocatedBlocksProto *blocks); - virtual void - PositionRead(void *buf, size_t nbyte, uint64_t offset, - const std::set<std::string> &excluded_datanodes, - const std::function<void(const Status &, const std::string &, - size_t)> &handler) override; - template <class MutableBufferSequence, class Handler> - void AsyncPreadSome(size_t offset, const MutableBufferSequence &buffers, - const std::set<std::string> &excluded_datanodes, - const Handler &handler); - template <class BlockReaderTrait, class MutableBufferSequence, class Handler> - void AsyncReadBlock(const std::string &client_name, - const hadoop::hdfs::LocatedBlockProto &block, - const hadoop::hdfs::DatanodeInfoProto &dn, size_t offset, - const MutableBufferSequence &buffers, - const Handler &handler); - -private: - FileSystemImpl *fs_; - unsigned long long file_length_; - std::vector<::hadoop::hdfs::LocatedBlockProto> blocks_; - /* Nested helper types; only declared here. */ template <class Reader> struct HandshakeContinuation; - template <class
Reader, class MutableBufferSequence> - struct ReadBlockContinuation; - struct RemoteBlockReaderTrait; -}; -} - /* Pulled in after the class so the declarations above are visible to it. */ -#include "inputstream_impl.h" - -#endif http://git-wip-us.apache.org/repos/asf/hadoop/blob/76a1e894/hadoop-hdfs-project/hadoop-hdfs-client/src/main/native/libhdfspp/lib/fs/inputstream.cc ---------------------------------------------------------------------- diff --git a/hadoop-hdfs-project/hadoop-hdfs-client/src/main/native/libhdfspp/lib/fs/inputstream.cc b/hadoop-hdfs-project/hadoop-hdfs-client/src/main/native/libhdfspp/lib/fs/inputstream.cc deleted file mode 100644 index b47dcb1..0000000 --- a/hadoop-hdfs-project/hadoop-hdfs-client/src/main/native/libhdfspp/lib/fs/inputstream.cc +++ /dev/null @@ -1,46 +0,0 @@ -/** - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License.
- */ - -#include "filesystem.h" - -namespace hdfs { - -using ::hadoop::hdfs::LocatedBlocksProto; - -InputStream::~InputStream() {} - /** - * Records the file length and copies every located block from blocks into - * blocks_. The separate last-block entry is appended only when it is present - * and reports a non-zero number of bytes. - */ -InputStreamImpl::InputStreamImpl(FileSystemImpl *fs, - const LocatedBlocksProto *blocks) - : fs_(fs), file_length_(blocks->filelength()) { - for (const auto &block : blocks->blocks()) { - blocks_.push_back(block); - } - - if (blocks->has_lastblock() && blocks->lastblock().b().numbytes()) { - blocks_.push_back(blocks->lastblock()); - } -} - /** - * Wraps buf/nbyte in an asio buffer and forwards the request to - * AsyncPreadSome with the same offset, exclusion set and handler. - */ -void InputStreamImpl::PositionRead( - void *buf, size_t nbyte, uint64_t offset, - const std::set<std::string> &excluded_datanodes, - const std::function<void(const Status &, const std::string &, size_t)> - &handler) { - AsyncPreadSome(offset, asio::buffer(buf, nbyte), excluded_datanodes, handler); -} -} http://git-wip-us.apache.org/repos/asf/hadoop/blob/76a1e894/hadoop-hdfs-project/hadoop-hdfs-client/src/main/native/libhdfspp/lib/fs/inputstream_impl.h ---------------------------------------------------------------------- diff --git a/hadoop-hdfs-project/hadoop-hdfs-client/src/main/native/libhdfspp/lib/fs/inputstream_impl.h b/hadoop-hdfs-project/hadoop-hdfs-client/src/main/native/libhdfspp/lib/fs/inputstream_impl.h deleted file mode 100644 index 2044f3f..0000000 --- a/hadoop-hdfs-project/hadoop-hdfs-client/src/main/native/libhdfspp/lib/fs/inputstream_impl.h +++ /dev/null @@ -1,193 +0,0 @@ -/** - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License.
You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -#ifndef FS_INPUTSTREAM_IMPL_H_ -#define FS_INPUTSTREAM_IMPL_H_ - -#include "reader/block_reader.h" - -#include "common/continuation/asio.h" -#include "common/continuation/protobuf.h" - -#include <functional> -#include <future> -#include <type_traits> - -namespace hdfs { - -struct InputStreamImpl::RemoteBlockReaderTrait { - typedef RemoteBlockReader<asio::ip::tcp::socket> Reader; - struct State { - std::unique_ptr<asio::ip::tcp::socket> conn_; - std::unique_ptr<Reader> reader_; - std::array<asio::ip::tcp::endpoint, 1> endpoints_; - size_t transferred_; - Reader *reader() { return reader_.get(); } - size_t *transferred() { return &transferred_; } - const size_t *transferred() const { return &transferred_; } - }; - static continuation::Pipeline<State> * - CreatePipeline(::asio::io_service *io_service, - const ::hadoop::hdfs::DatanodeInfoProto &dn) { - using namespace ::asio::ip; - auto m = continuation::Pipeline<State>::Create(); - auto &s = m->state(); - s.conn_.reset(new tcp::socket(*io_service)); - s.reader_.reset(new Reader(BlockReaderOptions(), s.conn_.get())); - auto datanode = dn.id(); - s.endpoints_[0] = tcp::endpoint(address::from_string(datanode.ipaddr()), - datanode.xferport()); - - m->Push(continuation::Connect(s.conn_.get(), s.endpoints_.begin(), - s.endpoints_.end())); - return m; - } -}; - -template <class Reader> -struct InputStreamImpl::HandshakeContinuation : continuation::Continuation { - HandshakeContinuation(Reader *reader, const std::string &client_name, - const hadoop::common::TokenProto *token, - const 
hadoop::hdfs::ExtendedBlockProto *block, - uint64_t length, uint64_t offset) - : reader_(reader), client_name_(client_name), length_(length), - offset_(offset) { - if (token) { - token_.reset(new hadoop::common::TokenProto()); - token_->CheckTypeAndMergeFrom(*token); - } - block_.CheckTypeAndMergeFrom(*block); - } - - virtual void Run(const Next &next) override { - reader_->async_connect(client_name_, token_.get(), &block_, length_, - offset_, next); - } - -private: - Reader *reader_; - const std::string client_name_; - std::unique_ptr<hadoop::common::TokenProto> token_; - hadoop::hdfs::ExtendedBlockProto block_; - uint64_t length_; - uint64_t offset_; -}; - -template <class Reader, class MutableBufferSequence> -struct InputStreamImpl::ReadBlockContinuation : continuation::Continuation { - ReadBlockContinuation(Reader *reader, MutableBufferSequence buffer, - size_t *transferred) - : reader_(reader), buffer_(buffer), - buffer_size_(asio::buffer_size(buffer)), transferred_(transferred) { - static_assert(!std::is_reference<MutableBufferSequence>::value, - "Buffer must not be a reference type"); - } - - virtual void Run(const Next &next) override { - *transferred_ = 0; - next_ = next; - OnReadData(Status::OK(), 0); - } - -private: - Reader *reader_; - const MutableBufferSequence buffer_; - const size_t buffer_size_; - size_t *transferred_; - std::function<void(const Status &)> next_; - - void OnReadData(const Status &status, size_t transferred) { - using std::placeholders::_1; - using std::placeholders::_2; - *transferred_ += transferred; - if (!status.ok()) { - next_(status); - } else if (*transferred_ >= buffer_size_) { - next_(status); - } else { - reader_->async_read_some( - asio::buffer(buffer_ + *transferred_, buffer_size_ - *transferred_), - std::bind(&ReadBlockContinuation::OnReadData, this, _1, _2)); - } - } -}; - -template <class MutableBufferSequence, class Handler> -void InputStreamImpl::AsyncPreadSome( - size_t offset, const MutableBufferSequence &buffers, 
- const std::set<std::string> &excluded_datanodes, const Handler &handler) { - using ::hadoop::hdfs::DatanodeInfoProto; - using ::hadoop::hdfs::LocatedBlockProto; - - auto it = std::find_if( - blocks_.begin(), blocks_.end(), [offset](const LocatedBlockProto &p) { - return p.offset() <= offset && offset < p.offset() + p.b().numbytes(); - }); - - if (it == blocks_.end()) { - handler(Status::InvalidArgument("Cannot find corresponding blocks"), "", 0); - return; - } - - const DatanodeInfoProto *chosen_dn = nullptr; - for (int i = 0; i < it->locs_size(); ++i) { - const auto &di = it->locs(i); - if (!excluded_datanodes.count(di.id().datanodeuuid())) { - chosen_dn = &di; - break; - } - } - - if (!chosen_dn) { - handler(Status::ResourceUnavailable("No datanodes available"), "", 0); - return; - } - - uint64_t offset_within_block = offset - it->offset(); - uint64_t size_within_block = std::min<uint64_t>( - it->b().numbytes() - offset_within_block, asio::buffer_size(buffers)); - - AsyncReadBlock<RemoteBlockReaderTrait>( - fs_->rpc_engine().client_name(), *it, *chosen_dn, offset_within_block, - asio::buffer(buffers, size_within_block), handler); -} - -template <class BlockReaderTrait, class MutableBufferSequence, class Handler> -void InputStreamImpl::AsyncReadBlock( - const std::string &client_name, - const hadoop::hdfs::LocatedBlockProto &block, - const hadoop::hdfs::DatanodeInfoProto &dn, size_t offset, - const MutableBufferSequence &buffers, const Handler &handler) { - - typedef typename BlockReaderTrait::Reader Reader; - auto m = - BlockReaderTrait::CreatePipeline(&fs_->rpc_engine().io_service(), dn); - auto &s = m->state(); - size_t size = asio::buffer_size(buffers); - m->Push(new HandshakeContinuation<Reader>(s.reader(), client_name, nullptr, - &block.b(), size, offset)) - .Push(new ReadBlockContinuation<Reader, MutableBufferSequence>( - s.reader(), buffers, s.transferred())); - const std::string &dnid = dn.id().datanodeuuid(); - m->Run([handler, dnid](const Status 
&status, - const typename BlockReaderTrait::State &state) { - handler(status, dnid, *state.transferred()); - }); -} -} - -#endif http://git-wip-us.apache.org/repos/asf/hadoop/blob/76a1e894/hadoop-hdfs-project/hadoop-hdfs-client/src/main/native/libhdfspp/lib/proto/CMakeLists.txt ---------------------------------------------------------------------- diff --git a/hadoop-hdfs-project/hadoop-hdfs-client/src/main/native/libhdfspp/lib/proto/CMakeLists.txt b/hadoop-hdfs-project/hadoop-hdfs-client/src/main/native/libhdfspp/lib/proto/CMakeLists.txt deleted file mode 100644 index d5820fd..0000000 --- a/hadoop-hdfs-project/hadoop-hdfs-client/src/main/native/libhdfspp/lib/proto/CMakeLists.txt +++ /dev/null @@ -1,63 +0,0 @@ -set(PROTOBUF_IMPORT_DIRS ${PROTO_HDFS_DIR} ${PROTO_HADOOP_DIR}) - -protobuf_generate_cpp(PROTO_SRCS PROTO_HDRS - ${PROTO_HDFS_DIR}/datatransfer.proto - ${PROTO_HDFS_DIR}/ClientDatanodeProtocol.proto - ${PROTO_HDFS_DIR}/ClientNamenodeProtocol.proto - ${PROTO_HDFS_DIR}/acl.proto - ${PROTO_HDFS_DIR}/datatransfer.proto - ${PROTO_HDFS_DIR}/encryption.proto - ${PROTO_HDFS_DIR}/hdfs.proto - ${PROTO_HDFS_DIR}/inotify.proto - ${PROTO_HDFS_DIR}/xattr.proto - ${PROTO_HADOOP_DIR}/IpcConnectionContext.proto - ${PROTO_HADOOP_DIR}/ProtobufRpcEngine.proto - ${PROTO_HADOOP_DIR}/RpcHeader.proto - ${PROTO_HADOOP_DIR}/Security.proto -) - -add_executable(protoc-gen-hrpc protoc_gen_hrpc.cc) -target_link_libraries(protoc-gen-hrpc ${PROTOBUF_PROTOC_LIBRARY} ${PROTOBUF_LIBRARY}) - -function(GEN_HRPC SRCS) - if(NOT ARGN) - message(SEND_ERROR "Error: GEN_HRPC() called without any proto files") - return() - endif() - - if(DEFINED PROTOBUF_IMPORT_DIRS) - foreach(DIR ${PROTOBUF_IMPORT_DIRS}) - get_filename_component(ABS_PATH ${DIR} ABSOLUTE) - list(FIND _protobuf_include_path ${ABS_PATH} _contains_already) - if(${_contains_already} EQUAL -1) - list(APPEND _protobuf_include_path -I ${ABS_PATH}) - endif() - endforeach() - endif() - - set(${SRCS}) - - foreach(FIL ${ARGN}) - 
get_filename_component(ABS_FIL ${FIL} ABSOLUTE) - get_filename_component(FIL_WE ${FIL} NAME_WE) - - list(APPEND ${SRCS} "${CMAKE_CURRENT_BINARY_DIR}/${FIL_WE}.hrpc.inl") - - add_custom_command( - OUTPUT "${CMAKE_CURRENT_BINARY_DIR}/${FIL_WE}.hrpc.inl" - COMMAND ${PROTOBUF_PROTOC_EXECUTABLE} - ARGS --plugin=protoc-gen-hrpc=${CMAKE_CURRENT_BINARY_DIR}/protoc-gen-hrpc --hrpc_out=${CMAKE_CURRENT_BINARY_DIR} ${_protobuf_include_path} ${ABS_FIL} - DEPENDS ${ABS_FIL} ${PROTOBUF_PROTOC_EXECUTABLE} protoc-gen-hrpc - COMMENT "Running HRPC protocol buffer compiler on ${FIL}" - VERBATIM ) - endforeach() - - set_source_files_properties(${${SRCS}} PROPERTIES GENERATED TRUE) - set(${SRCS} ${${SRCS}} PARENT_SCOPE) -endfunction() - -gen_hrpc(HRPC_SRCS - ${PROTO_HDFS_DIR}/ClientNamenodeProtocol.proto -) - -add_library(proto ${PROTO_SRCS} ${PROTO_HDRS} ${HRPC_SRCS}) http://git-wip-us.apache.org/repos/asf/hadoop/blob/76a1e894/hadoop-hdfs-project/hadoop-hdfs-client/src/main/native/libhdfspp/lib/proto/cpp_helpers.h ---------------------------------------------------------------------- diff --git a/hadoop-hdfs-project/hadoop-hdfs-client/src/main/native/libhdfspp/lib/proto/cpp_helpers.h b/hadoop-hdfs-project/hadoop-hdfs-client/src/main/native/libhdfspp/lib/proto/cpp_helpers.h deleted file mode 100644 index 6f380ad..0000000 --- a/hadoop-hdfs-project/hadoop-hdfs-client/src/main/native/libhdfspp/lib/proto/cpp_helpers.h +++ /dev/null @@ -1,82 +0,0 @@ -// Protocol Buffers - Google's data interchange format -// Copyright 2008 Google Inc. All rights reserved. -// https://developers.google.com/protocol-buffers/ -// -// Redistribution and use in source and binary forms, with or without -// modification, are permitted provided that the following conditions are -// met: -// -// * Redistributions of source code must retain the above copyright -// notice, this list of conditions and the following disclaimer. 
-// * Redistributions in binary form must reproduce the above -// copyright notice, this list of conditions and the following disclaimer -// in the documentation and/or other materials provided with the -// distribution. -// * Neither the name of Google Inc. nor the names of its -// contributors may be used to endorse or promote products derived from -// this software without specific prior written permission. -// -// THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS -// "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT -// LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR -// A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT -// OWNER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL, -// SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT -// LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, -// DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY -// THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT -// (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE -// OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE. - -// Author: [email protected] (Kenton Varda) -// Based on original Protocol Buffers design by -// Sanjay Ghemawat, Jeff Dean, and others. - -#ifndef LIBHDFSPP_PROTO_CPP_HELPERS_H_ -#define LIBHDFSPP_PROTO_CPP_HELPERS_H_ - -#include <string> - -/** - * The functions in this file are derived from the original implementation of - *the protobuf library from Google. 
- **/ - /** - * Returns str without a trailing ".proto" extension; a string that does not - * end in ".proto" is returned unchanged. - */ -static inline std::string StripProto(const std::string &str) { - static const std::string kExtension = ".proto"; - if (str.size() >= kExtension.size() && - str.compare(str.size() - kExtension.size(), kExtension.size(), - kExtension) == 0) { - return str.substr(0, str.size() - kExtension.size()); - } else { - return str; - } -} - /** - * Converts input to CamelCase. The first letter and every lower-case letter - * that follows a non-letter are upper-cased, digits are kept, and all other - * characters are dropped. Only ASCII letters and digits are recognized. - */ -static inline std::string ToCamelCase(const std::string &input) { - bool cap_next_letter = true; - std::string result; - // Note: I distrust ctype.h due to locales. - for (size_t i = 0; i < input.size(); i++) { - if ('a' <= input[i] && input[i] <= 'z') { - if (cap_next_letter) { - result += input[i] + ('A' - 'a'); - } else { - result += input[i]; - } - cap_next_letter = false; - } else if ('A' <= input[i] && input[i] <= 'Z') { - // Capital letters are left as-is. - result += input[i]; - cap_next_letter = false; - } else if ('0' <= input[i] && input[i] <= '9') { - result += input[i]; - cap_next_letter = true; - } else { - cap_next_letter = true; - } - } - return result; -} - -#endif http://git-wip-us.apache.org/repos/asf/hadoop/blob/76a1e894/hadoop-hdfs-project/hadoop-hdfs-client/src/main/native/libhdfspp/lib/proto/protoc_gen_hrpc.cc ---------------------------------------------------------------------- diff --git a/hadoop-hdfs-project/hadoop-hdfs-client/src/main/native/libhdfspp/lib/proto/protoc_gen_hrpc.cc b/hadoop-hdfs-project/hadoop-hdfs-client/src/main/native/libhdfspp/lib/proto/protoc_gen_hrpc.cc deleted file mode 100644 index d8e9ab2..0000000 --- a/hadoop-hdfs-project/hadoop-hdfs-client/src/main/native/libhdfspp/lib/proto/protoc_gen_hrpc.cc +++ /dev/null @@ -1,95 +0,0 @@ -/** - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership.
The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -#include "cpp_helpers.h" - -#include <google/protobuf/compiler/code_generator.h> -#include <google/protobuf/compiler/plugin.h> -#include <google/protobuf/descriptor.h> -#include <google/protobuf/io/printer.h> -#include <google/protobuf/io/zero_copy_stream.h> -#include <google/protobuf/stubs/common.h> - -#include <memory> - -using ::google::protobuf::FileDescriptor; -using ::google::protobuf::MethodDescriptor; -using ::google::protobuf::ServiceDescriptor; -using ::google::protobuf::compiler::CodeGenerator; -using ::google::protobuf::compiler::GeneratorContext; -using ::google::protobuf::io::Printer; -using ::google::protobuf::io::ZeroCopyOutputStream; - -class StubGenerator : public CodeGenerator { -public: - virtual bool Generate(const FileDescriptor *file, const std::string &, - GeneratorContext *ctx, - std::string *error) const override; - -private: - void EmitService(const ServiceDescriptor *service, Printer *out) const; - void EmitMethod(const MethodDescriptor *method, Printer *out) const; -}; - -bool StubGenerator::Generate(const FileDescriptor *file, const std::string &, - GeneratorContext *ctx, std::string *) const { - namespace pb = ::google::protobuf; - std::unique_ptr<ZeroCopyOutputStream> os( - ctx->Open(StripProto(file->name()) + ".hrpc.inl")); - Printer out(os.get(), '$'); - for (int i = 0; i < file->service_count(); ++i) { - const ServiceDescriptor *service = 
file->service(i); - EmitService(service, &out); - } - return true; -} - -void StubGenerator::EmitService(const ServiceDescriptor *service, - Printer *out) const { - out->Print("\n// GENERATED AUTOMATICALLY. DO NOT MODIFY.\n" - "class $service$ {\n" - "private:\n" - " ::hdfs::RpcEngine *const engine_;\n" - "public:\n" - " typedef std::function<void(const ::hdfs::Status &)> Callback;\n" - " typedef ::google::protobuf::MessageLite Message;\n" - " inline $service$(::hdfs::RpcEngine *engine)\n" - " : engine_(engine) {}\n", - "service", service->name()); - for (int i = 0; i < service->method_count(); ++i) { - const MethodDescriptor *method = service->method(i); - EmitMethod(method, out); - } - out->Print("};\n"); -} - -void StubGenerator::EmitMethod(const MethodDescriptor *method, - Printer *out) const { - out->Print( - "\n inline void $camel_method$(const Message *req, " - "const std::shared_ptr<Message> &resp, " - "const Callback &handler) {\n" - " engine_->AsyncRpc(\"$method$\", req, resp, handler);\n" - " }\n", - "camel_method", ToCamelCase(method->name()), "method", method->name()); -} - -int main(int argc, char *argv[]) { - StubGenerator generator; - return google::protobuf::compiler::PluginMain(argc, argv, &generator); -} http://git-wip-us.apache.org/repos/asf/hadoop/blob/76a1e894/hadoop-hdfs-project/hadoop-hdfs-client/src/main/native/libhdfspp/lib/reader/CMakeLists.txt ---------------------------------------------------------------------- diff --git a/hadoop-hdfs-project/hadoop-hdfs-client/src/main/native/libhdfspp/lib/reader/CMakeLists.txt b/hadoop-hdfs-project/hadoop-hdfs-client/src/main/native/libhdfspp/lib/reader/CMakeLists.txt deleted file mode 100644 index 71e28ac..0000000 --- a/hadoop-hdfs-project/hadoop-hdfs-client/src/main/native/libhdfspp/lib/reader/CMakeLists.txt +++ /dev/null @@ -1,20 +0,0 @@ -# -# Licensed to the Apache Software Foundation (ASF) under one -# or more contributor license agreements. 
See the NOTICE file -# distributed with this work for additional information -# regarding copyright ownership. The ASF licenses this file -# to you under the Apache License, Version 2.0 (the -# "License"); you may not use this file except in compliance -# with the License. You may obtain a copy of the License at -# -# http://www.apache.org/licenses/LICENSE-2.0 -# -# Unless required by applicable law or agreed to in writing, software -# distributed under the License is distributed on an "AS IS" BASIS, -# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. -# See the License for the specific language governing permissions and -# limitations under the License. -# - # Builds the reader library from the two sources below and orders it after the proto target. -add_library(reader remote_block_reader.cc datatransfer.cc) -add_dependencies(reader proto) http://git-wip-us.apache.org/repos/asf/hadoop/blob/76a1e894/hadoop-hdfs-project/hadoop-hdfs-client/src/main/native/libhdfspp/lib/reader/block_reader.h ---------------------------------------------------------------------- diff --git a/hadoop-hdfs-project/hadoop-hdfs-client/src/main/native/libhdfspp/lib/reader/block_reader.h b/hadoop-hdfs-project/hadoop-hdfs-client/src/main/native/libhdfspp/lib/reader/block_reader.h deleted file mode 100644 index 81636b9..0000000 --- a/hadoop-hdfs-project/hadoop-hdfs-client/src/main/native/libhdfspp/lib/reader/block_reader.h +++ /dev/null @@ -1,114 +0,0 @@ -/** - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License.
You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -#ifndef BLOCK_READER_H_ -#define BLOCK_READER_H_ - -#include "libhdfspp/status.h" -#include "datatransfer.pb.h" - -#include <cstdint> -#include <memory> -#include <string> -#include <vector> - -namespace hdfs { - -/** - * Drop-behind and read-ahead settings, each paired with a flag that records - * whether it was specified. Everything defaults to false / zero. - */ -struct CacheStrategy { - bool drop_behind_specified; - bool drop_behind; - bool read_ahead_specified; - unsigned long long read_ahead; - CacheStrategy() - : drop_behind_specified(false), drop_behind(false), - read_ahead_specified(false), read_ahead(0) {} -}; - -enum DropBehindStrategy { - kUnspecified = 0, - kEnableDropBehind = 1, - kDisableDropBehind = 2, -}; - -enum EncryptionScheme { - kNone = 0, - kAESCTRNoPadding = 1, -}; - -/** - * Options for a block reader. Checksum verification is on and the encryption - * scheme is kNone by default. - */ -struct BlockReaderOptions { - bool verify_checksum; - CacheStrategy cache_strategy; - EncryptionScheme encryption_scheme; - - BlockReaderOptions() - : verify_checksum(true), encryption_scheme(EncryptionScheme::kNone) {} -}; - -/** - * Reads block data from the given stream. The reader starts in the kOpen - * state; the member templates are defined in remote_block_reader_impl.h. - */ -template <class Stream> -class RemoteBlockReader - : public std::enable_shared_from_this<RemoteBlockReader<Stream>> { -public: - /* Every scalar member is initialized so that no later step can observe an - * indeterminate value. */ - explicit RemoteBlockReader(const BlockReaderOptions &options, Stream *stream) - : stream_(stream), state_(kOpen), options_(options), packet_len_(0), - packet_data_read_bytes_(0), chunk_padding_bytes_(0), - bytes_to_read_(0) {} - - template <class MutableBufferSequence, class ReadHandler> - void async_read_some(const MutableBufferSequence &buffers, - const ReadHandler &handler); - - template <class MutableBufferSequence> - size_t read_some(const MutableBufferSequence &buffers, Status *status); - - Status connect(const std::string &client_name, - const hadoop::common::TokenProto *token, - const hadoop::hdfs::ExtendedBlockProto *block, uint64_t length, - uint64_t offset); - -
template <class ConnectHandler> - void async_connect(const std::string &client_name, - const hadoop::common::TokenProto *token, - const hadoop::hdfs::ExtendedBlockProto *block, - uint64_t length, uint64_t offset, - const ConnectHandler &handler); - -private: - struct ReadPacketHeader; - struct ReadChecksum; - struct ReadPadding; - template <class MutableBufferSequence> struct ReadData; - struct AckRead; - enum State { - kOpen, - kReadPacketHeader, - kReadChecksum, - kReadPadding, - kReadData, - kFinished, - }; - - Stream *stream_; - hadoop::hdfs::PacketHeaderProto header_; - State state_; - BlockReaderOptions options_; - size_t packet_len_; - int packet_data_read_bytes_; - int chunk_padding_bytes_; - long long bytes_to_read_; - std::vector<char> checksum_; -}; -} - -#include "remote_block_reader_impl.h" - -#endif http://git-wip-us.apache.org/repos/asf/hadoop/blob/76a1e894/hadoop-hdfs-project/hadoop-hdfs-client/src/main/native/libhdfspp/lib/reader/datatransfer.cc ---------------------------------------------------------------------- diff --git a/hadoop-hdfs-project/hadoop-hdfs-client/src/main/native/libhdfspp/lib/reader/datatransfer.cc b/hadoop-hdfs-project/hadoop-hdfs-client/src/main/native/libhdfspp/lib/reader/datatransfer.cc deleted file mode 100644 index d936407..0000000 --- a/hadoop-hdfs-project/hadoop-hdfs-client/src/main/native/libhdfspp/lib/reader/datatransfer.cc +++ /dev/null @@ -1,52 +0,0 @@ -/** - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License.
You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -#include "datatransfer.h" - -#include "libhdfspp/status.h" - -namespace hdfs { - -namespace DataTransferSaslStreamUtil { - -static const auto kSUCCESS = hadoop::hdfs::DataTransferEncryptorMessageProto_DataTransferEncryptorStatus_SUCCESS; - -using hadoop::hdfs::DataTransferEncryptorMessageProto; - -Status ConvertToStatus(const DataTransferEncryptorMessageProto *msg, std::string *payload) { - using namespace hadoop::hdfs; - auto s = msg->status(); - if (s == DataTransferEncryptorMessageProto_DataTransferEncryptorStatus_ERROR_UNKNOWN_KEY) { - payload->clear(); - return Status::Exception("InvalidEncryptionKeyException", msg->message().c_str()); - } else if (s == DataTransferEncryptorMessageProto_DataTransferEncryptorStatus_ERROR) { - payload->clear(); - return Status::Error(msg->message().c_str()); - } else { - *payload = msg->payload(); - return Status::OK(); - } -} - -void PrepareInitialHandshake(DataTransferEncryptorMessageProto *msg) { - msg->set_status(kSUCCESS); - msg->set_payload(""); -} - -} -} http://git-wip-us.apache.org/repos/asf/hadoop/blob/76a1e894/hadoop-hdfs-project/hadoop-hdfs-client/src/main/native/libhdfspp/lib/reader/datatransfer.h ---------------------------------------------------------------------- diff --git a/hadoop-hdfs-project/hadoop-hdfs-client/src/main/native/libhdfspp/lib/reader/datatransfer.h b/hadoop-hdfs-project/hadoop-hdfs-client/src/main/native/libhdfspp/lib/reader/datatransfer.h deleted file mode 100644 index 511c2eb..0000000 --- 
a/hadoop-hdfs-project/hadoop-hdfs-client/src/main/native/libhdfspp/lib/reader/datatransfer.h +++ /dev/null @@ -1,63 +0,0 @@ -/** - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -#ifndef LIB_READER_DATA_TRANSFER_H_ -#define LIB_READER_DATA_TRANSFER_H_ - -#include "common/sasl_authenticator.h" - -namespace hdfs { - -enum { - kDataTransferVersion = 28, - kDataTransferSasl = 0xdeadbeef, -}; - -enum Operation { - kWriteBlock = 80, - kReadBlock = 81, -}; - -template <class Stream> class DataTransferSaslStream { -public: - DataTransferSaslStream(Stream *stream, const std::string &username, - const std::string &password) - : stream_(stream), authenticator_(username, password) {} - - template <class Handler> void Handshake(const Handler &next); - - template <class MutableBufferSequence, class ReadHandler> - void async_read_some(const MutableBufferSequence &buffers, - ReadHandler &&handler); - - template <class ConstBufferSequence, class WriteHandler> - void async_write_some(const ConstBufferSequence &buffers, - WriteHandler &&handler); - -private: - DataTransferSaslStream(const DataTransferSaslStream &) = delete; - DataTransferSaslStream &operator=(const DataTransferSaslStream &) = delete; - Stream *stream_; - 
DigestMD5Authenticator authenticator_; - struct ReadSaslMessage; - struct Authenticator; -}; -} - -#include "datatransfer_impl.h" - -#endif http://git-wip-us.apache.org/repos/asf/hadoop/blob/76a1e894/hadoop-hdfs-project/hadoop-hdfs-client/src/main/native/libhdfspp/lib/reader/datatransfer_impl.h ---------------------------------------------------------------------- diff --git a/hadoop-hdfs-project/hadoop-hdfs-client/src/main/native/libhdfspp/lib/reader/datatransfer_impl.h b/hadoop-hdfs-project/hadoop-hdfs-client/src/main/native/libhdfspp/lib/reader/datatransfer_impl.h deleted file mode 100644 index 088b86e..0000000 --- a/hadoop-hdfs-project/hadoop-hdfs-client/src/main/native/libhdfspp/lib/reader/datatransfer_impl.h +++ /dev/null @@ -1,144 +0,0 @@ -/** - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. 
- */ -#ifndef LIB_READER_DATATRANFER_IMPL_H_ -#define LIB_READER_DATATRANFER_IMPL_H_ - -#include "datatransfer.pb.h" -#include "common/continuation/continuation.h" -#include "common/continuation/asio.h" -#include "common/continuation/protobuf.h" - -#include <asio/read.hpp> -#include <asio/buffer.hpp> - -namespace hdfs { - -namespace DataTransferSaslStreamUtil { -Status -ConvertToStatus(const ::hadoop::hdfs::DataTransferEncryptorMessageProto *msg, - std::string *payload); -void PrepareInitialHandshake( - ::hadoop::hdfs::DataTransferEncryptorMessageProto *msg); -} - -template <class Stream> -struct DataTransferSaslStream<Stream>::Authenticator - : continuation::Continuation { - Authenticator(DigestMD5Authenticator *authenticator, - const std::string *request, - hadoop::hdfs::DataTransferEncryptorMessageProto *msg) - : authenticator_(authenticator), request_(request), msg_(msg) {} - - virtual void Run(const Next &next) override { - using namespace ::hadoop::hdfs; - std::string response; - Status status = authenticator_->EvaluateResponse(*request_, &response); - msg_->Clear(); - if (status.ok()) { - // TODO: Handle encryption scheme - msg_->set_payload(response); - msg_->set_status( - DataTransferEncryptorMessageProto_DataTransferEncryptorStatus_SUCCESS); - } else { - msg_->set_status( - DataTransferEncryptorMessageProto_DataTransferEncryptorStatus_ERROR); - } - next(Status::OK()); - } - -private: - DigestMD5Authenticator *authenticator_; - const std::string *request_; - hadoop::hdfs::DataTransferEncryptorMessageProto *msg_; -}; - -template <class Stream> -struct DataTransferSaslStream<Stream>::ReadSaslMessage - : continuation::Continuation { - ReadSaslMessage(Stream *stream, std::string *data) - : stream_(stream), data_(data), read_pb_(stream, &resp_) {} - - virtual void Run(const Next &next) override { - auto handler = [this, next](const Status &status) { - if (status.ok()) { - Status new_stat = - DataTransferSaslStreamUtil::ConvertToStatus(&resp_, data_); - 
next(new_stat); - } else { - next(status); - } - }; - read_pb_.Run(handler); - } - -private: - Stream *stream_; - std::string *data_; - hadoop::hdfs::DataTransferEncryptorMessageProto resp_; - continuation::ReadDelimitedPBMessageContinuation<Stream, 1024> read_pb_; -}; - -template <class Stream> -template <class Handler> -void DataTransferSaslStream<Stream>::Handshake(const Handler &next) { - using ::hadoop::hdfs::DataTransferEncryptorMessageProto; - using ::hdfs::continuation::Write; - using ::hdfs::continuation::WriteDelimitedPBMessage; - - static const int kMagicNumber = htonl(kDataTransferSasl); - static const asio::const_buffers_1 kMagicNumberBuffer = asio::buffer( - reinterpret_cast<const char *>(kMagicNumber), sizeof(kMagicNumber)); - - struct State { - DataTransferEncryptorMessageProto req0; - std::string resp0; - DataTransferEncryptorMessageProto req1; - std::string resp1; - Stream *stream; - }; - auto m = continuation::Pipeline<State>::Create(); - State *s = &m->state(); - s->stream = stream_; - - DataTransferSaslStreamUtil::PrepareInitialHandshake(&s->req0); - - m->Push(Write(stream_, kMagicNumberBuffer)) - .Push(WriteDelimitedPBMessage(stream_, &s->req0)) - .Push(new ReadSaslMessage(stream_, &s->resp0)) - .Push(new Authenticator(&authenticator_, &s->resp0, &s->req1)) - .Push(WriteDelimitedPBMessage(stream_, &s->req1)) - .Push(new ReadSaslMessage(stream_, &s->resp1)); - m->Run([next](const Status &status, const State &) { next(status); }); -} - -template <class Stream> -template <class MutableBufferSequence, class ReadHandler> -void DataTransferSaslStream<Stream>::async_read_some( - const MutableBufferSequence &buffers, ReadHandler &&handler) { - stream_->async_read_some(buffers, handler); -} - -template <class Stream> -template <typename ConstBufferSequence, typename WriteHandler> -void DataTransferSaslStream<Stream>::async_write_some( - const ConstBufferSequence &buffers, WriteHandler &&handler) { - stream_->async_write_some(buffers, handler); -} -} - 
-#endif http://git-wip-us.apache.org/repos/asf/hadoop/blob/76a1e894/hadoop-hdfs-project/hadoop-hdfs-client/src/main/native/libhdfspp/lib/reader/remote_block_reader.cc ---------------------------------------------------------------------- diff --git a/hadoop-hdfs-project/hadoop-hdfs-client/src/main/native/libhdfspp/lib/reader/remote_block_reader.cc b/hadoop-hdfs-project/hadoop-hdfs-client/src/main/native/libhdfspp/lib/reader/remote_block_reader.cc deleted file mode 100644 index 68bc4ee..0000000 --- a/hadoop-hdfs-project/hadoop-hdfs-client/src/main/native/libhdfspp/lib/reader/remote_block_reader.cc +++ /dev/null @@ -1,46 +0,0 @@ -/** - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. 
- */ -#include "block_reader.h" - -namespace hdfs { - -hadoop::hdfs::OpReadBlockProto -ReadBlockProto(const std::string &client_name, bool verify_checksum, - const hadoop::common::TokenProto *token, - const hadoop::hdfs::ExtendedBlockProto *block, uint64_t length, - uint64_t offset) { - using namespace hadoop::hdfs; - using namespace hadoop::common; - BaseHeaderProto *base_h = new BaseHeaderProto(); - base_h->set_allocated_block(new ExtendedBlockProto(*block)); - if (token) { - base_h->set_allocated_token(new TokenProto(*token)); - } - ClientOperationHeaderProto *h = new ClientOperationHeaderProto(); - h->set_clientname(client_name); - h->set_allocated_baseheader(base_h); - - OpReadBlockProto p; - p.set_allocated_header(h); - p.set_offset(offset); - p.set_len(length); - p.set_sendchecksums(verify_checksum); - // TODO: p.set_allocated_cachingstrategy(); - return p; -} -} http://git-wip-us.apache.org/repos/asf/hadoop/blob/76a1e894/hadoop-hdfs-project/hadoop-hdfs-client/src/main/native/libhdfspp/lib/reader/remote_block_reader_impl.h ---------------------------------------------------------------------- diff --git a/hadoop-hdfs-project/hadoop-hdfs-client/src/main/native/libhdfspp/lib/reader/remote_block_reader_impl.h b/hadoop-hdfs-project/hadoop-hdfs-client/src/main/native/libhdfspp/lib/reader/remote_block_reader_impl.h deleted file mode 100644 index 68ea6ad..0000000 --- a/hadoop-hdfs-project/hadoop-hdfs-client/src/main/native/libhdfspp/lib/reader/remote_block_reader_impl.h +++ /dev/null @@ -1,342 +0,0 @@ -/** - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. 
You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -#ifndef LIBHDFSPP_READER_REMOTE_BLOCK_READER_IMPL_H_ -#define LIBHDFSPP_READER_REMOTE_BLOCK_READER_IMPL_H_ - -#include "datatransfer.h" -#include "common/continuation/asio.h" -#include "common/continuation/protobuf.h" - -#include <asio/buffers_iterator.hpp> -#include <asio/streambuf.hpp> -#include <asio/write.hpp> - -#include <arpa/inet.h> - -#include <future> - -namespace hdfs { - -hadoop::hdfs::OpReadBlockProto -ReadBlockProto(const std::string &client_name, bool verify_checksum, - const hadoop::common::TokenProto *token, - const hadoop::hdfs::ExtendedBlockProto *block, uint64_t length, - uint64_t offset); - -template <class Stream> -template <class ConnectHandler> -void RemoteBlockReader<Stream>::async_connect( - const std::string &client_name, const hadoop::common::TokenProto *token, - const hadoop::hdfs::ExtendedBlockProto *block, uint64_t length, - uint64_t offset, const ConnectHandler &handler) { - // The total number of bytes that we need to transfer from the DN is - // the amount that the user wants (bytesToRead), plus the padding at - // the beginning in order to chunk-align. Note that the DN may elect - // to send more than this amount if the read starts/ends mid-chunk. 
- bytes_to_read_ = length; - - struct State { - std::string header; - hadoop::hdfs::OpReadBlockProto request; - hadoop::hdfs::BlockOpResponseProto response; - }; - - auto m = continuation::Pipeline<State>::Create(); - State *s = &m->state(); - - s->header.insert(s->header.begin(), - {0, kDataTransferVersion, Operation::kReadBlock}); - s->request = std::move(ReadBlockProto(client_name, options_.verify_checksum, - token, block, length, offset)); - - auto read_pb_message = - new continuation::ReadDelimitedPBMessageContinuation<Stream, 16384>( - stream_, &s->response); - - m->Push(continuation::Write(stream_, asio::buffer(s->header))) - .Push(continuation::WriteDelimitedPBMessage(stream_, &s->request)) - .Push(read_pb_message); - - m->Run([this, handler, offset](const Status &status, const State &s) { - Status stat = status; - if (stat.ok()) { - const auto &resp = s.response; - if (resp.status() == ::hadoop::hdfs::Status::SUCCESS) { - if (resp.has_readopchecksuminfo()) { - const auto &checksum_info = resp.readopchecksuminfo(); - chunk_padding_bytes_ = offset - checksum_info.chunkoffset(); - } - state_ = kReadPacketHeader; - } else { - stat = Status::Error(s.response.message().c_str()); - } - } - handler(stat); - }); -} - -template <class Stream> -struct RemoteBlockReader<Stream>::ReadPacketHeader - : continuation::Continuation { - ReadPacketHeader(RemoteBlockReader<Stream> *parent) : parent_(parent) {} - - virtual void Run(const Next &next) override { - parent_->packet_data_read_bytes_ = 0; - parent_->packet_len_ = 0; - auto handler = [next, this](const asio::error_code &ec, size_t) { - Status status; - if (ec) { - status = Status(ec.value(), ec.message().c_str()); - } else { - parent_->packet_len_ = packet_length(); - parent_->header_.Clear(); - bool v = parent_->header_.ParseFromArray(&buf_[kHeaderStart], - header_length()); - assert(v && "Failed to parse the header"); - parent_->state_ = kReadChecksum; - } - next(status); - }; - - asio::async_read(*parent_->stream_, 
asio::buffer(buf_), - std::bind(&ReadPacketHeader::CompletionHandler, this, - std::placeholders::_1, std::placeholders::_2), - handler); - } - -private: - static const size_t kMaxHeaderSize = 512; - static const size_t kPayloadLenOffset = 0; - static const size_t kPayloadLenSize = sizeof(int); - static const size_t kHeaderLenOffset = 4; - static const size_t kHeaderLenSize = sizeof(short); - static const size_t kHeaderStart = kPayloadLenSize + kHeaderLenSize; - - RemoteBlockReader<Stream> *parent_; - std::array<char, kMaxHeaderSize> buf_; - - size_t packet_length() const { - return ntohl(*reinterpret_cast<const unsigned *>(&buf_[kPayloadLenOffset])); - } - - size_t header_length() const { - return ntohs(*reinterpret_cast<const short *>(&buf_[kHeaderLenOffset])); - } - - size_t CompletionHandler(const asio::error_code &ec, size_t transferred) { - if (ec) { - return 0; - } else if (transferred < kHeaderStart) { - return kHeaderStart - transferred; - } else { - return kHeaderStart + header_length() - transferred; - } - } -}; - -template <class Stream> -struct RemoteBlockReader<Stream>::ReadChecksum : continuation::Continuation { - ReadChecksum(RemoteBlockReader<Stream> *parent) : parent_(parent) {} - - virtual void Run(const Next &next) override { - auto parent = parent_; - if (parent->state_ != kReadChecksum) { - next(Status::OK()); - return; - } - - auto handler = [parent, next](const asio::error_code &ec, size_t) { - Status status; - if (ec) { - status = Status(ec.value(), ec.message().c_str()); - } else { - parent->state_ = - parent->chunk_padding_bytes_ ? 
kReadPadding : kReadData; - } - next(status); - }; - parent->checksum_.resize(parent->packet_len_ - sizeof(int) - - parent->header_.datalen()); - asio::async_read(*parent->stream_, asio::buffer(parent->checksum_), - handler); - } - -private: - RemoteBlockReader<Stream> *parent_; -}; - -template <class Stream> -struct RemoteBlockReader<Stream>::ReadPadding : continuation::Continuation { - ReadPadding(RemoteBlockReader<Stream> *parent) - : parent_(parent), padding_(parent->chunk_padding_bytes_), - bytes_transferred_(std::make_shared<size_t>(0)), - read_data_(new ReadData<asio::mutable_buffers_1>( - parent, bytes_transferred_, asio::buffer(padding_))) {} - - virtual void Run(const Next &next) override { - if (parent_->state_ != kReadPadding || !parent_->chunk_padding_bytes_) { - next(Status::OK()); - return; - } - - auto h = [next, this](const Status &status) { - if (status.ok()) { - assert(reinterpret_cast<const int &>(*bytes_transferred_) == - parent_->chunk_padding_bytes_); - parent_->chunk_padding_bytes_ = 0; - parent_->state_ = kReadData; - } - next(status); - }; - read_data_->Run(h); - } - -private: - RemoteBlockReader<Stream> *parent_; - std::vector<char> padding_; - std::shared_ptr<size_t> bytes_transferred_; - std::shared_ptr<continuation::Continuation> read_data_; - ReadPadding(const ReadPadding &) = delete; - ReadPadding &operator=(const ReadPadding &) = delete; -}; - -template <class Stream> -template <class MutableBufferSequence> -struct RemoteBlockReader<Stream>::ReadData : continuation::Continuation { - ReadData(RemoteBlockReader<Stream> *parent, - std::shared_ptr<size_t> bytes_transferred, - const MutableBufferSequence &buf) - : parent_(parent), bytes_transferred_(bytes_transferred), buf_(buf) {} - - virtual void Run(const Next &next) override { - auto handler = - [next, this](const asio::error_code &ec, size_t transferred) { - Status status; - if (ec) { - status = Status(ec.value(), ec.message().c_str()); - } - *bytes_transferred_ += transferred; - 
parent_->bytes_to_read_ -= transferred; - parent_->packet_data_read_bytes_ += transferred; - if (parent_->packet_data_read_bytes_ >= parent_->header_.datalen()) { - parent_->state_ = kReadPacketHeader; - } - next(status); - }; - - auto data_len = - parent_->header_.datalen() - parent_->packet_data_read_bytes_; - async_read(*parent_->stream_, buf_, asio::transfer_exactly(data_len), - handler); - } - -private: - RemoteBlockReader<Stream> *parent_; - std::shared_ptr<size_t> bytes_transferred_; - MutableBufferSequence buf_; -}; - -template <class Stream> -struct RemoteBlockReader<Stream>::AckRead : continuation::Continuation { - AckRead(RemoteBlockReader<Stream> *parent) : parent_(parent) {} - - virtual void Run(const Next &next) override { - if (parent_->bytes_to_read_ > 0) { - next(Status::OK()); - return; - } - - auto m = - continuation::Pipeline<hadoop::hdfs::ClientReadStatusProto>::Create(); - m->state().set_status(parent_->options_.verify_checksum - ? hadoop::hdfs::Status::CHECKSUM_OK - : hadoop::hdfs::Status::SUCCESS); - - m->Push( - continuation::WriteDelimitedPBMessage(parent_->stream_, &m->state())); - - m->Run([this, next](const Status &status, - const hadoop::hdfs::ClientReadStatusProto &) { - if (status.ok()) { - parent_->state_ = RemoteBlockReader<Stream>::kFinished; - } - next(status); - }); - } - -private: - RemoteBlockReader<Stream> *parent_; -}; - -template <class Stream> -template <class MutableBufferSequence, class ReadHandler> -void RemoteBlockReader<Stream>::async_read_some( - const MutableBufferSequence &buffers, const ReadHandler &handler) { - assert(state_ != kOpen && "Not connected"); - - struct State { - std::shared_ptr<size_t> bytes_transferred; - }; - auto m = continuation::Pipeline<State>::Create(); - m->state().bytes_transferred = std::make_shared<size_t>(0); - - m->Push(new ReadPacketHeader(this)) - .Push(new ReadChecksum(this)) - .Push(new ReadPadding(this)) - .Push(new ReadData<MutableBufferSequence>( - this, 
m->state().bytes_transferred, buffers)) - .Push(new AckRead(this)); - - auto self = this->shared_from_this(); - m->Run([self, handler](const Status &status, const State &state) { - handler(status, *state.bytes_transferred); - }); -} - -template <class Stream> -template <class MutableBufferSequence> -size_t -RemoteBlockReader<Stream>::read_some(const MutableBufferSequence &buffers, - Status *status) { - size_t transferred = 0; - auto done = std::make_shared<std::promise<void>>(); - auto future = done->get_future(); - async_read_some(buffers, - [status, &transferred, done](const Status &stat, size_t t) { - *status = stat; - transferred = t; - done->set_value(); - }); - future.wait(); - return transferred; -} - -template <class Stream> -Status RemoteBlockReader<Stream>::connect( - const std::string &client_name, const hadoop::common::TokenProto *token, - const hadoop::hdfs::ExtendedBlockProto *block, uint64_t length, - uint64_t offset) { - auto stat = std::make_shared<std::promise<Status>>(); - std::future<Status> future(stat->get_future()); - async_connect(client_name, token, block, length, offset, - [stat](const Status &status) { stat->set_value(status); }); - return future.get(); -} -} - -#endif http://git-wip-us.apache.org/repos/asf/hadoop/blob/76a1e894/hadoop-hdfs-project/hadoop-hdfs-client/src/main/native/libhdfspp/lib/rpc/CMakeLists.txt ---------------------------------------------------------------------- diff --git a/hadoop-hdfs-project/hadoop-hdfs-client/src/main/native/libhdfspp/lib/rpc/CMakeLists.txt b/hadoop-hdfs-project/hadoop-hdfs-client/src/main/native/libhdfspp/lib/rpc/CMakeLists.txt deleted file mode 100644 index aa3951c..0000000 --- a/hadoop-hdfs-project/hadoop-hdfs-client/src/main/native/libhdfspp/lib/rpc/CMakeLists.txt +++ /dev/null @@ -1,3 +0,0 @@ -include_directories(${OPENSSL_INCLUDE_DIRS}) -add_library(rpc rpc_connection.cc rpc_engine.cc) -add_dependencies(rpc proto)
