IMPALA-4669: [KRPC] Import RPC library from kudu@314c9d8 Change-Id: I06ab5b56312e482a27fa484414c338438ad6972c Reviewed-on: http://gerrit.cloudera.org:8080/5718 Reviewed-by: Michael Ho <[email protected]> Tested-by: Impala Public Jenkins
Project: http://git-wip-us.apache.org/repos/asf/incubator-impala/repo Commit: http://git-wip-us.apache.org/repos/asf/incubator-impala/commit/c7db60aa Tree: http://git-wip-us.apache.org/repos/asf/incubator-impala/tree/c7db60aa Diff: http://git-wip-us.apache.org/repos/asf/incubator-impala/diff/c7db60aa Branch: refs/heads/master Commit: c7db60aa46565c19634e8a791df3af8d116b9017 Parents: 852e1bb Author: Henry Robinson <[email protected]> Authored: Fri Oct 28 17:10:46 2016 -0700 Committer: Impala Public Jenkins <[email protected]> Committed: Thu Aug 17 03:13:20 2017 +0000 ---------------------------------------------------------------------- be/src/kudu/rpc/CMakeLists.txt | 128 +++ be/src/kudu/rpc/acceptor_pool.cc | 166 +++ be/src/kudu/rpc/acceptor_pool.h | 79 ++ be/src/kudu/rpc/blocking_ops.cc | 127 +++ be/src/kudu/rpc/blocking_ops.h | 62 ++ be/src/kudu/rpc/client_negotiation.cc | 776 ++++++++++++++ be/src/kudu/rpc/client_negotiation.h | 252 +++++ be/src/kudu/rpc/connection.cc | 732 ++++++++++++++ be/src/kudu/rpc/connection.h | 360 +++++++ be/src/kudu/rpc/constants.cc | 38 + be/src/kudu/rpc/constants.h | 63 ++ be/src/kudu/rpc/exactly_once_rpc-test.cc | 589 +++++++++++ be/src/kudu/rpc/inbound_call.cc | 322 ++++++ be/src/kudu/rpc/inbound_call.h | 269 +++++ be/src/kudu/rpc/messenger.cc | 488 +++++++++ be/src/kudu/rpc/messenger.h | 354 +++++++ be/src/kudu/rpc/mt-rpc-test.cc | 291 ++++++ be/src/kudu/rpc/negotiation-test.cc | 1331 +++++++++++++++++++++++++ be/src/kudu/rpc/negotiation.cc | 317 ++++++ be/src/kudu/rpc/negotiation.h | 56 ++ be/src/kudu/rpc/outbound_call.cc | 509 ++++++++++ be/src/kudu/rpc/outbound_call.h | 363 +++++++ be/src/kudu/rpc/protoc-gen-krpc.cc | 674 +++++++++++++ be/src/kudu/rpc/proxy.cc | 115 +++ be/src/kudu/rpc/proxy.h | 121 +++ be/src/kudu/rpc/reactor-test.cc | 98 ++ be/src/kudu/rpc/reactor.cc | 750 ++++++++++++++ be/src/kudu/rpc/reactor.h | 370 +++++++ be/src/kudu/rpc/remote_method.cc | 49 + be/src/kudu/rpc/remote_method.h | 51 + 
be/src/kudu/rpc/remote_user.cc | 41 + be/src/kudu/rpc/remote_user.h | 98 ++ be/src/kudu/rpc/request_tracker-test.cc | 83 ++ be/src/kudu/rpc/request_tracker.cc | 53 + be/src/kudu/rpc/request_tracker.h | 85 ++ be/src/kudu/rpc/response_callback.h | 31 + be/src/kudu/rpc/result_tracker.cc | 582 +++++++++++ be/src/kudu/rpc/result_tracker.h | 399 ++++++++ be/src/kudu/rpc/retriable_rpc.h | 296 ++++++ be/src/kudu/rpc/rpc-bench.cc | 260 +++++ be/src/kudu/rpc/rpc-test-base.h | 585 +++++++++++ be/src/kudu/rpc/rpc-test.cc | 808 +++++++++++++++ be/src/kudu/rpc/rpc.cc | 96 ++ be/src/kudu/rpc/rpc.h | 218 ++++ be/src/kudu/rpc/rpc_context.cc | 208 ++++ be/src/kudu/rpc/rpc_context.h | 224 +++++ be/src/kudu/rpc/rpc_controller.cc | 149 +++ be/src/kudu/rpc/rpc_controller.h | 256 +++++ be/src/kudu/rpc/rpc_header.proto | 365 +++++++ be/src/kudu/rpc/rpc_introspection.proto | 108 ++ be/src/kudu/rpc/rpc_service.h | 47 + be/src/kudu/rpc/rpc_sidecar.cc | 102 ++ be/src/kudu/rpc/rpc_sidecar.h | 68 ++ be/src/kudu/rpc/rpc_stub-test.cc | 679 +++++++++++++ be/src/kudu/rpc/rpcz_store.cc | 255 +++++ be/src/kudu/rpc/rpcz_store.h | 74 ++ be/src/kudu/rpc/rtest.proto | 150 +++ be/src/kudu/rpc/rtest_diff_package.proto | 26 + be/src/kudu/rpc/sasl_common.cc | 459 +++++++++ be/src/kudu/rpc/sasl_common.h | 126 +++ be/src/kudu/rpc/sasl_helper.cc | 134 +++ be/src/kudu/rpc/sasl_helper.h | 109 ++ be/src/kudu/rpc/serialization.cc | 199 ++++ be/src/kudu/rpc/serialization.h | 88 ++ be/src/kudu/rpc/server_negotiation.cc | 980 ++++++++++++++++++ be/src/kudu/rpc/server_negotiation.h | 248 +++++ be/src/kudu/rpc/service_if.cc | 149 +++ be/src/kudu/rpc/service_if.h | 137 +++ be/src/kudu/rpc/service_pool.cc | 219 ++++ be/src/kudu/rpc/service_pool.h | 98 ++ be/src/kudu/rpc/service_queue-test.cc | 144 +++ be/src/kudu/rpc/service_queue.cc | 142 +++ be/src/kudu/rpc/service_queue.h | 215 ++++ be/src/kudu/rpc/transfer.cc | 264 +++++ be/src/kudu/rpc/transfer.h | 203 ++++ be/src/kudu/rpc/user_credentials.cc | 57 ++ 
be/src/kudu/rpc/user_credentials.h | 47 + 77 files changed, 20264 insertions(+) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/c7db60aa/be/src/kudu/rpc/CMakeLists.txt ---------------------------------------------------------------------- diff --git a/be/src/kudu/rpc/CMakeLists.txt b/be/src/kudu/rpc/CMakeLists.txt new file mode 100644 index 0000000..0cfe6e9 --- /dev/null +++ b/be/src/kudu/rpc/CMakeLists.txt @@ -0,0 +1,128 @@ +# Licensed to the Apache Software Foundation (ASF) under one +# or more contributor license agreements. See the NOTICE file +# distributed with this work for additional information +# regarding copyright ownership. The ASF licenses this file +# to you under the Apache License, Version 2.0 (the +# "License"); you may not use this file except in compliance +# with the License. You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, +# software distributed under the License is distributed on an +# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +# KIND, either express or implied. See the License for the +# specific language governing permissions and limitations +# under the License. + +#### Global header protobufs +PROTOBUF_GENERATE_CPP( + RPC_HEADER_PROTO_SRCS RPC_HEADER_PROTO_HDRS RPC_HEADER_PROTO_TGTS + SOURCE_ROOT ${CMAKE_CURRENT_SOURCE_DIR}/../.. + BINARY_ROOT ${CMAKE_CURRENT_BINARY_DIR}/../.. + PROTO_FILES rpc_header.proto) +ADD_EXPORTABLE_LIBRARY(rpc_header_proto + SRCS ${RPC_HEADER_PROTO_SRCS} + DEPS protobuf pb_util_proto token_proto + NONLINK_DEPS ${RPC_HEADER_PROTO_TGTS}) + +PROTOBUF_GENERATE_CPP( + RPC_INTROSPECTION_PROTO_SRCS RPC_INTROSPECTION_PROTO_HDRS RPC_INTROSPECTION_PROTO_TGTS + SOURCE_ROOT ${CMAKE_CURRENT_SOURCE_DIR}/../.. + BINARY_ROOT ${CMAKE_CURRENT_BINARY_DIR}/../.. 
+ PROTO_FILES rpc_introspection.proto) +set(RPC_INTROSPECTION_PROTO_LIBS + rpc_header_proto + protobuf) +ADD_EXPORTABLE_LIBRARY(rpc_introspection_proto + SRCS ${RPC_INTROSPECTION_PROTO_SRCS} + DEPS ${RPC_INTROSPECTION_PROTO_LIBS} + NONLINK_DEPS ${RPC_INTROSPECTION_PROTO_TGTS}) + +### RPC library +set(KRPC_SRCS + acceptor_pool.cc + blocking_ops.cc + client_negotiation.cc + connection.cc + constants.cc + inbound_call.cc + messenger.cc + negotiation.cc + outbound_call.cc + proxy.cc + reactor.cc + remote_method.cc + remote_user.cc + request_tracker.cc + result_tracker.cc + rpc.cc + rpc_context.cc + rpc_controller.cc + rpc_sidecar.cc + rpcz_store.cc + sasl_common.cc + sasl_helper.cc + serialization.cc + server_negotiation.cc + service_if.cc + service_pool.cc + service_queue.cc + user_credentials.cc + transfer.cc +) + +set(KRPC_LIBS + cyrus_sasl + gutil + kudu_util + libev + rpc_header_proto + rpc_introspection_proto + security) + +ADD_EXPORTABLE_LIBRARY(krpc + SRCS ${KRPC_SRCS} + DEPS ${KRPC_LIBS}) + +### RPC generator tool +add_executable(protoc-gen-krpc protoc-gen-krpc.cc) +target_link_libraries(protoc-gen-krpc + ${KUDU_BASE_LIBS} + rpc_header_proto + protoc + protobuf + gutil + kudu_util) + +#### RPC test +PROTOBUF_GENERATE_CPP( + RPC_TEST_DIFF_PACKAGE_SRCS RPC_TEST_DIFF_PACKAGE_HDRS RPC_TEST_DIFF_PACKAGE_TGTS + SOURCE_ROOT ${CMAKE_CURRENT_SOURCE_DIR}/../.. + BINARY_ROOT ${CMAKE_CURRENT_BINARY_DIR}/../.. + PROTO_FILES rtest_diff_package.proto) +add_library(rtest_diff_package_proto ${RPC_TEST_DIFF_PACKAGE_SRCS} ${RPC_TEST_DIFF_PACKAGE_HDRS}) +target_link_libraries(rtest_diff_package_proto rpc_header_proto) + +KRPC_GENERATE( + RTEST_KRPC_SRCS RTEST_KRPC_HDRS RTEST_KRPC_TGTS + SOURCE_ROOT ${CMAKE_CURRENT_SOURCE_DIR}/../.. + BINARY_ROOT ${CMAKE_CURRENT_BINARY_DIR}/../.. 
+ PROTO_FILES rtest.proto) +add_library(rtest_krpc ${RTEST_KRPC_SRCS} ${RTEST_KRPC_HDRS}) +target_link_libraries(rtest_krpc + krpc + rpc_header_proto + rtest_diff_package_proto) + +# Tests +set(KUDU_TEST_LINK_LIBS rtest_krpc krpc rpc_header_proto security-test ${KUDU_MIN_TEST_LIBS}) +ADD_KUDU_TEST(exactly_once_rpc-test) +ADD_KUDU_TEST(mt-rpc-test RUN_SERIAL true) +ADD_KUDU_TEST(negotiation-test) +ADD_KUDU_TEST(reactor-test) +ADD_KUDU_TEST(request_tracker-test) +ADD_KUDU_TEST(rpc-bench RUN_SERIAL true) +ADD_KUDU_TEST(rpc-test) +ADD_KUDU_TEST(rpc_stub-test) +ADD_KUDU_TEST(service_queue-test) http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/c7db60aa/be/src/kudu/rpc/acceptor_pool.cc ---------------------------------------------------------------------- diff --git a/be/src/kudu/rpc/acceptor_pool.cc b/be/src/kudu/rpc/acceptor_pool.cc new file mode 100644 index 0000000..f3c935c --- /dev/null +++ b/be/src/kudu/rpc/acceptor_pool.cc @@ -0,0 +1,166 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. 
+ +#include "kudu/rpc/acceptor_pool.h" + +#include <pthread.h> + +#include <cinttypes> +#include <cstdint> +#include <iostream> +#include <string> +#include <vector> + +#include <gflags/gflags.h> +#include <glog/logging.h> + +#include "kudu/gutil/ref_counted.h" +#include "kudu/gutil/strings/substitute.h" +#include "kudu/rpc/messenger.h" +#include "kudu/util/flag_tags.h" +#include "kudu/util/logging.h" +#include "kudu/util/metrics.h" +#include "kudu/util/net/sockaddr.h" +#include "kudu/util/net/socket.h" +#include "kudu/util/status.h" +#include "kudu/util/thread.h" + +using google::protobuf::Message; +using std::string; + +METRIC_DEFINE_counter(server, rpc_connections_accepted, + "RPC Connections Accepted", + kudu::MetricUnit::kConnections, + "Number of incoming TCP connections made to the RPC server"); + +DEFINE_int32(rpc_acceptor_listen_backlog, 128, + "Socket backlog parameter used when listening for RPC connections. " + "This defines the maximum length to which the queue of pending " + "TCP connections inbound to the RPC server may grow. If a connection " + "request arrives when the queue is full, the client may receive " + "an error. 
Higher values may help the server ride over bursts of " + "new inbound connection requests."); +TAG_FLAG(rpc_acceptor_listen_backlog, advanced); + +namespace kudu { +namespace rpc { + +AcceptorPool::AcceptorPool(Messenger* messenger, Socket* socket, + Sockaddr bind_address) + : messenger_(messenger), + socket_(socket->Release()), + bind_address_(std::move(bind_address)), + rpc_connections_accepted_(METRIC_rpc_connections_accepted.Instantiate( + messenger->metric_entity())), + closing_(false) {} + +AcceptorPool::~AcceptorPool() { + Shutdown(); +} + +Status AcceptorPool::Start(int num_threads) { + RETURN_NOT_OK(socket_.Listen(FLAGS_rpc_acceptor_listen_backlog)); + + for (int i = 0; i < num_threads; i++) { + scoped_refptr<kudu::Thread> new_thread; + Status s = kudu::Thread::Create("acceptor pool", "acceptor", + &AcceptorPool::RunThread, this, &new_thread); + if (!s.ok()) { + Shutdown(); + return s; + } + threads_.push_back(new_thread); + } + return Status::OK(); +} + +void AcceptorPool::Shutdown() { + if (Acquire_CompareAndSwap(&closing_, false, true) != false) { + VLOG(2) << "Acceptor Pool on " << bind_address_.ToString() + << " already shut down"; + return; + } + +#if defined(__linux__) + // Closing the socket will break us out of accept() if we're in it, and + // prevent future accepts. + WARN_NOT_OK(socket_.Shutdown(true, true), + strings::Substitute("Could not shut down acceptor socket on $0", + bind_address_.ToString())); +#else + // Calling shutdown on an accepting (non-connected) socket is illegal on most + // platforms (but not Linux). Instead, the accepting threads are interrupted + // forcefully. 
+ for (const scoped_refptr<kudu::Thread>& thread : threads_) { + pthread_cancel(thread.get()->pthread_id()); + } +#endif + + for (const scoped_refptr<kudu::Thread>& thread : threads_) { + CHECK_OK(ThreadJoiner(thread.get()).Join()); + } + threads_.clear(); + + // Close the socket: keeping the descriptor open and, possibly, receiving late + // not-to-be-read messages from the peer does not make much sense. The + // Socket::Close() method is called upon destruction of the aggregated socket_ + // object as well. However, the typical ownership pattern of an AcceptorPool + // object includes two references wrapped via a shared_ptr smart pointer: one + // is held by Messenger, another by RpcServer. If not calling Socket::Close() + // here, it would necessary to wait until Messenger::Shutdown() is called for + // the corresponding messenger object to close this socket. + ignore_result(socket_.Close()); +} + +Sockaddr AcceptorPool::bind_address() const { + return bind_address_; +} + +Status AcceptorPool::GetBoundAddress(Sockaddr* addr) const { + return socket_.GetSocketAddress(addr); +} + +void AcceptorPool::RunThread() { + while (true) { + Socket new_sock; + Sockaddr remote; + VLOG(2) << "calling accept() on socket " << socket_.GetFd() + << " listening on " << bind_address_.ToString(); + Status s = socket_.Accept(&new_sock, &remote, Socket::FLAG_NONBLOCKING); + if (!s.ok()) { + if (Release_Load(&closing_)) { + break; + } + KLOG_EVERY_N_SECS(WARNING, 1) << "AcceptorPool: accept failed: " << s.ToString() + << THROTTLE_MSG; + continue; + } + s = new_sock.SetNoDelay(true); + if (!s.ok()) { + KLOG_EVERY_N_SECS(WARNING, 1) << "Acceptor with remote = " << remote.ToString() + << " failed to set TCP_NODELAY on a newly accepted socket: " + << s.ToString() << THROTTLE_MSG; + continue; + } + rpc_connections_accepted_->Increment(); + messenger_->RegisterInboundSocket(&new_sock, remote); + } + VLOG(1) << "AcceptorPool shutting down."; +} + +} // namespace rpc +} // namespace kudu 
http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/c7db60aa/be/src/kudu/rpc/acceptor_pool.h ---------------------------------------------------------------------- diff --git a/be/src/kudu/rpc/acceptor_pool.h b/be/src/kudu/rpc/acceptor_pool.h new file mode 100644 index 0000000..92b7fc5 --- /dev/null +++ b/be/src/kudu/rpc/acceptor_pool.h @@ -0,0 +1,79 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. + +#ifndef KUDU_RPC_ACCEPTOR_POOL_H +#define KUDU_RPC_ACCEPTOR_POOL_H + +#include <vector> + +#include "kudu/gutil/atomicops.h" +#include "kudu/util/thread.h" +#include "kudu/util/net/sockaddr.h" +#include "kudu/util/net/socket.h" +#include "kudu/util/status.h" + +namespace kudu { + +class Counter; +class Socket; + +namespace rpc { + +class Messenger; + +// A pool of threads calling accept() to create new connections. +// Acceptor pool threads terminate when they notice that the messenger has been +// shut down, if Shutdown() is called, or if the pool object is destructed. +class AcceptorPool { + public: + // Create a new acceptor pool. Calls socket::Release to take ownership of the + // socket. + // 'socket' must be already bound, but should not yet be listening. 
+ AcceptorPool(Messenger *messenger, Socket *socket, Sockaddr bind_address); + ~AcceptorPool(); + + // Start listening and accepting connections. + Status Start(int num_threads); + void Shutdown(); + + // Return the address that the pool is bound to. If the port is specified as + // 0, then this will always return port 0. + Sockaddr bind_address() const; + + // Return the address that the pool is bound to. This only works while the + // socket is open, and if the specified port is 0 then this will return the + // actual port that was bound. + Status GetBoundAddress(Sockaddr* addr) const; + + private: + void RunThread(); + + Messenger *messenger_; + Socket socket_; + Sockaddr bind_address_; + std::vector<scoped_refptr<kudu::Thread> > threads_; + + scoped_refptr<Counter> rpc_connections_accepted_; + + Atomic32 closing_; + + DISALLOW_COPY_AND_ASSIGN(AcceptorPool); +}; + +} // namespace rpc +} // namespace kudu +#endif http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/c7db60aa/be/src/kudu/rpc/blocking_ops.cc ---------------------------------------------------------------------- diff --git a/be/src/kudu/rpc/blocking_ops.cc b/be/src/kudu/rpc/blocking_ops.cc new file mode 100644 index 0000000..64ae2c0 --- /dev/null +++ b/be/src/kudu/rpc/blocking_ops.cc @@ -0,0 +1,127 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. 
See the License for the +// specific language governing permissions and limitations +// under the License. + +#include "kudu/rpc/blocking_ops.h" + +#include <stdint.h> +#include <string.h> + +#include <glog/logging.h> +#include <google/protobuf/message_lite.h> + +#include "kudu/gutil/endian.h" +#include "kudu/gutil/strings/substitute.h" +#include "kudu/rpc/constants.h" +#include "kudu/rpc/serialization.h" +#include "kudu/rpc/transfer.h" +#include "kudu/util/faststring.h" +#include "kudu/util/monotime.h" +#include "kudu/util/net/sockaddr.h" +#include "kudu/util/net/socket.h" +#include "kudu/util/status.h" + +namespace kudu { +namespace rpc { + +using google::protobuf::MessageLite; + +const char kHTTPHeader[] = "HTTP"; + +Status EnsureBlockingMode(const Socket* sock) { + bool is_nonblocking; + RETURN_NOT_OK(sock->IsNonBlocking(&is_nonblocking)); + if (is_nonblocking) { + return Status::IllegalState("Underlying socket is not set to blocking mode!"); + } + return Status::OK(); +} + +Status SendFramedMessageBlocking(Socket* sock, const MessageLite& header, const MessageLite& msg, + const MonoTime& deadline) { + DCHECK(sock != nullptr); + DCHECK(header.IsInitialized()) << "header protobuf must be initialized"; + DCHECK(msg.IsInitialized()) << "msg protobuf must be initialized"; + + RETURN_NOT_OK(EnsureBlockingMode(sock)); + + // Ensure we are in blocking mode. + // These blocking calls are typically not in the fast path, so doing this for all build types. 
+ bool is_non_blocking = false; + RETURN_NOT_OK(sock->IsNonBlocking(&is_non_blocking)); + DCHECK(!is_non_blocking) << "Socket must be in blocking mode to use SendFramedMessage"; + + // Serialize message + faststring param_buf; + serialization::SerializeMessage(msg, &param_buf); + + // Serialize header and initial length + faststring header_buf; + serialization::SerializeHeader(header, param_buf.size(), &header_buf); + + // Write header & param to stream + size_t nsent; + RETURN_NOT_OK(sock->BlockingWrite(header_buf.data(), header_buf.size(), &nsent, deadline)); + RETURN_NOT_OK(sock->BlockingWrite(param_buf.data(), param_buf.size(), &nsent, deadline)); + + return Status::OK(); +} + +Status ReceiveFramedMessageBlocking(Socket* sock, faststring* recv_buf, + MessageLite* header, Slice* param_buf, const MonoTime& deadline) { + DCHECK(sock != nullptr); + DCHECK(recv_buf != nullptr); + DCHECK(header != nullptr); + DCHECK(param_buf != nullptr); + + RETURN_NOT_OK(EnsureBlockingMode(sock)); + + // Read the message prefix, which specifies the length of the payload. + recv_buf->clear(); + recv_buf->resize(kMsgLengthPrefixLength); + size_t recvd = 0; + RETURN_NOT_OK(sock->BlockingRecv(recv_buf->data(), kMsgLengthPrefixLength, &recvd, deadline)); + uint32_t payload_len = NetworkByteOrder::Load32(recv_buf->data()); + + // Verify that the payload size isn't out of bounds. + // This can happen because of network corruption, or a naughty client. + if (PREDICT_FALSE(payload_len > FLAGS_rpc_max_message_size)) { + // A common user mistake is to try to speak the Kudu RPC protocol to an + // HTTP endpoint, or vice versa. + if (memcmp(recv_buf->data(), kHTTPHeader, strlen(kHTTPHeader)) == 0) { + return Status::IOError( + "received invalid RPC message which appears to be an HTTP response. 
" + "Verify that you have specified a valid RPC port and not an HTTP port."); + } + + return Status::IOError( + strings::Substitute( + "received invalid message of size $0 which exceeds" + " the rpc_max_message_size of $1 bytes", + payload_len, FLAGS_rpc_max_message_size)); + } + + // Read the message payload. + recvd = 0; + recv_buf->resize(payload_len + kMsgLengthPrefixLength); + RETURN_NOT_OK(sock->BlockingRecv(recv_buf->data() + kMsgLengthPrefixLength, + payload_len, &recvd, deadline)); + RETURN_NOT_OK(serialization::ParseMessage(Slice(*recv_buf), header, param_buf)); + return Status::OK(); +} + +} // namespace rpc +} // namespace kudu http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/c7db60aa/be/src/kudu/rpc/blocking_ops.h ---------------------------------------------------------------------- diff --git a/be/src/kudu/rpc/blocking_ops.h b/be/src/kudu/rpc/blocking_ops.h new file mode 100644 index 0000000..01bb7a6 --- /dev/null +++ b/be/src/kudu/rpc/blocking_ops.h @@ -0,0 +1,62 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. 
+ +#ifndef KUDU_RPC_BLOCKING_OPS_H +#define KUDU_RPC_BLOCKING_OPS_H + +#include <set> +#include <string> + +namespace google { +namespace protobuf { +class MessageLite; +} // namespace protobuf +} // namespace google + +namespace kudu { + +class faststring; +class MonoTime; +class Slice; +class Sockaddr; +class Socket; +class Status; + +namespace rpc { + +// Returns OK if socket is in blocking mode. Otherwise, returns an error. +Status EnsureBlockingMode(const Socket* sock); + +// Encode and send a message over a socket. +// header: Request or Response header protobuf. +// msg: Protobuf message to send. This message must be fully initialized. +// deadline: Latest time allowed for receive to complete before timeout. +Status SendFramedMessageBlocking(Socket* sock, const google::protobuf::MessageLite& header, + const google::protobuf::MessageLite& msg, const MonoTime& deadline); + +// Receive a full message frame from the server. +// recv_buf: buffer to use for reading the data from the socket. +// header: Request or Response header protobuf. +// param_buf: Slice into recv_buf containing unparsed RPC param protobuf data. +// deadline: Latest time allowed for receive to complete before timeout. +Status ReceiveFramedMessageBlocking(Socket* sock, faststring* recv_buf, + google::protobuf::MessageLite* header, Slice* param_buf, const MonoTime& deadline); + +} // namespace rpc +} // namespace kudu + +#endif // KUDU_RPC_BLOCKING_OPS_H http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/c7db60aa/be/src/kudu/rpc/client_negotiation.cc ---------------------------------------------------------------------- diff --git a/be/src/kudu/rpc/client_negotiation.cc b/be/src/kudu/rpc/client_negotiation.cc new file mode 100644 index 0000000..8ce5190 --- /dev/null +++ b/be/src/kudu/rpc/client_negotiation.cc @@ -0,0 +1,776 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. 
See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. + +#include "kudu/rpc/client_negotiation.h" + +#include <string.h> + +#include <map> +#include <memory> +#include <set> +#include <string> + +#include <gflags/gflags.h> +#include <glog/logging.h> +#include <sasl/sasl.h> + +#include "kudu/gutil/casts.h" +#include "kudu/gutil/endian.h" +#include "kudu/gutil/map-util.h" +#include "kudu/gutil/stl_util.h" +#include "kudu/gutil/strings/join.h" +#include "kudu/gutil/strings/substitute.h" +#include "kudu/gutil/strings/util.h" +#include "kudu/rpc/blocking_ops.h" +#include "kudu/rpc/constants.h" +#include "kudu/rpc/messenger.h" +#include "kudu/rpc/rpc_header.pb.h" +#include "kudu/rpc/sasl_common.h" +#include "kudu/rpc/sasl_helper.h" +#include "kudu/rpc/serialization.h" +#include "kudu/security/cert.h" +#include "kudu/security/tls_context.h" +#include "kudu/security/tls_handshake.h" +#include "kudu/security/tls_socket.h" +#include "kudu/util/faststring.h" +#include "kudu/util/net/sockaddr.h" +#include "kudu/util/net/socket.h" +#include "kudu/util/scoped_cleanup.h" +#include "kudu/util/trace.h" + +using std::map; +using std::set; +using std::string; +using std::unique_ptr; + +using strings::Substitute; + +DECLARE_bool(rpc_encrypt_loopback_connections); + +namespace kudu { +namespace rpc { + +static int 
ClientNegotiationGetoptCb(ClientNegotiation* client_negotiation, + const char* plugin_name, + const char* option, + const char** result, + unsigned* len) { + return client_negotiation->GetOptionCb(plugin_name, option, result, len); +} + +static int ClientNegotiationSimpleCb(ClientNegotiation* client_negotiation, + int id, + const char** result, + unsigned* len) { + return client_negotiation->SimpleCb(id, result, len); +} + +static int ClientNegotiationSecretCb(sasl_conn_t* conn, + ClientNegotiation* client_negotiation, + int id, + sasl_secret_t** psecret) { + return client_negotiation->SecretCb(conn, id, psecret); +} + +// Return an appropriately-typed Status object based on an ErrorStatusPB returned +// from an Error RPC. +// In case there is no relevant Status type, return a RuntimeError. +static Status StatusFromRpcError(const ErrorStatusPB& error) { + DCHECK(error.IsInitialized()) << "Error status PB must be initialized"; + if (PREDICT_FALSE(!error.has_code())) { + return Status::RuntimeError(error.message()); + } + const string code_name = ErrorStatusPB::RpcErrorCodePB_Name(error.code()); + switch (error.code()) { + case ErrorStatusPB_RpcErrorCodePB_FATAL_UNAUTHORIZED: // fall-through + case ErrorStatusPB_RpcErrorCodePB_FATAL_INVALID_AUTHENTICATION_TOKEN: + return Status::NotAuthorized(code_name, error.message()); + case ErrorStatusPB_RpcErrorCodePB_ERROR_UNAVAILABLE: + return Status::ServiceUnavailable(code_name, error.message()); + default: + return Status::RuntimeError(code_name, error.message()); + } +} + +ClientNegotiation::ClientNegotiation(unique_ptr<Socket> socket, + const security::TlsContext* tls_context, + const boost::optional<security::SignedTokenPB>& authn_token, + RpcEncryption encryption) + : socket_(std::move(socket)), + helper_(SaslHelper::CLIENT), + tls_context_(tls_context), + encryption_(encryption), + tls_negotiated_(false), + authn_token_(authn_token), + psecret_(nullptr, std::free), + negotiated_authn_(AuthenticationType::INVALID), + 
negotiated_mech_(SaslMechanism::INVALID), + deadline_(MonoTime::Max()) { + callbacks_.push_back(SaslBuildCallback(SASL_CB_GETOPT, + reinterpret_cast<int (*)()>(&ClientNegotiationGetoptCb), this)); + callbacks_.push_back(SaslBuildCallback(SASL_CB_AUTHNAME, + reinterpret_cast<int (*)()>(&ClientNegotiationSimpleCb), this)); + callbacks_.push_back(SaslBuildCallback(SASL_CB_PASS, + reinterpret_cast<int (*)()>(&ClientNegotiationSecretCb), this)); + callbacks_.push_back(SaslBuildCallback(SASL_CB_LIST_END, nullptr, nullptr)); + DCHECK(socket_); + DCHECK(tls_context_); +} + +Status ClientNegotiation::EnablePlain(const string& user, const string& pass) { + RETURN_NOT_OK(helper_.EnablePlain()); + plain_auth_user_ = user; + plain_pass_ = pass; + return Status::OK(); +} + +Status ClientNegotiation::EnableGSSAPI() { + return helper_.EnableGSSAPI(); +} + +SaslMechanism::Type ClientNegotiation::negotiated_mechanism() const { + return negotiated_mech_; +} + +void ClientNegotiation::set_server_fqdn(const string& domain_name) { + helper_.set_server_fqdn(domain_name); +} + +void ClientNegotiation::set_deadline(const MonoTime& deadline) { + deadline_ = deadline; +} + +Status ClientNegotiation::Negotiate(unique_ptr<ErrorStatusPB>* rpc_error) { + TRACE("Beginning negotiation"); + + // Ensure we can use blocking calls on the socket during negotiation. + RETURN_NOT_OK(EnsureBlockingMode(socket_.get())); + + // Step 1: send the connection header. + RETURN_NOT_OK(SendConnectionHeader()); + + faststring recv_buf; + + { // Step 2: send and receive the NEGOTIATE step messages. + RETURN_NOT_OK(SendNegotiate()); + NegotiatePB response; + RETURN_NOT_OK(RecvNegotiatePB(&response, &recv_buf, rpc_error)); + RETURN_NOT_OK(HandleNegotiate(response)); + TRACE("Negotiated authn=$0", AuthenticationTypeToString(negotiated_authn_)); + } + + // Step 3: if both ends support TLS, do a TLS handshake. + // TODO(KUDU-1921): allow the client to require TLS. 
+ if (encryption_ != RpcEncryption::DISABLED && + ContainsKey(server_features_, TLS)) { + RETURN_NOT_OK(tls_context_->InitiateHandshake(security::TlsHandshakeType::CLIENT, + &tls_handshake_)); + + if (negotiated_authn_ == AuthenticationType::SASL) { + // When using SASL authentication, verifying the server's certificate is + // not necessary. This allows the client to still use TLS encryption for + // connections to servers which only have a self-signed certificate. + tls_handshake_.set_verification_mode(security::TlsVerificationMode::VERIFY_NONE); + } + + // To initiate the TLS handshake, we pretend as if the server sent us an + // empty TLS_HANDSHAKE token. + NegotiatePB initial; + initial.set_step(NegotiatePB::TLS_HANDSHAKE); + initial.set_tls_handshake(""); + Status s = HandleTlsHandshake(initial); + + while (s.IsIncomplete()) { + NegotiatePB response; + RETURN_NOT_OK(RecvNegotiatePB(&response, &recv_buf, rpc_error)); + s = HandleTlsHandshake(response); + } + RETURN_NOT_OK(s); + tls_negotiated_ = true; + } + + // Step 4: Authentication + switch (negotiated_authn_) { + case AuthenticationType::SASL: + RETURN_NOT_OK(AuthenticateBySasl(&recv_buf, rpc_error)); + break; + case AuthenticationType::TOKEN: + RETURN_NOT_OK(AuthenticateByToken(&recv_buf, rpc_error)); + break; + case AuthenticationType::CERTIFICATE: + // The TLS handshake has already authenticated the server. + break; + case AuthenticationType::INVALID: LOG(FATAL) << "unreachable"; + } + + // Step 5: Send connection context. 
+ RETURN_NOT_OK(SendConnectionContext()); + + TRACE("Negotiation successful"); + return Status::OK(); +} + +Status ClientNegotiation::SendNegotiatePB(const NegotiatePB& msg) { + RequestHeader header; + header.set_call_id(kNegotiateCallId); + + DCHECK(socket_); + DCHECK(msg.IsInitialized()) << "message must be initialized"; + DCHECK(msg.has_step()) << "message must have a step"; + + TRACE("Sending $0 NegotiatePB request", NegotiatePB::NegotiateStep_Name(msg.step())); + return SendFramedMessageBlocking(socket(), header, msg, deadline_); +} + +Status ClientNegotiation::RecvNegotiatePB(NegotiatePB* msg, + faststring* buffer, + unique_ptr<ErrorStatusPB>* rpc_error) { + ResponseHeader header; + Slice param_buf; + RETURN_NOT_OK(ReceiveFramedMessageBlocking(socket(), buffer, &header, &param_buf, deadline_)); + RETURN_NOT_OK(helper_.CheckNegotiateCallId(header.call_id())); + + if (header.is_error()) { + return ParseError(param_buf, rpc_error); + } + + RETURN_NOT_OK(helper_.ParseNegotiatePB(param_buf, msg)); + TRACE("Received $0 NegotiatePB response", NegotiatePB::NegotiateStep_Name(msg->step())); + return Status::OK(); +} + +Status ClientNegotiation::ParseError(const Slice& err_data, + unique_ptr<ErrorStatusPB>* rpc_error) { + unique_ptr<ErrorStatusPB> error(new ErrorStatusPB); + if (!error->ParseFromArray(err_data.data(), err_data.size())) { + return Status::IOError("invalid error response, missing fields", + error->InitializationErrorString()); + } + Status s = StatusFromRpcError(*error); + TRACE("Received error response from server: $0", s.ToString()); + + if (rpc_error) { + rpc_error->swap(error); + } + return s; +} + +Status ClientNegotiation::SendConnectionHeader() { + const uint8_t buflen = kMagicNumberLength + kHeaderFlagsLength; + uint8_t buf[buflen]; + serialization::SerializeConnHeader(buf); + size_t nsent; + return socket()->BlockingWrite(buf, buflen, &nsent, deadline_); +} + +Status ClientNegotiation::InitSaslClient() { + RETURN_NOT_OK(SaslInit()); + + // 
TODO(KUDU-1922): consider setting SASL_SUCCESS_DATA + unsigned flags = 0; + + sasl_conn_t* sasl_conn = nullptr; + RETURN_NOT_OK_PREPEND(WrapSaslCall(nullptr /* no conn */, [&]() { + return sasl_client_new( + kSaslProtoName, // Registered name of the service using SASL. Required. + helper_.server_fqdn(), // The fully qualified domain name of the remote server. + nullptr, // Local and remote IP address strings. (we don't use + nullptr, // any mechanisms which require this info.) + &callbacks_[0], // Connection-specific callbacks. + flags, + &sasl_conn); + }), "Unable to create new SASL client"); + sasl_conn_.reset(sasl_conn); + return Status::OK(); +} + +Status ClientNegotiation::SendNegotiate() { + NegotiatePB msg; + msg.set_step(NegotiatePB::NEGOTIATE); + + // Advertise our supported features. + client_features_ = kSupportedClientRpcFeatureFlags; + + if (encryption_ != RpcEncryption::DISABLED) { + client_features_.insert(TLS); + // If the remote peer is local, then we allow using TLS for authentication + // without encryption or integrity. + if (socket_->IsLoopbackConnection() && !FLAGS_rpc_encrypt_loopback_connections) { + client_features_.insert(TLS_AUTHENTICATION_ONLY); + } + } + + for (RpcFeatureFlag feature : client_features_) { + msg.add_supported_features(feature); + } + + if (!helper_.EnabledMechs().empty()) { + msg.add_authn_types()->mutable_sasl(); + } + if (tls_context_->has_signed_cert() && !tls_context_->is_external_cert()) { + // We only provide authenticated TLS if the certificates are generated + // by the internal CA. + msg.add_authn_types()->mutable_certificate(); + } + if (authn_token_ && tls_context_->has_trusted_cert()) { + // TODO(KUDU-1924): check that the authn token is not expired. Can this be done + // reliably on clients? 
+ msg.add_authn_types()->mutable_token(); + } + + if (PREDICT_FALSE(msg.authn_types().empty())) { + return Status::NotAuthorized("client is not configured with an authentication type"); + } + + RETURN_NOT_OK(SendNegotiatePB(msg)); + return Status::OK(); +} + +Status ClientNegotiation::HandleNegotiate(const NegotiatePB& response) { + if (PREDICT_FALSE(response.step() != NegotiatePB::NEGOTIATE)) { + return Status::NotAuthorized("expected NEGOTIATE step", + NegotiatePB::NegotiateStep_Name(response.step())); + } + TRACE("Received NEGOTIATE response from server"); + + // Fill in the set of features supported by the server. + for (int flag : response.supported_features()) { + // We only add the features that our local build knows about. + RpcFeatureFlag feature_flag = RpcFeatureFlag_IsValid(flag) ? + static_cast<RpcFeatureFlag>(flag) : UNKNOWN; + if (feature_flag != UNKNOWN) { + server_features_.insert(feature_flag); + } + } + + if (encryption_ == RpcEncryption::REQUIRED && + !ContainsKey(server_features_, RpcFeatureFlag::TLS)) { + return Status::NotAuthorized("server does not support required TLS encryption"); + } + + // Get the authentication type which the server would like to use. + DCHECK_LE(response.authn_types().size(), 1); + if (response.authn_types().empty()) { + // If the server doesn't send back an authentication type, default to SASL + // in order to maintain backwards compatibility. + negotiated_authn_ = AuthenticationType::SASL; + } else { + const auto& authn_type = response.authn_types(0); + switch (authn_type.type_case()) { + case AuthenticationTypePB::kSasl: + negotiated_authn_ = AuthenticationType::SASL; + break; + case AuthenticationTypePB::kToken: + // TODO(todd): we should also be checking tls_context_->has_trusted_cert() + // here to match the original logic we used to advertise TOKEN support, + // or perhaps just check explicitly whether we advertised TOKEN. 
+ if (!authn_token_) { + return Status::RuntimeError( + "server chose token authentication, but client has no token"); + } + negotiated_authn_ = AuthenticationType::TOKEN; + return Status::OK(); + case AuthenticationTypePB::kCertificate: + if (!tls_context_->has_signed_cert()) { + return Status::RuntimeError( + "server chose certificate authentication, but client has no certificate"); + } + negotiated_authn_ = AuthenticationType::CERTIFICATE; + return Status::OK(); + case AuthenticationTypePB::TYPE_NOT_SET: + return Status::RuntimeError("server chose an unknown authentication type"); + } + } + + DCHECK_EQ(negotiated_authn_, AuthenticationType::SASL); + + // Build a map of the SASL mechanisms offered by the server. + const set<SaslMechanism::Type>& client_mechs = helper_.EnabledMechs(); + set<SaslMechanism::Type> server_mechs; + for (const NegotiatePB::SaslMechanism& sasl_mech : response.sasl_mechanisms()) { + auto mech = SaslMechanism::value_of(sasl_mech.mechanism()); + if (mech == SaslMechanism::INVALID) { + continue; + } + server_mechs.insert(mech); + } + + // Determine which SASL mechanism to use for authenticating the connection. + // We pick the most preferred mechanism which is supported by both parties. 
+ // The preference list in order of most to least preferred: + // * GSSAPI + // * PLAIN + set<SaslMechanism::Type> common_mechs = STLSetIntersection(client_mechs, server_mechs); + + if (common_mechs.empty()) { + if (ContainsKey(server_mechs, SaslMechanism::GSSAPI) && + !ContainsKey(client_mechs, SaslMechanism::GSSAPI)) { + return Status::NotAuthorized("server requires authentication, " + "but client does not have Kerberos enabled"); + } + if (!ContainsKey(server_mechs, SaslMechanism::GSSAPI) && + ContainsKey(client_mechs, SaslMechanism::GSSAPI)) { + return Status::NotAuthorized("client requires authentication, " + "but server does not have Kerberos enabled"); + } + string msg = Substitute("client/server supported SASL mechanism mismatch; " + "client mechanisms: [$0], server mechanisms: [$1]", + JoinMapped(client_mechs, SaslMechanism::name_of, ", "), + JoinMapped(server_mechs, SaslMechanism::name_of, ", ")); + + // For now, there should never be a SASL mechanism mismatch that isn't due + // to one of the sides requiring Kerberos and the other not having it, so + // let's sanity check that. + DLOG(FATAL) << msg; + return Status::NotAuthorized(msg); + } + + // TODO(KUDU-1921): allow the client to require authentication. 
+ if (ContainsKey(common_mechs, SaslMechanism::GSSAPI)) { + negotiated_mech_ = SaslMechanism::GSSAPI; + } else { + DCHECK(ContainsKey(common_mechs, SaslMechanism::PLAIN)); + negotiated_mech_ = SaslMechanism::PLAIN; + } + + return Status::OK(); +} + +Status ClientNegotiation::SendTlsHandshake(string tls_token) { + TRACE("Sending TLS_HANDSHAKE message to server"); + NegotiatePB msg; + msg.set_step(NegotiatePB::TLS_HANDSHAKE); + msg.mutable_tls_handshake()->swap(tls_token); + return SendNegotiatePB(msg); +} + +Status ClientNegotiation::HandleTlsHandshake(const NegotiatePB& response) { + if (PREDICT_FALSE(response.step() != NegotiatePB::TLS_HANDSHAKE)) { + return Status::NotAuthorized("expected TLS_HANDSHAKE step", + NegotiatePB::NegotiateStep_Name(response.step())); + } + TRACE("Received TLS_HANDSHAKE response from server"); + + if (PREDICT_FALSE(!response.has_tls_handshake())) { + return Status::NotAuthorized("No TLS handshake token in TLS_HANDSHAKE response from server"); + } + + string token; + Status s = tls_handshake_.Continue(response.tls_handshake(), &token); + if (s.IsIncomplete()) { + // Another roundtrip is required to complete the handshake. + RETURN_NOT_OK(SendTlsHandshake(std::move(token))); + } + + // Check that the handshake step didn't produce an error. Will also propagate + // an Incomplete status. + RETURN_NOT_OK(s); + + // TLS handshake is finished. 
+ DCHECK(token.empty()); + + if (ContainsKey(server_features_, TLS_AUTHENTICATION_ONLY) && + ContainsKey(client_features_, TLS_AUTHENTICATION_ONLY)) { + TRACE("Negotiated auth-only $0 with cipher suite $1", + tls_handshake_.GetProtocol(), tls_handshake_.GetCipherSuite()); + return tls_handshake_.FinishNoWrap(*socket_); + } + + TRACE("Negotiated $0 with cipher suite $1", + tls_handshake_.GetProtocol(), tls_handshake_.GetCipherSuite()); + return tls_handshake_.Finish(&socket_); +} + +Status ClientNegotiation::AuthenticateBySasl(faststring* recv_buf, + unique_ptr<ErrorStatusPB>* rpc_error) { + RETURN_NOT_OK(InitSaslClient()); + Status s = SendSaslInitiate(); + + // HandleSasl[Initiate, Challenge] return incomplete if an additional + // challenge step is required, or OK if a SASL_SUCCESS message is expected. + while (s.IsIncomplete()) { + NegotiatePB challenge; + RETURN_NOT_OK(RecvNegotiatePB(&challenge, recv_buf, rpc_error)); + s = HandleSaslChallenge(challenge); + } + + // Propagate failure from SendSaslInitiate or HandleSaslChallenge. + RETURN_NOT_OK(s); + + // Server challenges are over; we now expect the success message. + NegotiatePB success; + RETURN_NOT_OK(RecvNegotiatePB(&success, recv_buf, rpc_error)); + return HandleSaslSuccess(success); +} + +Status ClientNegotiation::AuthenticateByToken(faststring* recv_buf, + unique_ptr<ErrorStatusPB>* rpc_error) { + // Sanity check that TLS has been negotiated. Sending the token on an + // unencrypted channel is a big no-no. + CHECK(tls_negotiated_); + + // Send the token to the server. + NegotiatePB pb; + pb.set_step(NegotiatePB::TOKEN_EXCHANGE); + pb.mutable_authn_token()->Swap(authn_token_.get_ptr()); + RETURN_NOT_OK(SendNegotiatePB(pb)); + pb.Clear(); + + // Check that the server responds with a non-error TOKEN_EXCHANGE message. 
+ RETURN_NOT_OK(RecvNegotiatePB(&pb, recv_buf, rpc_error)); + if (pb.step() != NegotiatePB::TOKEN_EXCHANGE) { + return Status::NotAuthorized("expected TOKEN_EXCHANGE step", + NegotiatePB::NegotiateStep_Name(pb.step())); + } + + return Status::OK(); +} + +Status ClientNegotiation::SendSaslInitiate() { + TRACE("Initiating SASL $0 handshake", SaslMechanism::name_of(negotiated_mech_)); + + // At this point we've already chosen the SASL mechanism to use + // (negotiated_mech_), but we need to let the SASL library know. SASL likes to + // choose the mechanism from among a list of possible options, so we simply + // provide it one option, and then check that it picks that option. + + const char* init_msg = nullptr; + unsigned init_msg_len = 0; + const char* negotiated_mech = nullptr; + + /* select a mechanism for a connection + * mechlist -- mechanisms server has available (punctuation ignored) + * output: + * prompt_need -- on SASL_INTERACT, list of prompts needed to continue + * clientout -- the initial client response to send to the server + * mech -- set to mechanism name + * + * Returns: + * SASL_OK -- success + * SASL_CONTINUE -- negotiation required + * SASL_NOMEM -- not enough memory + * SASL_NOMECH -- no mechanism meets requested properties + * SASL_INTERACT -- user interaction needed to fill in prompt_need list + */ + TRACE("Calling sasl_client_start()"); + const Status s = WrapSaslCall(sasl_conn_.get(), [&]() { + return sasl_client_start( + sasl_conn_.get(), // The SASL connection context created by init() + SaslMechanism::name_of(negotiated_mech_), // The list of mechanisms to negotiate. + nullptr, // Disables INTERACT return if NULL. + &init_msg, // Filled in on success. + &init_msg_len, // Filled in on success. + &negotiated_mech); // Filled in on success. + }); + + if (PREDICT_FALSE(!s.IsIncomplete() && !s.ok())) { + return s; + } + + // Check that the SASL library is using the mechanism that we picked. 
+ DCHECK_EQ(SaslMechanism::value_of(negotiated_mech), negotiated_mech_); + + // If the negotiated mechanism is GSSAPI (Kerberos), configure SASL to use + // integrity protection so that the channel bindings and nonce can be + // verified. + if (negotiated_mech_ == SaslMechanism::GSSAPI) { + RETURN_NOT_OK(EnableIntegrityProtection(sasl_conn_.get())); + } + + NegotiatePB msg; + msg.set_step(NegotiatePB::SASL_INITIATE); + msg.mutable_token()->assign(init_msg, init_msg_len); + msg.add_sasl_mechanisms()->set_mechanism(negotiated_mech); + RETURN_NOT_OK(SendNegotiatePB(msg)); + return s; +} + +Status ClientNegotiation::SendSaslResponse(const char* resp_msg, unsigned resp_msg_len) { + NegotiatePB reply; + reply.set_step(NegotiatePB::SASL_RESPONSE); + reply.mutable_token()->assign(resp_msg, resp_msg_len); + return SendNegotiatePB(reply); +} + +Status ClientNegotiation::HandleSaslChallenge(const NegotiatePB& response) { + if (PREDICT_FALSE(response.step() != NegotiatePB::SASL_CHALLENGE)) { + return Status::NotAuthorized("expected SASL_CHALLENGE step", + NegotiatePB::NegotiateStep_Name(response.step())); + } + TRACE("Received SASL_CHALLENGE response from server"); + if (PREDICT_FALSE(!response.has_token())) { + return Status::NotAuthorized("no token in SASL_CHALLENGE response from server"); + } + + const char* out = nullptr; + unsigned out_len = 0; + const Status s = DoSaslStep(response.token(), &out, &out_len); + if (PREDICT_FALSE(!s.IsIncomplete() && !s.ok())) { + return s; + } + + RETURN_NOT_OK(SendSaslResponse(out, out_len)); + return s; +} + +Status ClientNegotiation::HandleSaslSuccess(const NegotiatePB& response) { + if (PREDICT_FALSE(response.step() != NegotiatePB::SASL_SUCCESS)) { + return Status::NotAuthorized("expected SASL_SUCCESS step", + NegotiatePB::NegotiateStep_Name(response.step())); + } + TRACE("Received SASL_SUCCESS response from server"); + + if (negotiated_mech_ == SaslMechanism::GSSAPI) { + if (response.has_nonce()) { + // Grab the nonce from the server, 
if it has sent one. We'll send it back + // later with SASL integrity protection as part of the connection context. + nonce_ = response.nonce(); + } + + if (tls_negotiated_) { + // Check the channel bindings provided by the server against the expected channel bindings. + if (!response.has_channel_bindings()) { + return Status::NotAuthorized("no channel bindings provided by server"); + } + + security::Cert cert; + RETURN_NOT_OK(tls_handshake_.GetRemoteCert(&cert)); + + string expected_channel_bindings; + RETURN_NOT_OK_PREPEND(cert.GetServerEndPointChannelBindings(&expected_channel_bindings), + "failed to generate channel bindings"); + + string received_channel_bindings; + RETURN_NOT_OK_PREPEND(SaslDecode(sasl_conn_.get(), + response.channel_bindings(), + &received_channel_bindings), + "failed to decode channel bindings"); + + if (expected_channel_bindings != received_channel_bindings) { + Sockaddr addr; + ignore_result(socket_->GetPeerAddress(&addr)); + + LOG(WARNING) << "Received invalid channel bindings from server " + << addr.ToString() + << ", this could indicate an active network man-in-the-middle"; + return Status::NotAuthorized("channel bindings do not match"); + } + } + } + + return Status::OK(); +} + +Status ClientNegotiation::DoSaslStep(const string& in, const char** out, unsigned* out_len) { + TRACE("Calling sasl_client_step()"); + + return WrapSaslCall(sasl_conn_.get(), [&]() { + return sasl_client_step(sasl_conn_.get(), in.c_str(), in.length(), nullptr, out, out_len); + }); +} + +Status ClientNegotiation::SendConnectionContext() { + TRACE("Sending connection context"); + RequestHeader header; + header.set_call_id(kConnectionContextCallId); + + ConnectionContextPB conn_context; + // This field is deprecated but used by servers <Kudu 1.1. Newer server versions ignore + // this and use the SASL-provided username instead. + conn_context.mutable_deprecated_user_info()->set_real_user( + plain_auth_user_.empty() ? 
"cpp-client" : plain_auth_user_); + + if (nonce_) { + // Reply with the SASL-protected nonce. We only set the nonce when using SASL GSSAPI. + RETURN_NOT_OK(SaslEncode(sasl_conn_.get(), *nonce_, conn_context.mutable_encoded_nonce())); + } + + return SendFramedMessageBlocking(socket(), header, conn_context, deadline_); +} + +int ClientNegotiation::GetOptionCb(const char* plugin_name, const char* option, + const char** result, unsigned* len) { + return helper_.GetOptionCb(plugin_name, option, result, len); +} + +// Used for PLAIN. +// SASL callback for SASL_CB_USER, SASL_CB_AUTHNAME, SASL_CB_LANGUAGE +int ClientNegotiation::SimpleCb(int id, const char** result, unsigned* len) { + if (PREDICT_FALSE(!helper_.IsPlainEnabled())) { + LOG(DFATAL) << "Simple callback called, but PLAIN auth is not enabled"; + return SASL_FAIL; + } + if (PREDICT_FALSE(result == nullptr)) { + LOG(DFATAL) << "result outparam is NULL"; + return SASL_BADPARAM; + } + switch (id) { + // TODO(unknown): Support impersonation? + // For impersonation, USER is the impersonated user, AUTHNAME is the "sudoer". + case SASL_CB_USER: + TRACE("callback for SASL_CB_USER"); + *result = plain_auth_user_.c_str(); + if (len != nullptr) *len = plain_auth_user_.length(); + break; + case SASL_CB_AUTHNAME: + TRACE("callback for SASL_CB_AUTHNAME"); + *result = plain_auth_user_.c_str(); + if (len != nullptr) *len = plain_auth_user_.length(); + break; + case SASL_CB_LANGUAGE: + LOG(DFATAL) << "Unable to handle SASL callback type SASL_CB_LANGUAGE" + << "(" << id << ")"; + return SASL_BADPARAM; + default: + LOG(DFATAL) << "Unexpected SASL callback type: " << id; + return SASL_BADPARAM; + } + + return SASL_OK; +} + +// Used for PLAIN. +// SASL callback for SASL_CB_PASS: User password. 
+int ClientNegotiation::SecretCb(sasl_conn_t* conn, int id, sasl_secret_t** psecret) { + if (PREDICT_FALSE(!helper_.IsPlainEnabled())) { + LOG(DFATAL) << "Plain secret callback called, but PLAIN auth is not enabled"; + return SASL_FAIL; + } + switch (id) { + case SASL_CB_PASS: { + if (!conn || !psecret) return SASL_BADPARAM; + + size_t len = plain_pass_.length(); + *psecret = reinterpret_cast<sasl_secret_t*>(malloc(sizeof(sasl_secret_t) + len)); + if (!*psecret) { + return SASL_NOMEM; + } + psecret_.reset(*psecret); // Ensure that we free() this structure later. + (*psecret)->len = len; + memcpy((*psecret)->data, plain_pass_.c_str(), len + 1); + break; + } + default: + LOG(DFATAL) << "Unexpected SASL callback type: " << id; + return SASL_BADPARAM; + } + + return SASL_OK; +} + +} // namespace rpc +} // namespace kudu http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/c7db60aa/be/src/kudu/rpc/client_negotiation.h ---------------------------------------------------------------------- diff --git a/be/src/kudu/rpc/client_negotiation.h b/be/src/kudu/rpc/client_negotiation.h new file mode 100644 index 0000000..aa8c6d9 --- /dev/null +++ b/be/src/kudu/rpc/client_negotiation.h @@ -0,0 +1,252 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. 
See the License for the +// specific language governing permissions and limitations +// under the License. + +#pragma once + +#include <cstdlib> +#include <memory> +#include <set> +#include <string> +#include <vector> + +#include <boost/optional.hpp> +#include <sasl/sasl.h> + +#include "kudu/rpc/negotiation.h" +#include "kudu/rpc/rpc_header.pb.h" +#include "kudu/rpc/sasl_common.h" +#include "kudu/rpc/sasl_helper.h" +#include "kudu/security/tls_handshake.h" +#include "kudu/security/token.pb.h" +#include "kudu/util/monotime.h" +#include "kudu/util/net/socket.h" +#include "kudu/util/status.h" + +namespace kudu { + +namespace security { +class TlsContext; +} + +namespace rpc { + +class NegotiatePB; +class NegotiatePB_SaslAuth; +class ResponseHeader; + +// Class for doing KRPC negotiation with a remote server over a bidirectional socket. +// Operations on this class are NOT thread-safe. +class ClientNegotiation { + public: + // Creates a new client negotiation instance, taking ownership of the + // provided socket. After completing the negotiation process by setting the + // desired options and calling Negotiate(), the socket can be retrieved with + // 'release_socket'. + // + // The provided TlsContext must outlive this negotiation instance. + ClientNegotiation(std::unique_ptr<Socket> socket, + const security::TlsContext* tls_context, + const boost::optional<security::SignedTokenPB>& authn_token, + RpcEncryption encryption); + + // Enable PLAIN authentication. + // Must be called before Negotiate(). + Status EnablePlain(const std::string& user, + const std::string& pass); + + // Enable GSSAPI authentication. + // Must be called before Negotiate(). + Status EnableGSSAPI(); + + // Returns mechanism negotiated by this connection. + // Must be called after Negotiate(). + SaslMechanism::Type negotiated_mechanism() const; + + // Returns the negotiated authentication type for the connection. + // Must be called after Negotiate(). 
+ AuthenticationType negotiated_authn() const { + DCHECK_NE(negotiated_authn_, AuthenticationType::INVALID); + return negotiated_authn_; + } + + // Returns true if TLS was negotiated. + // Must be called after Negotiate(). + bool tls_negotiated() const { + return tls_negotiated_; + } + + // Returns the set of RPC system features supported by the remote server. + // Must be called after Negotiate(). + std::set<RpcFeatureFlag> server_features() const { + return server_features_; + } + + // Returns the set of RPC system features supported by the remote server. + // Must be called after Negotiate(). + // Subsequent calls to this method or server_features() will return an empty set. + std::set<RpcFeatureFlag> take_server_features() { + return std::move(server_features_); + } + + // Specify the fully-qualified domain name of the remote server. + // Must be called before Negotiate(). Required for some mechanisms. + void set_server_fqdn(const std::string& domain_name); + + // Set deadline for connection negotiation. + void set_deadline(const MonoTime& deadline); + + Socket* socket() { return socket_.get(); } + + // Takes and returns the socket owned by this client negotiation. The caller + // will own the socket after this call, and the negotiation instance should no + // longer be used. Must be called after Negotiate(). Subsequent calls to this + // method or socket() will return a null pointer. + std::unique_ptr<Socket> release_socket() { return std::move(socket_); } + + // Negotiate with the remote server. Should only be called once per + // ClientNegotiation and socket instance, after all options have been set. + // + // Returns OK on success, otherwise may return NotAuthorized, NotSupported, or + // another non-OK status. + Status Negotiate(std::unique_ptr<ErrorStatusPB>* rpc_error = nullptr); + + // SASL callback for plugin options, supported mechanisms, etc. + // Returns SASL_FAIL if the option is not handled, which does not fail the handshake. 
+ int GetOptionCb(const char* plugin_name, const char* option, + const char** result, unsigned* len); + + // SASL callback for SASL_CB_USER, SASL_CB_AUTHNAME, SASL_CB_LANGUAGE + int SimpleCb(int id, const char** result, unsigned* len); + + // SASL callback for SASL_CB_PASS + int SecretCb(sasl_conn_t* conn, int id, sasl_secret_t** psecret); + + private: + + // Encode and send the specified negotiate request message to the server. + Status SendNegotiatePB(const NegotiatePB& msg) WARN_UNUSED_RESULT; + + // Receive a negotiate response message from the server, deserializing it into 'msg'. + // Validates that the response is not an error. + Status RecvNegotiatePB(NegotiatePB* msg, + faststring* buffer, + std::unique_ptr<ErrorStatusPB>* rpc_error) WARN_UNUSED_RESULT; + + // Parse error status message from raw bytes of an ErrorStatusPB. + Status ParseError(const Slice& err_data, + std::unique_ptr<ErrorStatusPB>* rpc_error) WARN_UNUSED_RESULT; + + Status SendConnectionHeader() WARN_UNUSED_RESULT; + + // Initialize the SASL client negotiation instance. + Status InitSaslClient() WARN_UNUSED_RESULT; + + // Send a NEGOTIATE step message to the server. + Status SendNegotiate() WARN_UNUSED_RESULT; + + // Handle NEGOTIATE step response from the server. + Status HandleNegotiate(const NegotiatePB& response) WARN_UNUSED_RESULT; + + // Send a TLS_HANDSHAKE request message to the server with the provided token. + Status SendTlsHandshake(std::string tls_token) WARN_UNUSED_RESULT; + + // Handle a TLS_HANDSHAKE response message from the server. + Status HandleTlsHandshake(const NegotiatePB& response) WARN_UNUSED_RESULT; + + // Authenticate to the server using SASL. + // 'recv_buf' allows a receive buffer to be reused. + Status AuthenticateBySasl(faststring* recv_buf, + std::unique_ptr<ErrorStatusPB>* rpc_error) WARN_UNUSED_RESULT; + + // Authenticate to the server using a token. + // 'recv_buf' allows a receive buffer to be reused. 
+ Status AuthenticateByToken(faststring* recv_buf, + std::unique_ptr<ErrorStatusPB> *rpc_error) WARN_UNUSED_RESULT; + + // Send an SASL_INITIATE message to the server. + // Returns: + // Status::OK if the SASL_SUCCESS message is expected next. + // Status::Incomplete if the SASL_CHALLENGE message is expected next. + // Any other status indicates an error. + Status SendSaslInitiate() WARN_UNUSED_RESULT; + + // Send a SASL_RESPONSE message to the server. + Status SendSaslResponse(const char* resp_msg, unsigned resp_msg_len) WARN_UNUSED_RESULT; + + // Handle case when server sends SASL_CHALLENGE response. + // Returns: + // Status::OK if a SASL_SUCCESS message is expected next. + // Status::Incomplete if another SASL_CHALLENGE message is expected. + // Any other status indicates an error. + Status HandleSaslChallenge(const NegotiatePB& response) WARN_UNUSED_RESULT; + + // Handle case when server sends SASL_SUCCESS response. + Status HandleSaslSuccess(const NegotiatePB& response) WARN_UNUSED_RESULT; + + // Perform a client-side step of the SASL negotiation. + // Input is what came from the server. Output is what we will send back to the server. + // Returns: + // Status::OK if sasl_client_step returns SASL_OK. + // Status::Incomplete if sasl_client_step returns SASL_CONTINUE + // otherwise returns an appropriate error status. + Status DoSaslStep(const std::string& in, const char** out, unsigned* out_len) WARN_UNUSED_RESULT; + + Status SendConnectionContext() WARN_UNUSED_RESULT; + + // The socket to the remote server. + std::unique_ptr<Socket> socket_; + + // SASL state. + std::vector<sasl_callback_t> callbacks_; + std::unique_ptr<sasl_conn_t, SaslDeleter> sasl_conn_; + SaslHelper helper_; + boost::optional<std::string> nonce_; + + // TLS state. + const security::TlsContext* tls_context_; + security::TlsHandshake tls_handshake_; + const RpcEncryption encryption_; + bool tls_negotiated_; + + // TSK state. 
+ boost::optional<security::SignedTokenPB> authn_token_; + + // Authentication state. + std::string plain_auth_user_; + std::string plain_pass_; + std::unique_ptr<sasl_secret_t, decltype(std::free)*> psecret_; + + // The set of features advertised by the client. Filled in when we send + // the first message. This is not necessarily constant since some features + // may be dynamically enabled. + std::set<RpcFeatureFlag> client_features_; + + // The set of features supported by the server. Filled in during negotiation. + std::set<RpcFeatureFlag> server_features_; + + // The authentication type. Filled in during negotiation. + AuthenticationType negotiated_authn_; + + // The SASL mechanism used by the connection. Filled in during negotiation. + SaslMechanism::Type negotiated_mech_; + + // Negotiation timeout deadline. + MonoTime deadline_; +}; + +} // namespace rpc +} // namespace kudu
