http://git-wip-us.apache.org/repos/asf/impala/blob/fcf190c4/be/src/kudu/rpc/service_queue.h ---------------------------------------------------------------------- diff --git a/be/src/kudu/rpc/service_queue.h b/be/src/kudu/rpc/service_queue.h new file mode 100644 index 0000000..2751a30 --- /dev/null +++ b/be/src/kudu/rpc/service_queue.h @@ -0,0 +1,225 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. +#ifndef KUDU_UTIL_SERVICE_QUEUE_H +#define KUDU_UTIL_SERVICE_QUEUE_H + +#include <memory> +#include <string> +#include <set> +#include <vector> + +#include <glog/logging.h> + +#include "kudu/gutil/dynamic_annotations.h" +#include "kudu/gutil/macros.h" +#include "kudu/rpc/inbound_call.h" +#include "kudu/util/condition_variable.h" +#include "kudu/util/locks.h" +#include "kudu/util/monotime.h" +#include "kudu/util/mutex.h" + +namespace boost { +template <class T> +class optional; +} + +namespace kudu { +namespace rpc { + +// Return values for ServiceQueue::Put() +enum QueueStatus { + QUEUE_SUCCESS = 0, + QUEUE_SHUTDOWN = 1, + QUEUE_FULL = 2 +}; + +// Blocking queue used for passing inbound RPC calls to the service handler pool. +// Calls are dequeued in 'earliest-deadline first' order. 
The queue also maintains a +// bounded number of calls. If the queue overflows, then calls with deadlines farthest +// in the future are evicted. +// +// When calls do not provide deadlines, the RPC layer considers their deadline to +// be infinitely in the future. This means that any call that does have a deadline +// can evict any call that does not have a deadline. This incentivizes clients to +// provide accurate deadlines for their calls. +// +// In order to improve concurrent throughput, this class uses a LIFO design: +// Each consumer thread has its own lock and condition variable. If a +// consumer arrives and there is no work available in the queue, it will not +// wait on the queue lock, but rather push its own 'ConsumerState' object +// to the 'waiting_consumers_' stack. When work arrives, if there are waiting +// consumers, the top consumer is popped from the stack and woken up. +// +// This design has a few advantages over the basic BlockingQueue: +// - the worker who was most recently busy is the one which will be selected for +// new work. This gives an opportunity for the worker to be scheduled again +// without going to sleep, and also keeps CPU cache and allocator caches hot. +// - in the common case that there are enough workers to fully service the incoming +// work rate, the queue implementation itself is never used. Thus, we can +// have a priority queue without paying extra for it in the common case. +// +// NOTE: because of the use of thread-local consumer records, once a consumer +// thread accesses one LifoServiceQueue, it becomes "bound" to that queue and +// must never access any other instance. +class LifoServiceQueue { + public: + explicit LifoServiceQueue(int max_size); + + ~LifoServiceQueue(); + + // Get an element from the queue. Returns false if we were shut down prior to + // getting the element. + bool BlockingGet(std::unique_ptr<InboundCall>* out); + + // Add a new call to the queue. 
+ // Returns: + // - QUEUE_SHUTDOWN if Shutdown() has already been called. + // - QUEUE_FULL if the queue is full and 'call' has a later deadline than any + // RPC already in the queue. + // - QUEUE_SUCCESS if 'call' was enqueued. + // + // In the case of a 'QUEUE_SUCCESS' response, the new element may have bumped + // another call out of the queue. In that case, *evicted will be set to the + // call that was bumped. + QueueStatus Put(InboundCall* call, boost::optional<InboundCall*>* evicted); + + // Shut down the queue. + // When a blocking queue is shut down, no more elements can be added to it, + // and Put() will return QUEUE_SHUTDOWN. + // Existing elements will drain out of it, and then BlockingGet will start + // returning false. + void Shutdown(); + + bool empty() const; + + int max_size() const; + + std::string ToString() const; + + // Return an estimate of the current queue length. + int estimated_queue_length() const { + ANNOTATE_IGNORE_READS_BEGIN(); + // The C++ standard says that std::multiset::size must be constant time, + // so this method won't try to traverse any actual nodes of the underlying + // RB tree. Investigation of the libstdcxx implementation confirms that + // size() is a simple field access of the _Rb_tree structure. + int ret = queue_.size(); + ANNOTATE_IGNORE_READS_END(); + return ret; + } + + // Return an estimate of the number of idle threads currently awaiting work. + int estimated_idle_worker_count() const { + ANNOTATE_IGNORE_READS_BEGIN(); + // Size of a vector is a simple field access so this is safe. + int ret = waiting_consumers_.size(); + ANNOTATE_IGNORE_READS_END(); + return ret; + } + + private: + // Comparison function which orders calls by their deadlines. 
+ static bool DeadlineLess(const InboundCall* a, + const InboundCall* b) { + auto time_a = a->GetClientDeadline(); + auto time_b = b->GetClientDeadline(); + if (time_a == time_b) { + // If two calls have the same deadline (most likely because neither one specified + // one) then we should order them by arrival order. + time_a = a->GetTimeReceived(); + time_b = b->GetTimeReceived(); + } + return time_a < time_b; + } + + // Struct functor wrapper for DeadlineLess. + struct DeadlineLessStruct { + bool operator()(const InboundCall* a, const InboundCall* b) const { + return DeadlineLess(a, b); + } + }; + + // The thread-local record corresponding to a single consumer thread. + // Threads push this record onto the waiting_consumers_ stack when + // they are awaiting work. Producers pop the top waiting consumer and + // post work using Post(). + class ConsumerState { + public: + explicit ConsumerState(LifoServiceQueue* queue) : + cond_(&lock_), + call_(nullptr), + should_wake_(false), + bound_queue_(queue) { + } + + void Post(InboundCall* call) { + DCHECK(call_ == nullptr); + MutexLock l(lock_); + call_ = call; + should_wake_ = true; + cond_.Signal(); + } + + InboundCall* Wait() { + MutexLock l(lock_); + while (should_wake_ == false) { + cond_.Wait(); + } + should_wake_ = false; + InboundCall* ret = call_; + call_ = nullptr; + return ret; + } + + void DCheckBoundInstance(LifoServiceQueue* q) { + DCHECK_EQ(q, bound_queue_); + } + + private: + Mutex lock_; + ConditionVariable cond_; + InboundCall* call_; + bool should_wake_; + + // For the purpose of assertions, tracks the LifoServiceQueue instance that + // this consumer is reading from. + LifoServiceQueue* bound_queue_; + }; + + static __thread ConsumerState* tl_consumer_; + + mutable simple_spinlock lock_; + bool shutdown_; + int max_queue_size_; + + // Stack of consumer threads which are currently waiting for work. + std::vector<ConsumerState*> waiting_consumers_; + + // The actual queue. 
Work is only added to the queue when there were no + // consumers available for a "direct hand-off". + std::multiset<InboundCall*, DeadlineLessStruct> queue_; + + // The total set of consumers who have ever accessed this queue. + std::vector<std::unique_ptr<ConsumerState>> consumers_; + + DISALLOW_COPY_AND_ASSIGN(LifoServiceQueue); +}; + +} // namespace rpc +} // namespace kudu + +#endif
http://git-wip-us.apache.org/repos/asf/impala/blob/fcf190c4/be/src/kudu/rpc/transfer.cc ---------------------------------------------------------------------- diff --git a/be/src/kudu/rpc/transfer.cc b/be/src/kudu/rpc/transfer.cc new file mode 100644 index 0000000..bdf5191 --- /dev/null +++ b/be/src/kudu/rpc/transfer.cc @@ -0,0 +1,283 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. + +#include "kudu/rpc/transfer.h" + +#include <sys/uio.h> + +#include <algorithm> +#include <cstdint> +#include <iostream> +#include <limits> +#include <set> + +#include <gflags/gflags.h> +#include <glog/logging.h> + +#include "kudu/gutil/endian.h" +#include "kudu/gutil/port.h" +#include "kudu/gutil/strings/substitute.h" +#include "kudu/rpc/constants.h" +#include "kudu/util/flag_tags.h" +#include "kudu/util/logging.h" +#include "kudu/util/net/socket.h" + +DEFINE_int64(rpc_max_message_size, (50 * 1024 * 1024), + "The maximum size of a message that any RPC that the server will accept. 
" + "Must be at least 1MB."); +TAG_FLAG(rpc_max_message_size, advanced); +TAG_FLAG(rpc_max_message_size, runtime); + +static bool ValidateMaxMessageSize(const char* flagname, int64_t value) { + if (value < 1 * 1024 * 1024) { + LOG(ERROR) << flagname << " must be at least 1MB."; + return false; + } + if (value > std::numeric_limits<int32_t>::max()) { + LOG(ERROR) << flagname << " must be less than " + << std::numeric_limits<int32_t>::max() << " bytes."; + } + + return true; +} +static bool dummy = google::RegisterFlagValidator( + &FLAGS_rpc_max_message_size, &ValidateMaxMessageSize); + +namespace kudu { +namespace rpc { + +using std::ostringstream; +using std::set; +using std::string; +using strings::Substitute; + +#define RETURN_ON_ERROR_OR_SOCKET_NOT_READY(status) \ + do { \ + Status _s = (status); \ + if (PREDICT_FALSE(!_s.ok())) { \ + if (Socket::IsTemporarySocketError(_s.posix_code())) { \ + return Status::OK(); /* EAGAIN, etc. */ \ + } \ + return _s; \ + } \ + } while (0) + +TransferCallbacks::~TransferCallbacks() +{} + +InboundTransfer::InboundTransfer() + : total_length_(kMsgLengthPrefixLength), + cur_offset_(0) { + buf_.resize(kMsgLengthPrefixLength); +} + +Status InboundTransfer::ReceiveBuffer(Socket &socket) { + if (cur_offset_ < kMsgLengthPrefixLength) { + // receive uint32 length prefix + int32_t rem = kMsgLengthPrefixLength - cur_offset_; + int32_t nread; + Status status = socket.Recv(&buf_[cur_offset_], rem, &nread); + RETURN_ON_ERROR_OR_SOCKET_NOT_READY(status); + if (nread == 0) { + return Status::OK(); + } + DCHECK_GE(nread, 0); + cur_offset_ += nread; + if (cur_offset_ < kMsgLengthPrefixLength) { + // If we still don't have the full length prefix, we can't continue + // reading yet. + return Status::OK(); + } + // Since we only read 'rem' bytes above, we should now have exactly + // the length prefix in our buffer and no more. 
+ DCHECK_EQ(cur_offset_, kMsgLengthPrefixLength); + + // The length prefix doesn't include its own 4 bytes, so we have to + // add that back in. + total_length_ = NetworkByteOrder::Load32(&buf_[0]) + kMsgLengthPrefixLength; + if (total_length_ > FLAGS_rpc_max_message_size) { + return Status::NetworkError(Substitute( + "RPC frame had a length of $0, but we only support messages up to $1 bytes " + "long.", total_length_, FLAGS_rpc_max_message_size)); + } + if (total_length_ <= kMsgLengthPrefixLength) { + return Status::NetworkError(Substitute("RPC frame had invalid length of $0", + total_length_)); + } + buf_.resize(total_length_); + + // Fall through to receive the message body, which is likely to be already + // available on the socket. + } + + // receive message body + int32_t nread; + + // Socket::Recv() handles at most INT_MAX at a time, so cap the remainder at + // INT_MAX. The message will be split across multiple Recv() calls. + // Note that this is only needed when rpc_max_message_size > INT_MAX, which is + // currently only used for unit tests. 
+ int32_t rem = std::min(total_length_ - cur_offset_, + static_cast<uint32_t>(std::numeric_limits<int32_t>::max())); + Status status = socket.Recv(&buf_[cur_offset_], rem, &nread); + RETURN_ON_ERROR_OR_SOCKET_NOT_READY(status); + cur_offset_ += nread; + + return Status::OK(); +} + +bool InboundTransfer::TransferStarted() const { + return cur_offset_ != 0; +} + +bool InboundTransfer::TransferFinished() const { + return cur_offset_ == total_length_; +} + +string InboundTransfer::StatusAsString() const { + return Substitute("$0/$1 bytes received", cur_offset_, total_length_); +} + +OutboundTransfer* OutboundTransfer::CreateForCallRequest(int32_t call_id, + const TransferPayload &payload, + size_t n_payload_slices, + TransferCallbacks *callbacks) { + return new OutboundTransfer(call_id, payload, n_payload_slices, callbacks); +} + +OutboundTransfer* OutboundTransfer::CreateForCallResponse(const TransferPayload &payload, + size_t n_payload_slices, + TransferCallbacks *callbacks) { + return new OutboundTransfer(kInvalidCallId, payload, n_payload_slices, callbacks); +} + +OutboundTransfer::OutboundTransfer(int32_t call_id, + const TransferPayload &payload, + size_t n_payload_slices, + TransferCallbacks *callbacks) + : cur_slice_idx_(0), + cur_offset_in_slice_(0), + callbacks_(callbacks), + call_id_(call_id), + started_(false), + aborted_(false) { + + n_payload_slices_ = n_payload_slices; + CHECK_LE(n_payload_slices_, payload_slices_.size()); + for (int i = 0; i < n_payload_slices; i++) { + payload_slices_[i] = payload[i]; + } +} + +OutboundTransfer::~OutboundTransfer() { + if (!TransferFinished() && !aborted_) { + callbacks_->NotifyTransferAborted( + Status::RuntimeError("RPC transfer destroyed before it finished sending")); + } +} + +void OutboundTransfer::Abort(const Status &status) { + CHECK(!aborted_) << "Already aborted"; + CHECK(!TransferFinished()) << "Cannot abort a finished transfer"; + callbacks_->NotifyTransferAborted(status); + aborted_ = true; +} + +Status 
OutboundTransfer::SendBuffer(Socket &socket) { + CHECK_LT(cur_slice_idx_, n_payload_slices_); + + started_ = true; + int n_iovecs = n_payload_slices_ - cur_slice_idx_; + struct iovec iovec[n_iovecs]; + { + int offset_in_slice = cur_offset_in_slice_; + for (int i = 0; i < n_iovecs; i++) { + Slice &slice = payload_slices_[cur_slice_idx_ + i]; + iovec[i].iov_base = slice.mutable_data() + offset_in_slice; + iovec[i].iov_len = slice.size() - offset_in_slice; + + offset_in_slice = 0; + } + } + + int64_t written; + Status status = socket.Writev(iovec, n_iovecs, &written); + RETURN_ON_ERROR_OR_SOCKET_NOT_READY(status); + + // Adjust our accounting of current writer position. + for (int i = cur_slice_idx_; i < n_payload_slices_; i++) { + Slice &slice = payload_slices_[i]; + int rem_in_slice = slice.size() - cur_offset_in_slice_; + DCHECK_GE(rem_in_slice, 0); + + if (written >= rem_in_slice) { + // Used up this entire slice, advance to the next slice. + cur_slice_idx_++; + cur_offset_in_slice_ = 0; + written -= rem_in_slice; + } else { + // Partially used up this slice, just advance the offset within it. 
+ cur_offset_in_slice_ += written; + break; + } + } + + if (cur_slice_idx_ == n_payload_slices_) { + callbacks_->NotifyTransferFinished(); + DCHECK_EQ(0, cur_offset_in_slice_); + } else { + DCHECK_LT(cur_slice_idx_, n_payload_slices_); + DCHECK_LT(cur_offset_in_slice_, payload_slices_[cur_slice_idx_].size()); + } + + return Status::OK(); +} + +bool OutboundTransfer::TransferStarted() const { + return started_; +} + +bool OutboundTransfer::TransferFinished() const { + if (cur_slice_idx_ == n_payload_slices_) { + DCHECK_EQ(0, cur_offset_in_slice_); // sanity check + return true; + } + return false; +} + +string OutboundTransfer::HexDump() const { + if (KUDU_SHOULD_REDACT()) { + return kRedactionMessage; + } + + string ret; + for (int i = 0; i < n_payload_slices_; i++) { + ret.append(payload_slices_[i].ToDebugString()); + } + return ret; +} + +int32_t OutboundTransfer::TotalLength() const { + int32_t ret = 0; + for (int i = 0; i < n_payload_slices_; i++) { + ret += payload_slices_[i].size(); + } + return ret; +} + +} // namespace rpc +} // namespace kudu http://git-wip-us.apache.org/repos/asf/impala/blob/fcf190c4/be/src/kudu/rpc/transfer.h ---------------------------------------------------------------------- diff --git a/be/src/kudu/rpc/transfer.h b/be/src/kudu/rpc/transfer.h new file mode 100644 index 0000000..b95d43d --- /dev/null +++ b/be/src/kudu/rpc/transfer.h @@ -0,0 +1,212 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. 
// You may obtain a copy of the License at
//
//   http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing,
// software distributed under the License is distributed on an
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
// KIND, either express or implied.  See the License for the
// specific language governing permissions and limitations
// under the License.

#ifndef KUDU_RPC_TRANSFER_H
#define KUDU_RPC_TRANSFER_H

#include <array>
#include <cstddef>
#include <cstdint>
#include <limits.h>
#include <string>

#include <boost/intrusive/list_hook.hpp>
#include <gflags/gflags_declare.h>
#include <glog/logging.h>

#include "kudu/gutil/macros.h"
#include "kudu/rpc/constants.h"
#include "kudu/util/faststring.h"
#include "kudu/util/slice.h"
#include "kudu/util/status.h"

DECLARE_int64(rpc_max_message_size);

namespace kudu {

class Socket;

namespace rpc {

struct TransferCallbacks;

// Compile-time limits on the shape of a single transfer. This class only
// carries constants and is not meant to be instantiated.
class TransferLimits {
 public:
  enum {
    kMaxSidecars = 10,
    kMaxPayloadSlices = kMaxSidecars + 2, // (header + msg)
    kMaxTotalSidecarBytes = INT_MAX
  };

  DISALLOW_IMPLICIT_CONSTRUCTORS(TransferLimits);
};

// Fixed-capacity array of slices making up one outbound message. Only the
// first 'n_payload_slices' entries passed alongside it are meaningful.
typedef std::array<Slice, TransferLimits::kMaxPayloadSlices> TransferPayload;

// This class is used internally by the RPC layer to represent an inbound
// transfer in progress.
//
// Inbound Transfer objects are created by a Connection receiving data. When the
// message is fully received, it is either parsed as a call, or a call response,
// and the InboundTransfer object itself is handed off.
class InboundTransfer {
 public:

  InboundTransfer();

  // read from the socket into our buffer
  Status ReceiveBuffer(Socket &socket);

  // Return true if any bytes have yet been received.
  bool TransferStarted() const;

  // Return true if the entire transfer has been received.
  bool TransferFinished() const;

  // Return a view of the receive buffer. The slice refers to memory owned by
  // this object.
  Slice data() const {
    return Slice(buf_);
  }

  // Return a string indicating the status of this transfer (number of bytes received, etc)
  // suitable for logging.
  std::string StatusAsString() const;

 private:

  // NOTE(review): no definition of this method appears in the transfer.cc
  // shown alongside this header -- confirm whether it is still needed.
  Status ProcessInboundHeader();

  // Receive buffer; holds the length prefix followed by the message body.
  faststring buf_;

  // Total frame length in bytes, including the length prefix itself.
  uint32_t total_length_;
  // Number of bytes of the frame received so far.
  uint32_t cur_offset_;

  DISALLOW_COPY_AND_ASSIGN(InboundTransfer);
};

// When the connection wants to send data, it creates an OutboundTransfer object
// to encompass it. This sits on a queue within the Connection, so that each time
// the Connection wakes up with a writable socket, it consumes more bytes off
// the next pending transfer in the queue.
//
// Upon completion of the transfer, a callback is triggered.
class OutboundTransfer : public boost::intrusive::list_base_hook<> {
 public:
  // Factory methods for creating transfers associated with call requests
  // or responses. The 'payload' slices will be concatenated and
  // written to the socket. When the transfer completes or errors, the
  // appropriate method of 'callbacks' is invoked.
  //
  // Does not take ownership of the callbacks object or the underlying
  // memory of the slices. The slices must remain valid until the callback
  // is triggered.
  //
  // NOTE: 'payload' is currently restricted to a maximum of kMaxPayloadSlices
  // slices.
  // ------------------------------------------------------------

  // Create an outbound transfer for a call request.
  static OutboundTransfer* CreateForCallRequest(int32_t call_id,
                                                const TransferPayload &payload,
                                                size_t n_payload_slices,
                                                TransferCallbacks *callbacks);

  // Create an outbound transfer for a call response.
  // See above for details.
  static OutboundTransfer* CreateForCallResponse(const TransferPayload &payload,
                                                 size_t n_payload_slices,
                                                 TransferCallbacks *callbacks);

  // Destruct the transfer. A transfer object should never be deallocated
  // before it has either (a) finished transferring, or (b) been Abort()ed.
  ~OutboundTransfer();

  // Abort the current transfer, with the given status.
  // This triggers TransferCallbacks::NotifyTransferAborted.
  void Abort(const Status &status);

  // send from our buffers into the sock
  Status SendBuffer(Socket &socket);

  // Return true if SendBuffer() has been called at least once (see the
  // comment on 'started_'); this can be true even if no bytes were sent.
  bool TransferStarted() const;

  // Return true if the entire transfer has been sent.
  bool TransferFinished() const;

  // Return the total number of bytes to be sent (including those already sent)
  int32_t TotalLength() const;

  // Return a debug dump of the payload slices.
  std::string HexDump() const;

  // Return true if this transfer carries a call request rather than a
  // call response.
  bool is_for_outbound_call() const {
    return call_id_ != kInvalidCallId;
  }

  // Returns the call ID for a transfer associated with an outbound
  // call. Must not be called for call responses.
  int32_t call_id() const {
    DCHECK_NE(call_id_, kInvalidCallId);
    return call_id_;
  }

 private:
  OutboundTransfer(int32_t call_id,
                   const TransferPayload& payload,
                   size_t n_payload_slices,
                   TransferCallbacks *callbacks);

  // Slices to send. Uses an array here instead of a vector to avoid an expensive
  // vector construction (improved performance a couple percent).
  TransferPayload payload_slices_;
  size_t n_payload_slices_;

  // The current slice that is being sent.
  int32_t cur_slice_idx_;
  // The number of bytes in the above slice which has already been sent.
  int32_t cur_offset_in_slice_;

  TransferCallbacks *callbacks_;

  // In the case of outbound calls, the associated call ID.
  // In the case of call responses, kInvalidCallId
  int32_t call_id_;

  // True if SendBuffer() has been called at least once. This can be true even if
  // no bytes were sent successfully. This is needed as SSL_write() is stateful.
  // Please see KUDU-2334 for details.
  bool started_;

  // True once Abort() has been called.
  bool aborted_;

  DISALLOW_COPY_AND_ASSIGN(OutboundTransfer);
};

// Callbacks made after a transfer completes.
struct TransferCallbacks {
 public:
  virtual ~TransferCallbacks();

  // The transfer finished successfully.
  virtual void NotifyTransferFinished() = 0;

  // The transfer was aborted (e.g because the connection died or an error occurred).
  virtual void NotifyTransferAborted(const Status &status) = 0;
};

} // namespace rpc
} // namespace kudu
#endif
// http://git-wip-us.apache.org/repos/asf/impala/blob/fcf190c4/be/src/kudu/rpc/user_credentials.cc ---------------------------------------------------------------------- diff --git a/be/src/kudu/rpc/user_credentials.cc b/be/src/kudu/rpc/user_credentials.cc new file mode 100644 index 0000000..7f318fe --- /dev/null +++ b/be/src/kudu/rpc/user_credentials.cc @@ -0,0 +1,64 @@
// Licensed to the Apache Software Foundation (ASF) under one
// or more contributor license agreements.  See the NOTICE file
// distributed with this work for additional information
// regarding copyright ownership.  The ASF licenses this file
// to you under the Apache License, Version 2.0 (the
// "License"); you may not use this file except in compliance
// with the License.  You may obtain a copy of the License at
//
//   http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing,
// software distributed under the License is distributed on an
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
// KIND, either express or implied.  See the License for the
// specific language governing permissions and limitations
// under the License.
+ +#include "kudu/rpc/user_credentials.h" + +#include <cstddef> +#include <string> +#include <utility> + +#include <boost/functional/hash/hash.hpp> + +#include "kudu/gutil/strings/substitute.h" +#include "kudu/util/status.h" +#include "kudu/util/user.h" + +using std::string; + +namespace kudu { +namespace rpc { + +bool UserCredentials::has_real_user() const { + return !real_user_.empty(); +} + +void UserCredentials::set_real_user(string real_user) { + real_user_ = std::move(real_user); +} + +Status UserCredentials::SetLoggedInRealUser() { + return GetLoggedInUser(&real_user_); +} + +string UserCredentials::ToString() const { + return strings::Substitute("{real_user=$0}", real_user_); +} + +size_t UserCredentials::HashCode() const { + size_t seed = 0; + if (has_real_user()) { + boost::hash_combine(seed, real_user()); + } + return seed; +} + +bool UserCredentials::Equals(const UserCredentials& other) const { + return real_user() == other.real_user(); +} + +} // namespace rpc +} // namespace kudu http://git-wip-us.apache.org/repos/asf/impala/blob/fcf190c4/be/src/kudu/rpc/user_credentials.h ---------------------------------------------------------------------- diff --git a/be/src/kudu/rpc/user_credentials.h b/be/src/kudu/rpc/user_credentials.h new file mode 100644 index 0000000..5a0434c --- /dev/null +++ b/be/src/kudu/rpc/user_credentials.h @@ -0,0 +1,53 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. 
// You may obtain a copy of the License at
//
//   http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing,
// software distributed under the License is distributed on an
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
// KIND, either express or implied.  See the License for the
// specific language governing permissions and limitations
// under the License.
#pragma once

#include <cstddef>
#include <string>

#include "kudu/util/status.h"

namespace kudu {
namespace rpc {

// Client-side user credentials. Currently this is more-or-less a simple wrapper
// around a username string. However, we anticipate moving more credentials such as
// tokens into a per-Proxy structure rather than Messenger-wide, and this will
// be the place to store them.
class UserCredentials {
 public:
  // Real user.
  // has_real_user() reports whether a real user has been set; the name is
  // taken by value in set_real_user() and returned by reference from
  // real_user().
  bool has_real_user() const;
  void set_real_user(std::string real_user);
  const std::string& real_user() const { return real_user_; }

  // Sets the real user to the currently logged in user.
  // Returns the status of that lookup.
  Status SetLoggedInRealUser();

  // Returns a string representation of the object.
  std::string ToString() const;

  // Hash and equality over the credential fields, presumably so instances
  // can be used as keys in hashed containers -- confirm against callers.
  std::size_t HashCode() const;
  bool Equals(const UserCredentials& other) const;

 private:
  // Remember to update HashCode() and Equals() when new fields are added.
  std::string real_user_;
};

} // namespace rpc
} // namespace kudu
# http://git-wip-us.apache.org/repos/asf/impala/blob/fcf190c4/be/src/kudu/security/CMakeLists.txt ---------------------------------------------------------------------- diff --git a/be/src/kudu/security/CMakeLists.txt b/be/src/kudu/security/CMakeLists.txt new file mode 100644 index 0000000..b79486e --- /dev/null +++ b/be/src/kudu/security/CMakeLists.txt @@ -0,0 +1,141 @@
# Licensed to the Apache Software Foundation (ASF) under one
# or more contributor license agreements.
# See the NOTICE file
# distributed with this work for additional information
# regarding copyright ownership.  The ASF licenses this file
# to you under the Apache License, Version 2.0 (the
# "License"); you may not use this file except in compliance
# with the License.  You may obtain a copy of the License at
#
#   http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing,
# software distributed under the License is distributed on an
# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
# KIND, either express or implied.  See the License for the
# specific language governing permissions and limitations
# under the License.

# See the comment in krb5_realm_override.cc for details on this library's usage.
# The top-level CMakeLists sets a ${KRB5_REALM_OVERRIDE} variable which should
# be linked first into all Kudu binaries.

##############################
# krb5_realm_override
##############################

add_library(krb5_realm_override STATIC krb5_realm_override.cc)
target_link_libraries(krb5_realm_override glog)
# libdl is linked explicitly everywhere except on macOS.
if(NOT APPLE)
  target_link_libraries(krb5_realm_override dl)
endif()

##############################
# token_proto
##############################

# Generate C++ sources/headers for token.proto and wrap them in a library.
PROTOBUF_GENERATE_CPP(
  TOKEN_PROTO_SRCS TOKEN_PROTO_HDRS TOKEN_PROTO_TGTS
  SOURCE_ROOT ${CMAKE_CURRENT_SOURCE_DIR}/../..
  BINARY_ROOT ${CMAKE_CURRENT_BINARY_DIR}/../..
  PROTO_FILES token.proto)
set(TOKEN_PROTO_LIBS protobuf pb_util_proto)
ADD_EXPORTABLE_LIBRARY(token_proto
  SRCS ${TOKEN_PROTO_SRCS}
  DEPS ${TOKEN_PROTO_LIBS}
  NONLINK_DEPS ${TOKEN_PROTO_TGTS})


##############################
# security
##############################

# Check for krb5_get_init_creds_opt_set_out_ccache, which is not available in versions
# of MIT Kerberos older than krb5-1.6, and is also not present in Heimdal kerberos.
include(CheckLibraryExists)
check_library_exists("krb5" krb5_get_init_creds_opt_set_out_ccache
  ${KERBEROS_LIBRARY} HAVE_KRB5_GET_INIT_CREDS_OPT_SET_OUT_CCACHE)
if(HAVE_KRB5_GET_INIT_CREDS_OPT_SET_OUT_CCACHE)
  add_definitions(-DHAVE_KRB5_GET_INIT_CREDS_OPT_SET_OUT_CCACHE=1)
endif()

# Fall back to using the ported functionality if we're using an older version of OpenSSL.
# NOTE(review): ${OPENSSL_VERSION} is expanded unquoted, so this if() is
# malformed when the variable is unset or empty. Presumably the top-level
# CMakeLists always defines it -- confirm.
if (${OPENSSL_VERSION} VERSION_LESS "1.0.2")
  set(PORTED_X509_CHECK_HOST_CC "x509_check_host.cc")
endif()

# ${PORTED_X509_CHECK_HOST_CC} expands to nothing unless it was set above.
set(SECURITY_SRCS
  ca/cert_management.cc
  cert.cc
  crypto.cc
  kerberos_util.cc
  init.cc
  openssl_util.cc
  ${PORTED_X509_CHECK_HOST_CC}
  security_flags.cc
  simple_acl.cc
  tls_context.cc
  tls_handshake.cc
  tls_socket.cc
  token_verifier.cc
  token_signer.cc
  token_signing_key.cc
  )

set(SECURITY_LIBS
  gutil
  kudu_util
  token_proto

  krb5
  openssl_crypto
  openssl_ssl)

ADD_EXPORTABLE_LIBRARY(security
  SRCS ${SECURITY_SRCS}
  DEPS ${SECURITY_LIBS})


##############################
# mini_kdc
##############################

# Unlike security_test_util below, this library is defined outside the
# NO_TESTS guard.
set(MINI_KDC_SRCS test/mini_kdc.cc)

add_library(mini_kdc ${MINI_KDC_SRCS})
target_link_libraries(mini_kdc
  gutil
  kudu_test_util
  kudu_util)

##############################
# security_test_util
##############################

# Test helpers and the test binaries themselves are only built when tests
# are enabled.
if (NOT NO_TESTS)
  set(SECURITY_TEST_SRCS
    security-test-util.cc
    test/test_certs.cc
    test/test_pass.cc)

  add_library(security_test_util ${SECURITY_TEST_SRCS})
  target_link_libraries(security_test_util
    gutil
    kudu_test_util
    kudu_util
    security)

  # Tests
  set(KUDU_TEST_LINK_LIBS
    mini_kdc
    security
    security_test_util
    ${KUDU_MIN_TEST_LIBS})

  ADD_KUDU_TEST(ca/cert_management-test)
  ADD_KUDU_TEST(cert-test)
  ADD_KUDU_TEST(crypto-test)
  ADD_KUDU_TEST(test/mini_kdc-test)
  ADD_KUDU_TEST(tls_handshake-test)
  ADD_KUDU_TEST(tls_socket-test PROCESSORS 2)
  ADD_KUDU_TEST(token-test)
endif()
http://git-wip-us.apache.org/repos/asf/impala/blob/fcf190c4/be/src/kudu/security/ca/cert_management-test.cc ---------------------------------------------------------------------- diff --git a/be/src/kudu/security/ca/cert_management-test.cc b/be/src/kudu/security/ca/cert_management-test.cc new file mode 100644 index 0000000..0c8abc8 --- /dev/null +++ b/be/src/kudu/security/ca/cert_management-test.cc @@ -0,0 +1,294 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. 
+ +#include "kudu/security/ca/cert_management.h" + +#include <string> +#include <utility> +#include <vector> + +#include <boost/optional/optional.hpp> +#include <glog/logging.h> +#include <gtest/gtest.h> + +#include "kudu/gutil/strings/substitute.h" +#include "kudu/security/cert.h" +#include "kudu/security/crypto.h" +#include "kudu/security/openssl_util.h" +#include "kudu/security/security-test-util.h" +#include "kudu/security/test/test_certs.h" +#include "kudu/util/status.h" +#include "kudu/util/test_macros.h" +#include "kudu/util/test_util.h" + +using std::string; +using std::vector; +using strings::Substitute; + +namespace kudu { +namespace security { +namespace ca { + +class CertManagementTest : public KuduTest { + public: + void SetUp() override { + ASSERT_OK(ca_cert_.FromString(kCaCert, DataFormat::PEM)); + ASSERT_OK(ca_private_key_.FromString(kCaPrivateKey, DataFormat::PEM)); + ASSERT_OK(ca_public_key_.FromString(kCaPublicKey, DataFormat::PEM)); + ASSERT_OK(ca_exp_cert_.FromString(kCaExpiredCert, DataFormat::PEM)); + ASSERT_OK(ca_exp_private_key_.FromString(kCaExpiredPrivateKey, DataFormat::PEM)); + // Sanity checks. + ASSERT_OK(ca_cert_.CheckKeyMatch(ca_private_key_)); + ASSERT_OK(ca_exp_cert_.CheckKeyMatch(ca_exp_private_key_)); + } + + protected: + CertRequestGenerator::Config PrepareConfig( + const string& hostname = "localhost.localdomain") { + return { hostname }; + } + + CaCertRequestGenerator::Config PrepareCaConfig(const string& cn) { + return { cn }; + } + + // Create a new private key in 'key' and return a CSR associated with that + // key. 
+ template<class CSRGen = CertRequestGenerator> + CertSignRequest PrepareTestCSR(typename CSRGen::Config config, + PrivateKey* key) { + CHECK_OK(GeneratePrivateKey(512, key)); + CSRGen gen(std::move(config)); + CHECK_OK(gen.Init()); + CertSignRequest req; + CHECK_OK(gen.GenerateRequest(*key, &req)); + return req; + } + + Cert ca_cert_; + PrivateKey ca_private_key_; + PublicKey ca_public_key_; + + Cert ca_exp_cert_; + PrivateKey ca_exp_private_key_; +}; + +// Check for basic constraints while initializing CertRequestGenerator objects. +TEST_F(CertManagementTest, RequestGeneratorConstraints) { + const CertRequestGenerator::Config gen_config = PrepareConfig(""); + CertRequestGenerator gen(gen_config); + const Status s = gen.Init(); + const string err_msg = s.ToString(); + ASSERT_TRUE(s.IsInvalidArgument()) << err_msg; + ASSERT_STR_CONTAINS(err_msg, "hostname must not be empty"); +} + +// Check for the basic functionality of the CertRequestGenerator class: +// check it's able to generate keys of expected number of bits and that it +// reports an error if trying to generate a key of unsupported number of bits. +TEST_F(CertManagementTest, RequestGeneratorBasics) { + const CertRequestGenerator::Config gen_config = PrepareConfig(); + + PrivateKey key; + ASSERT_OK(GeneratePrivateKey(1024, &key)); + CertRequestGenerator gen(gen_config); + ASSERT_OK(gen.Init()); + string key_str; + ASSERT_OK(key.ToString(&key_str, DataFormat::PEM)); + // Check for non-supported number of bits for the key. + Status s = GeneratePrivateKey(7, &key); + ASSERT_TRUE(s.IsRuntimeError()); +} + +// Check that CertSigner behaves in a predictable way if given non-matching +// CA private key and certificate. 
+TEST_F(CertManagementTest, SignerInitWithMismatchedCertAndKey) { + PrivateKey key; + const auto& csr = PrepareTestCSR(PrepareConfig(), &key); + { + Cert cert; + Status s = CertSigner(&ca_cert_, &ca_exp_private_key_) + .Sign(csr, &cert); + + const string err_msg = s.ToString(); + ASSERT_TRUE(s.IsRuntimeError()) << err_msg; + ASSERT_STR_CONTAINS(err_msg, "certificate does not match private key"); + } + { + Cert cert; + Status s = CertSigner(&ca_exp_cert_, &ca_private_key_) + .Sign(csr, &cert); + const string err_msg = s.ToString(); + ASSERT_TRUE(s.IsRuntimeError()) << err_msg; + ASSERT_STR_CONTAINS(err_msg, "certificate does not match private key"); + } +} + +// Check how CertSigner behaves if given expired CA certificate +// and corresponding private key. +TEST_F(CertManagementTest, SignerInitWithExpiredCert) { + const CertRequestGenerator::Config gen_config = PrepareConfig(); + PrivateKey key; + CertSignRequest req = PrepareTestCSR(gen_config, &key); + + // Signer works fine even with expired CA certificate. + Cert cert; + ASSERT_OK(CertSigner(&ca_exp_cert_, &ca_exp_private_key_).Sign(req, &cert)); + ASSERT_OK(cert.CheckKeyMatch(key)); +} + +// Generate X509 CSR and issue corresponding certificate putting the specified +// hostname into the SAN X509v3 extension field. The fix for KUDU-1981 addresses +// the issue of enabling Kudu server components on systems with FQDN longer than +// 64 characters. This test is a regression for KUDU-1981, so let's verify that +// CSRs and the result X509 cerificates with long hostnames in SAN are handled +// properly. +TEST_F(CertManagementTest, SignCertLongHostnameInSan) { + for (auto const& hostname : + { + "foo.bar.com", + + "222222222222222222222222222222222222222222222222222222222222222." + "555555555555555555555555555555555555555555555555555555555555555." + "555555555555555555555555555555555555555555555555555555555555555." 
+ "chaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaars", + }) { + CertRequestGenerator::Config gen_config; + gen_config.hostname = hostname; + gen_config.user_id = "test-uid"; + PrivateKey key; + const auto& csr = PrepareTestCSR(gen_config, &key); + Cert cert; + ASSERT_OK(CertSigner(&ca_cert_, &ca_private_key_).Sign(csr, &cert)); + ASSERT_OK(cert.CheckKeyMatch(key)); + + EXPECT_EQ("C = US, ST = CA, O = MyCompany, CN = MyName, emailAddress = m...@email.com", + cert.IssuerName()); + EXPECT_EQ("UID = test-uid", cert.SubjectName()); + vector<string> hostnames = cert.Hostnames(); + ASSERT_EQ(1, hostnames.size()); + EXPECT_EQ(hostname, hostnames[0]); + } +} + +// Generate X509 CSR and issues corresponding certificate. +TEST_F(CertManagementTest, SignCert) { + CertRequestGenerator::Config gen_config; + gen_config.hostname = "foo.bar.com"; + gen_config.user_id = "test-uid"; + gen_config.kerberos_principal = "kudu/foo.bar....@bar.com"; + PrivateKey key; + const auto& csr = PrepareTestCSR(gen_config, &key); + Cert cert; + ASSERT_OK(CertSigner(&ca_cert_, &ca_private_key_).Sign(csr, &cert)); + ASSERT_OK(cert.CheckKeyMatch(key)); + + EXPECT_EQ("C = US, ST = CA, O = MyCompany, CN = MyName, emailAddress = m...@email.com", + cert.IssuerName()); + EXPECT_EQ("UID = test-uid", cert.SubjectName()); + EXPECT_EQ(gen_config.user_id, *cert.UserId()); + EXPECT_EQ(gen_config.kerberos_principal, *cert.KuduKerberosPrincipal()); + vector<string> hostnames = cert.Hostnames(); + ASSERT_EQ(1, hostnames.size()); + EXPECT_EQ("foo.bar.com", hostnames[0]); +} + +// Generate X509 CA CSR and sign the result certificate. 
+TEST_F(CertManagementTest, SignCaCert) { + const CaCertRequestGenerator::Config gen_config(PrepareCaConfig("self-ca")); + PrivateKey key; + const auto& csr = PrepareTestCSR<CaCertRequestGenerator>(gen_config, &key); + Cert cert; + ASSERT_OK(CertSigner(&ca_cert_, &ca_private_key_).Sign(csr, &cert)); + ASSERT_OK(cert.CheckKeyMatch(key)); +} + +// Test the creation and use of a CA which uses a self-signed CA cert +// generated on the fly. +TEST_F(CertManagementTest, TestSelfSignedCA) { + PrivateKey ca_key; + Cert ca_cert; + ASSERT_OK(GenerateSelfSignedCAForTests(&ca_key, &ca_cert)); + + // Create a key and CSR for the tablet server. + const auto& config = PrepareConfig(); + PrivateKey ts_key; + CertSignRequest ts_csr = PrepareTestCSR(config, &ts_key); + + // Sign it using the self-signed CA. + Cert ts_cert; + ASSERT_OK(CertSigner(&ca_cert, &ca_key).Sign(ts_csr, &ts_cert)); + ASSERT_OK(ts_cert.CheckKeyMatch(ts_key)); +} + +// Check the transformation chains for X509 CSRs: +// internal -> PEM -> internal -> PEM +// internal -> DER -> internal -> DER +TEST_F(CertManagementTest, X509CsrFromAndToString) { + static const DataFormat kFormats[] = { DataFormat::PEM, DataFormat::DER }; + + PrivateKey key; + ASSERT_OK(GeneratePrivateKey(1024, &key)); + CertRequestGenerator gen(PrepareConfig()); + ASSERT_OK(gen.Init()); + CertSignRequest req_ref; + ASSERT_OK(gen.GenerateRequest(key, &req_ref)); + + for (auto format : kFormats) { + SCOPED_TRACE(Substitute("X509 CSR format: $0", DataFormatToString(format))); + string str_req_ref; + ASSERT_OK(req_ref.ToString(&str_req_ref, format)); + CertSignRequest req; + ASSERT_OK(req.FromString(str_req_ref, format)); + string str_req; + ASSERT_OK(req.ToString(&str_req, format)); + ASSERT_EQ(str_req_ref, str_req); + } +} + +// Check the transformation chains for X509 certs: +// internal -> PEM -> internal -> PEM +// internal -> DER -> internal -> DER +TEST_F(CertManagementTest, X509FromAndToString) { + static const DataFormat kFormats[] = { 
DataFormat::PEM, DataFormat::DER }; + + PrivateKey key; + ASSERT_OK(GeneratePrivateKey(1024, &key)); + CertRequestGenerator gen(PrepareConfig()); + ASSERT_OK(gen.Init()); + CertSignRequest req; + ASSERT_OK(gen.GenerateRequest(key, &req)); + + Cert cert_ref; + ASSERT_OK(CertSigner(&ca_cert_, &ca_private_key_) + .Sign(req, &cert_ref)); + + for (auto format : kFormats) { + SCOPED_TRACE(Substitute("X509 format: $0", DataFormatToString(format))); + string str_cert_ref; + ASSERT_OK(cert_ref.ToString(&str_cert_ref, format)); + Cert cert; + ASSERT_OK(cert.FromString(str_cert_ref, format)); + string str_cert; + ASSERT_OK(cert.ToString(&str_cert, format)); + ASSERT_EQ(str_cert_ref, str_cert); + } +} + +} // namespace ca +} // namespace security +} // namespace kudu http://git-wip-us.apache.org/repos/asf/impala/blob/fcf190c4/be/src/kudu/security/ca/cert_management.cc ---------------------------------------------------------------------- diff --git a/be/src/kudu/security/ca/cert_management.cc b/be/src/kudu/security/ca/cert_management.cc new file mode 100644 index 0000000..7ccc376 --- /dev/null +++ b/be/src/kudu/security/ca/cert_management.cc @@ -0,0 +1,423 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. 
+ +#include "kudu/security/ca/cert_management.h" + +#include <algorithm> +#include <cstdio> +#include <memory> +#include <mutex> +#include <string> + +#include <glog/logging.h> +#include <openssl/conf.h> +#ifndef OPENSSL_NO_ENGINE +#include <openssl/engine.h> +#endif +#include <openssl/pem.h> +#include <openssl/x509.h> +#include <openssl/x509v3.h> + +#include "kudu/gutil/strings/substitute.h" +#include "kudu/security/cert.h" +#include "kudu/security/crypto.h" +#include "kudu/security/openssl_util.h" +#include "kudu/util/net/socket.h" +#include "kudu/util/scoped_cleanup.h" +#include "kudu/util/status.h" + +using std::lock_guard; +using std::move; +using std::string; +using strings::Substitute; + +namespace kudu { +namespace security { + +template<> struct SslTypeTraits<ASN1_INTEGER> { + static constexpr auto kFreeFunc = &ASN1_INTEGER_free; +}; +template<> struct SslTypeTraits<BIGNUM> { + static constexpr auto kFreeFunc = &BN_free; +}; + +namespace ca { + +namespace { + +Status SetSubjectNameField(X509_NAME* name, + const char* field_code, + const string& field_value) { + CHECK(name); + CHECK(field_code); + OPENSSL_RET_NOT_OK(X509_NAME_add_entry_by_txt( + name, field_code, MBSTRING_ASC, + reinterpret_cast<const unsigned char*>(field_value.c_str()), -1, -1, 0), + Substitute("error setting subject field $0", field_code)); + return Status::OK(); +} + +} // anonymous namespace + +CertRequestGenerator::~CertRequestGenerator() { + sk_X509_EXTENSION_pop_free(extensions_, X509_EXTENSION_free); +} + +Status CertRequestGeneratorBase::GenerateRequest(const PrivateKey& key, + CertSignRequest* ret) const { + SCOPED_OPENSSL_NO_PENDING_ERRORS; + CHECK(ret); + CHECK(Initialized()); + auto req = ssl_make_unique(X509_REQ_new()); + OPENSSL_RET_NOT_OK(X509_REQ_set_pubkey(req.get(), key.GetRawData()), + "error setting X509 public key"); + + // Populate the subject field of the request. + RETURN_NOT_OK(SetSubject(req.get())); + + // Set necessary extensions into the request. 
+ RETURN_NOT_OK(SetExtensions(req.get())); + + // And finally sign the result. + OPENSSL_RET_NOT_OK(X509_REQ_sign(req.get(), key.GetRawData(), EVP_sha256()), + "error signing X509 request"); + ret->AdoptRawData(req.release()); + + return Status::OK(); +} + +Status CertRequestGeneratorBase::PushExtension(stack_st_X509_EXTENSION* st, + int32_t nid, StringPiece value) { + SCOPED_OPENSSL_NO_PENDING_ERRORS; + auto ex = ssl_make_unique( + X509V3_EXT_conf_nid(nullptr, nullptr, nid, const_cast<char*>(value.data()))); + OPENSSL_RET_IF_NULL(ex, "error configuring extension"); + OPENSSL_RET_NOT_OK(sk_X509_EXTENSION_push(st, ex.release()), + "error pushing extension into the stack"); + return Status::OK(); +} + +CertRequestGenerator::CertRequestGenerator(Config config) + : CertRequestGeneratorBase(), + config_(std::move(config)) { +} + +Status CertRequestGenerator::Init() { + InitializeOpenSSL(); + SCOPED_OPENSSL_NO_PENDING_ERRORS; + + CHECK(!is_initialized_); + + // Build the SAN field using the specified hostname. In general, it might be + // multiple DNS hostnames in the field, but in our use-cases it's always one. + if (config_.hostname.empty()) { + return Status::InvalidArgument("hostname must not be empty"); + } + const string san_hosts = Substitute("DNS.0:$0", config_.hostname); + + extensions_ = sk_X509_EXTENSION_new_null(); + + // Permitted usages for the generated keys is set via X509 V3 + // standard/extended key usage attributes. + // See https://www.openssl.org/docs/man1.0.1/apps/x509v3_config.html + // for details. + + // The generated certificates are for using as TLS certificates for + // both client and server. + string usage = "critical,digitalSignature,keyEncipherment"; + if (for_self_signing_) { + // If we are generating a CSR for self-signing, then we need to + // add this keyUsage attribute. 
See https://s.apache.org/BFHk + usage += ",keyCertSign"; + } + + RETURN_NOT_OK(PushExtension(extensions_, NID_key_usage, usage)); + // The generated certificates should be good for authentication + // of a server to a client and vice versa: the intended users of the + // certificates are tablet servers which are going to talk to master + // and other tablet servers via TLS channels. + RETURN_NOT_OK(PushExtension(extensions_, NID_ext_key_usage, + "critical,serverAuth,clientAuth")); + + // The generated certificates are not intended to be used as CA certificates + // (i.e. they cannot be used to sign/issue certificates). + RETURN_NOT_OK(PushExtension(extensions_, NID_basic_constraints, + "critical,CA:FALSE")); + + if (config_.kerberos_principal) { + int nid = GetKuduKerberosPrincipalOidNid(); + RETURN_NOT_OK(PushExtension(extensions_, nid, + Substitute("ASN1:UTF8:$0", *config_.kerberos_principal))); + } + RETURN_NOT_OK(PushExtension(extensions_, NID_subject_alt_name, san_hosts)); + + is_initialized_ = true; + + return Status::OK(); +} + +bool CertRequestGenerator::Initialized() const { + return is_initialized_; +} + +Status CertRequestGenerator::SetSubject(X509_REQ* req) const { + if (config_.user_id) { + RETURN_NOT_OK(SetSubjectNameField(X509_REQ_get_subject_name(req), + "UID", *config_.user_id)); + } + return Status::OK(); +} + +Status CertRequestGenerator::SetExtensions(X509_REQ* req) const { + OPENSSL_RET_NOT_OK(X509_REQ_add_extensions(req, extensions_), + "error setting X509 request extensions"); + return Status::OK(); +} + +CaCertRequestGenerator::CaCertRequestGenerator(Config config) + : config_(std::move(config)), + extensions_(nullptr), + is_initialized_(false) { +} + +CaCertRequestGenerator::~CaCertRequestGenerator() { + sk_X509_EXTENSION_pop_free(extensions_, X509_EXTENSION_free); +} + +Status CaCertRequestGenerator::Init() { + InitializeOpenSSL(); + SCOPED_OPENSSL_NO_PENDING_ERRORS; + + lock_guard<simple_spinlock> guard(lock_); + if (is_initialized_) { + 
return Status::OK(); + } + if (config_.cn.empty()) { + return Status::InvalidArgument("missing CA service UUID/name"); + } + + extensions_ = sk_X509_EXTENSION_new_null(); + + // Permitted usages for the generated keys are set via X509 V3 + // standard/extended key usage attributes. + // See https://www.openssl.org/docs/man1.0.1/apps/x509v3_config.html + // for details. + + // The target certificate is a CA certificate: it's for signing X509 certs. + RETURN_NOT_OK(PushExtension(extensions_, NID_key_usage, + "critical,keyCertSign")); + // The generated certificates are for the private CA service. + RETURN_NOT_OK(PushExtension(extensions_, NID_basic_constraints, + "critical,CA:TRUE")); + is_initialized_ = true; + + return Status::OK(); +} + +// Report whether Init() has run to completion; the flag is read under lock_. +bool CaCertRequestGenerator::Initialized() const { + lock_guard<simple_spinlock> guard(lock_); + return is_initialized_; +} + +// Put the configured common name into the "CN" field of the request's subject. +Status CaCertRequestGenerator::SetSubject(X509_REQ* req) const { + return SetSubjectNameField(X509_REQ_get_subject_name(req), "CN", config_.cn); +} + +// Attach the extensions prepared by Init() to the request. +Status CaCertRequestGenerator::SetExtensions(X509_REQ* req) const { + OPENSSL_RET_NOT_OK(X509_REQ_add_extensions(req, extensions_), + "error setting X509 request extensions"); + return Status::OK(); +} + +Status CertSigner::SelfSignCA(const PrivateKey& key, + CaCertRequestGenerator::Config config, + int64_t cert_expiration_seconds, + Cert* cert) { + // Generate a CSR for the CA. + CertSignRequest ca_csr; + { + CaCertRequestGenerator gen(std::move(config)); + RETURN_NOT_OK(gen.Init()); + RETURN_NOT_OK(gen.GenerateRequest(key, &ca_csr)); + } + + // Self-sign the CA's CSR: no CA cert is passed, only the key itself. + return CertSigner(nullptr, &key) + .set_expiration_interval(MonoDelta::FromSeconds(cert_expiration_seconds)) + .Sign(ca_csr, cert); +} + +Status CertSigner::SelfSignCert(const PrivateKey& key, + CertRequestGenerator::Config config, + Cert* cert) { + // Generate a CSR.
+ CertSignRequest csr; + { + CertRequestGenerator gen(std::move(config)); + gen.enable_self_signing(); + RETURN_NOT_OK(gen.Init()); + RETURN_NOT_OK(gen.GenerateRequest(key, &csr)); + } + + // Self-sign the CSR with the key. + return CertSigner(nullptr, &key).Sign(csr, cert); +} + + +CertSigner::CertSigner(const Cert* ca_cert, + const PrivateKey* ca_private_key) + : ca_cert_(ca_cert), + ca_private_key_(ca_private_key) { + // Private key is required. + CHECK(ca_private_key_ && ca_private_key_->GetRawData()); + // The cert is optional, but if we have it, it should be initialized. + CHECK(!ca_cert_ || ca_cert_->GetRawData()); +} + +Status CertSigner::Sign(const CertSignRequest& req, Cert* ret) const { + SCOPED_OPENSSL_NO_PENDING_ERRORS; + InitializeOpenSSL(); + CHECK(ret); + + // If we are not self-signing, then make sure that the provided CA + // cert and key match each other. Technically this would be programmer + // error since we're always using internally-generated CA certs, but + // this isn't a hot path so we'll keep the extra safety. + if (ca_cert_) { + RETURN_NOT_OK(ca_cert_->CheckKeyMatch(*ca_private_key_)); + } + auto x509 = ssl_make_unique(X509_new()); + RETURN_NOT_OK(FillCertTemplateFromRequest(req.GetRawData(), x509.get())); + RETURN_NOT_OK(DoSign(EVP_sha256(), exp_interval_sec_, x509.get())); + ret->AdoptX509(x509.release()); + + return Status::OK(); +} + +// This is modeled after code in copy_extensions() function from +// $OPENSSL_ROOT/apps/apps.c with OpenSSL 1.0.2. 
+Status CertSigner::CopyExtensions(X509_REQ* req, X509* x) { + SCOPED_OPENSSL_NO_PENDING_ERRORS; + CHECK(req); + CHECK(x); + STACK_OF(X509_EXTENSION)* exts = X509_REQ_get_extensions(req); + SCOPED_CLEANUP({ + sk_X509_EXTENSION_pop_free(exts, X509_EXTENSION_free); + }); + for (size_t i = 0; i < sk_X509_EXTENSION_num(exts); ++i) { + X509_EXTENSION* ext = sk_X509_EXTENSION_value(exts, i); + ASN1_OBJECT* obj = X509_EXTENSION_get_object(ext); + int32_t idx = X509_get_ext_by_OBJ(x, obj, -1); + if (idx != -1) { + // If extension exits, delete all extensions of same type. + do { + auto tmpext = ssl_make_unique(X509_get_ext(x, idx)); + X509_delete_ext(x, idx); + idx = X509_get_ext_by_OBJ(x, obj, -1); + } while (idx != -1); + } + OPENSSL_RET_NOT_OK(X509_add_ext(x, ext, -1), "error adding extension"); + } + + return Status::OK(); +} + +Status CertSigner::FillCertTemplateFromRequest(X509_REQ* req, X509* tmpl) { + SCOPED_OPENSSL_NO_PENDING_ERRORS; + CHECK(req); + + // As of OpenSSL 1.1, req's internals are hidden. 
+#if OPENSSL_VERSION_NUMBER < 0x10100000L + if (!req->req_info || + !req->req_info->pubkey || + !req->req_info->pubkey->public_key || + !req->req_info->pubkey->public_key->data) { + return Status::RuntimeError("corrupted CSR: no public key"); + } +#endif + auto pub_key = ssl_make_unique(X509_REQ_get_pubkey(req)); + OPENSSL_RET_IF_NULL(pub_key, "error unpacking public key from CSR"); + const int rc = X509_REQ_verify(req, pub_key.get()); + if (rc < 0) { + return Status::RuntimeError("CSR signature verification error", + GetOpenSSLErrors()); + } + if (rc == 0) { + return Status::RuntimeError("CSR signature mismatch", + GetOpenSSLErrors()); + } + OPENSSL_RET_NOT_OK(X509_set_subject_name(tmpl, X509_REQ_get_subject_name(req)), + "error setting cert subject name"); + RETURN_NOT_OK(CopyExtensions(req, tmpl)); + OPENSSL_RET_NOT_OK(X509_set_pubkey(tmpl, pub_key.get()), + "error setting cert public key"); + return Status::OK(); +} + +Status CertSigner::DigestSign(const EVP_MD* md, EVP_PKEY* pkey, X509* x) { + OPENSSL_RET_NOT_OK(X509_sign(x, pkey, md), "error signing certificate"); + return Status::OK(); +} + +Status CertSigner::GenerateSerial(c_unique_ptr<ASN1_INTEGER>* ret) { + SCOPED_OPENSSL_NO_PENDING_ERRORS; + auto btmp = ssl_make_unique(BN_new()); + OPENSSL_RET_NOT_OK(BN_pseudo_rand(btmp.get(), 64, 0, 0), + "error generating random number"); + auto serial = ssl_make_unique(ASN1_INTEGER_new()); + OPENSSL_RET_IF_NULL(BN_to_ASN1_INTEGER(btmp.get(), serial.get()), + "error converting number into ASN1 representation"); + if (ret) { + ret->swap(serial); + } + return Status::OK(); +} + +Status CertSigner::DoSign(const EVP_MD* digest, int32_t exp_seconds, + X509* ret) const { + SCOPED_OPENSSL_NO_PENDING_ERRORS; + CHECK(ret); + + // Version 3 (v3) of X509 certificates. The integer value is one less + // than the version it represents. This is not a typo. :) + static const int kX509V3 = 2; + + // If we have a CA cert, then the CA is the issuer. 
+ // Otherwise, we are self-signing so the target cert is also the issuer. + X509* issuer_cert = ca_cert_ ? ca_cert_->GetTopOfChainX509() : ret; + X509_NAME* issuer_name = X509_get_subject_name(issuer_cert); + OPENSSL_RET_NOT_OK(X509_set_issuer_name(ret, issuer_name), + "error setting issuer name"); + c_unique_ptr<ASN1_INTEGER> serial; + RETURN_NOT_OK(GenerateSerial(&serial)); + // set version to v3 + OPENSSL_RET_NOT_OK(X509_set_version(ret, kX509V3), + "error setting cert version"); + OPENSSL_RET_NOT_OK(X509_set_serialNumber(ret, serial.get()), + "error setting cert serial"); + OPENSSL_RET_IF_NULL(X509_gmtime_adj(X509_get_notBefore(ret), 0L), + "error setting cert validity time"); + OPENSSL_RET_IF_NULL(X509_gmtime_adj(X509_get_notAfter(ret), exp_seconds), + "error setting cert expiration time"); + RETURN_NOT_OK(DigestSign(digest, ca_private_key_->GetRawData(), ret)); + + return Status::OK(); +} + +} // namespace ca +} // namespace security +} // namespace kudu http://git-wip-us.apache.org/repos/asf/impala/blob/fcf190c4/be/src/kudu/security/ca/cert_management.h ---------------------------------------------------------------------- diff --git a/be/src/kudu/security/ca/cert_management.h b/be/src/kudu/security/ca/cert_management.h new file mode 100644 index 0000000..fb2bd0e --- /dev/null +++ b/be/src/kudu/security/ca/cert_management.h @@ -0,0 +1,226 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. 
You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. + +#pragma once + +#include <cstdint> +#include <memory> +#include <string> + +#include <boost/optional/optional.hpp> +#include <glog/logging.h> + +#include "kudu/gutil/macros.h" +#include "kudu/gutil/port.h" +#include "kudu/gutil/strings/stringpiece.h" +#include "kudu/security/openssl_util.h" +#include "kudu/util/locks.h" +#include "kudu/util/monotime.h" +#include "kudu/util/status.h" + +// Forward declarations for the relevant OpenSSL typedefs +// in addition to openssl_util.h. +typedef struct asn1_string_st ASN1_INTEGER; +#if OPENSSL_VERSION_NUMBER < 0x10100000L +typedef struct env_md_st EVP_MD; +#else +typedef struct evp_md_st EVP_MD; +#endif +typedef struct rsa_st RSA; +typedef struct x509_st X509; +typedef struct X509_req_st X509_REQ; + +// STACK_OF(X509_EXTENSION) +struct stack_st_X509_EXTENSION; // IWYU pragma: keep + +namespace kudu { +namespace security { + +class Cert; +class CertSignRequest; +class PrivateKey; + +namespace ca { + +// Base utility class for issuing X509 CSRs. +class CertRequestGeneratorBase { + public: + CertRequestGeneratorBase() = default; + virtual ~CertRequestGeneratorBase() = default; + + virtual Status Init() = 0; + virtual bool Initialized() const = 0; + + // Generate X509 CSR using the specified key. To obtain the key, + // call the GeneratePrivateKey() function. + Status GenerateRequest(const PrivateKey& key, CertSignRequest* ret) const WARN_UNUSED_RESULT; + + protected: + // Push the specified extension into the stack provided. 
+ static Status PushExtension(stack_st_X509_EXTENSION* st, + int32_t nid, + StringPiece value) WARN_UNUSED_RESULT; + + // Set the certificate-specific subject fields into the specified request. + virtual Status SetSubject(X509_REQ* req) const = 0; + + // Set the certificate-specific extensions into the specified request. + virtual Status SetExtensions(X509_REQ* req) const = 0; + + private: + DISALLOW_COPY_AND_ASSIGN(CertRequestGeneratorBase); +}; + +// A utility class that facilitates issuing certificate signing requests +// (a.k.a. X509 CSRs). +class CertRequestGenerator : public CertRequestGeneratorBase { + public: + // Properties for the generated X509 CSR. The 'hostname' is for the name of + // the machine the requestor is to use the certificate at. Valid configuration + // should contain non-empty 'hostname' field. + struct Config { + // FQDN name to put into the 'DNS' fields of the subjectAltName extension. + std::string hostname; + // userId (UID) + boost::optional<std::string> user_id; + // Our custom extension which stores the full Kerberos principal for IPKI certs. + boost::optional<std::string> kerberos_principal; + }; + + // 'config' contains the properties to fill into the X509 attributes of the + // CSR. + explicit CertRequestGenerator(Config config); + ~CertRequestGenerator(); + + Status Init() override WARN_UNUSED_RESULT; + bool Initialized() const override; + + // Mark the generated CSR as intended for self-signing. Must be called + // before Init(); returns *this so that calls can be chained. + CertRequestGenerator& enable_self_signing() { + CHECK(!is_initialized_); + for_self_signing_ = true; + return *this; + } + + protected: + Status SetSubject(X509_REQ* req) const override WARN_UNUSED_RESULT; + Status SetExtensions(X509_REQ* req) const override WARN_UNUSED_RESULT; + + private: + const Config config_; + stack_st_X509_EXTENSION* extensions_ = nullptr; + bool is_initialized_ = false; + bool for_self_signing_ = false; +}; + +// A utility class that facilitates issuing of root CA self-signed certificate +// signing requests.
+class CaCertRequestGenerator : public CertRequestGeneratorBase { + public: + // Properties for the generated X509 CA CSR. + struct Config { + // Common name (CN); e.g. 'master 239D6D2F-BDD2-4463-8933-78D9559C2124'. + // Don't put hostname/FQDN in here: for CA cert it does not make sense and + // it might be longer than 64 characters which is the limit specified + // by RFC5280. The limit is enforced by the OpenSSL library. + std::string cn; + }; + + explicit CaCertRequestGenerator(Config config); + ~CaCertRequestGenerator(); + + // May be called more than once: returns OK right away when the generator + // is already initialized. Returns InvalidArgument if 'cn' is empty. + Status Init() override WARN_UNUSED_RESULT; + bool Initialized() const override; + + protected: + Status SetSubject(X509_REQ* req) const override WARN_UNUSED_RESULT; + Status SetExtensions(X509_REQ* req) const override WARN_UNUSED_RESULT; + + private: + const Config config_; + stack_st_X509_EXTENSION* extensions_; + mutable simple_spinlock lock_; + bool is_initialized_; // protected by lock_ +}; + +// A utility class for issuing and signing certificates. +// +// This is used in "fluent" style. For example: +// +// CHECK_OK(CertSigner(&my_ca_cert, &my_ca_key) +// .set_expiration_interval(MonoDelta::FromSeconds(3600)) +// .Sign(csr, &cert)); +// +// As such, this class is not guaranteed thread-safe. +class CertSigner { + public: + // Generate a self-signed certificate authority using the given key + // and CSR configuration. + static Status SelfSignCA(const PrivateKey& key, + CaCertRequestGenerator::Config config, + int64_t cert_expiration_seconds, + Cert* cert) WARN_UNUSED_RESULT; + + // Generate a self-signed certificate using the given key and CSR + // configuration. + static Status SelfSignCert(const PrivateKey& key, + CertRequestGenerator::Config config, + Cert* cert) WARN_UNUSED_RESULT; + + // Create a CertSigner. + // + // The given cert and key must stay valid for the lifetime of the + // cert signer. See class documentation above for recommended usage.
+ // + // 'ca_cert' may be nullptr in order to perform self-signing (though + // the SelfSignCA() static method above is recommended). + CertSigner(const Cert* ca_cert, const PrivateKey* ca_private_key); + ~CertSigner() = default; + + // Set the expiration interval for certs signed by this signer. + // This may be changed at any point. + CertSigner& set_expiration_interval(MonoDelta expiration) { + exp_interval_sec_ = expiration.ToSeconds(); + return *this; + } + + Status Sign(const CertSignRequest& req, Cert* ret) const WARN_UNUSED_RESULT; + + private: + + static Status CopyExtensions(X509_REQ* req, X509* x) WARN_UNUSED_RESULT; + static Status FillCertTemplateFromRequest(X509_REQ* req, X509* tmpl) WARN_UNUSED_RESULT; + static Status DigestSign(const EVP_MD* md, EVP_PKEY* pkey, X509* x) WARN_UNUSED_RESULT; + static Status GenerateSerial(c_unique_ptr<ASN1_INTEGER>* ret) WARN_UNUSED_RESULT; + + Status DoSign(const EVP_MD* digest, int32_t exp_seconds, X509 *ret) const WARN_UNUSED_RESULT; + + // The expiration interval of certs signed by this signer. + int32_t exp_interval_sec_ = 24 * 60 * 60; + + // The CA cert. null if this CertSigner is configured for self-signing. + const Cert* const ca_cert_; + + // The CA private key. If configured for self-signing, this is the + // private key associated with the target cert. + const PrivateKey* const ca_private_key_; + + DISALLOW_COPY_AND_ASSIGN(CertSigner); +}; + +} // namespace ca +} // namespace security +} // namespace kudu http://git-wip-us.apache.org/repos/asf/impala/blob/fcf190c4/be/src/kudu/security/cert-test.cc ---------------------------------------------------------------------- diff --git a/be/src/kudu/security/cert-test.cc b/be/src/kudu/security/cert-test.cc new file mode 100644 index 0000000..12205e1 --- /dev/null +++ b/be/src/kudu/security/cert-test.cc @@ -0,0 +1,165 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. 
See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. + +#include <string> +#include <thread> +#include <utility> +#include <vector> + +#include <boost/optional/optional.hpp> +#include <glog/logging.h> +#include <gtest/gtest.h> + +#include "kudu/gutil/strings/strip.h" +#include "kudu/security/cert.h" +#include "kudu/security/crypto.h" +#include "kudu/security/openssl_util.h" +#include "kudu/security/test/test_certs.h" +#include "kudu/util/barrier.h" +#include "kudu/util/status.h" +#include "kudu/util/test_macros.h" +#include "kudu/util/test_util.h" + +using std::pair; +using std::string; +using std::thread; +using std::vector; + +namespace kudu { +namespace security { + +// Test for various certificate-related functionality in the security library. +// These do not cover CA certificate mananagement part; check +// cert_management-test.cc for those. +class CertTest : public KuduTest { + public: + void SetUp() override { + ASSERT_OK(ca_cert_.FromString(kCaCert, DataFormat::PEM)); + ASSERT_OK(ca_private_key_.FromString(kCaPrivateKey, DataFormat::PEM)); + ASSERT_OK(ca_public_key_.FromString(kCaPublicKey, DataFormat::PEM)); + ASSERT_OK(ca_exp_cert_.FromString(kCaExpiredCert, DataFormat::PEM)); + ASSERT_OK(ca_exp_private_key_.FromString(kCaExpiredPrivateKey, + DataFormat::PEM)); + // Sanity checks. 
+ ASSERT_OK(ca_cert_.CheckKeyMatch(ca_private_key_)); + ASSERT_OK(ca_exp_cert_.CheckKeyMatch(ca_exp_private_key_)); + } + + protected: + Cert ca_cert_; + PrivateKey ca_private_key_; + PublicKey ca_public_key_; + + Cert ca_exp_cert_; + PrivateKey ca_exp_private_key_; +}; + +// Regression test to make sure that GetKuduKerberosPrincipalOidNid is thread +// safe. OpenSSL 1.0.0's OBJ_create method is not thread safe. +TEST_F(CertTest, GetKuduKerberosPrincipalOidNidConcurrent) { + int kConcurrency = 16; + Barrier barrier(kConcurrency); + + vector<thread> threads; + for (int i = 0; i < kConcurrency; i++) { + threads.emplace_back([&] () { + barrier.Wait(); + CHECK_NE(NID_undef, GetKuduKerberosPrincipalOidNid()); + }); + } + + for (auto& thread : threads) { + thread.join(); + } +} + +// Check input/output of the X509 certificates in PEM format. +TEST_F(CertTest, CertInputOutputPEM) { + const Cert& cert = ca_cert_; + string cert_str; + ASSERT_OK(cert.ToString(&cert_str, DataFormat::PEM)); + RemoveExtraWhitespace(&cert_str); + + string ca_input_cert(kCaCert); + RemoveExtraWhitespace(&ca_input_cert); + EXPECT_EQ(ca_input_cert, cert_str); +} + +// Check that Cert behaves in a predictable way if given invalid PEM data. +TEST_F(CertTest, CertInvalidInput) { + // Providing files which guaranteed to exists, but do not contain valid data. + // This is to make sure the init handles that situation correctly and + // does not choke on the wrong input data. + Cert c; + ASSERT_FALSE(c.FromFile("/bin/sh", DataFormat::PEM).ok()); +} + +// Check X509 certificate/private key matching: match cases. +TEST_F(CertTest, CertMatchesRsaPrivateKey) { + const pair<const Cert*, const PrivateKey*> cases[] = { + { &ca_cert_, &ca_private_key_ }, + { &ca_exp_cert_, &ca_exp_private_key_ }, + }; + for (const auto& e : cases) { + EXPECT_OK(e.first->CheckKeyMatch(*e.second)); + } +} + +// Check X509 certificate/private key matching: mismatch cases. 
+TEST_F(CertTest, CertMismatchesRsaPrivateKey) { + const pair<const Cert*, const PrivateKey*> cases[] = { + { &ca_cert_, &ca_exp_private_key_ }, + { &ca_exp_cert_, &ca_private_key_ }, + }; + for (const auto& e : cases) { + const Status s = e.first->CheckKeyMatch(*e.second); + EXPECT_TRUE(s.IsRuntimeError()) << s.ToString(); + ASSERT_STR_CONTAINS(s.ToString(), "certificate does not match private key"); + } +} + +TEST_F(CertTest, TestGetKuduSpecificFieldsWhenMissing) { + EXPECT_EQ(boost::none, ca_cert_.UserId()); + EXPECT_EQ(boost::none, ca_cert_.KuduKerberosPrincipal()); +} + +TEST_F(CertTest, DnsHostnameInSanField) { + const string hostname_foo_bar = "foo.bar.com"; + const string hostname_mega_giga = "mega.giga.io"; + const string hostname_too_long = + "toooooooooooooooooooooooooooooooooooooooooooooooooooooooooooooooooooo." + "looooooooooooooooooooooooooooooooooooooooooooooooooooooooooooooooooooo" + "oooooooooooooooooooooooooooooooooooooooooooooooooooooooooooooooooooooo" + "oooooooooooooooooooooooooooooooooooooooooooooooooooooooooooooooooooooo" + "oooooooooooooooooooooooooooooooooooooooooooooooooooooooooooooooooooooo" + "oooooooooooooooooooooooooooooooooooooooooooooooooooooooooooooooooooooo" + "ng.hostname.io"; + + Cert cert; + ASSERT_OK(cert.FromString(kCertDnsHostnamesInSan, DataFormat::PEM)); + + EXPECT_EQ("C = US, ST = CA, O = MyCompany, CN = MyName, emailAddress = m...@email.com", + cert.IssuerName()); + vector<string> hostnames = cert.Hostnames(); + ASSERT_EQ(3, hostnames.size()); + EXPECT_EQ(hostname_mega_giga, hostnames[0]); + EXPECT_EQ(hostname_foo_bar, hostnames[1]); + EXPECT_EQ(hostname_too_long, hostnames[2]); +} + +} // namespace security +} // namespace kudu http://git-wip-us.apache.org/repos/asf/impala/blob/fcf190c4/be/src/kudu/security/cert.cc ---------------------------------------------------------------------- diff --git a/be/src/kudu/security/cert.cc b/be/src/kudu/security/cert.cc new file mode 100644 index 0000000..b81d263 --- /dev/null +++ 
b/be/src/kudu/security/cert.cc @@ -0,0 +1,301 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. + +#include "kudu/security/cert.h" + +#include <memory> +#include <mutex> +#include <ostream> +#include <string> + +#include <boost/optional/optional.hpp> +#include <glog/logging.h> +#include <openssl/evp.h> +#include <openssl/x509.h> +#include <openssl/x509v3.h> + +#include "kudu/gutil/macros.h" +#include "kudu/security/crypto.h" +#include "kudu/security/openssl_util.h" +#include "kudu/security/openssl_util_bio.h" +#include "kudu/util/status.h" + +using std::string; +using std::vector; + +namespace kudu { +namespace security { + +template<> struct SslTypeTraits<GENERAL_NAMES> { + static constexpr auto kFreeFunc = &GENERAL_NAMES_free; +}; + +// This OID is generated via the UUID method. 
+static const char* kKuduKerberosPrincipalOidStr = "2.25.243346677289068076843480765133256509912"; + +string X509NameToString(X509_NAME* name) { + SCOPED_OPENSSL_NO_PENDING_ERRORS; + CHECK(name); + auto bio = ssl_make_unique(BIO_new(BIO_s_mem())); + OPENSSL_CHECK_OK(X509_NAME_print_ex(bio.get(), name, 0, XN_FLAG_ONELINE)); + + BUF_MEM* membuf; + OPENSSL_CHECK_OK(BIO_get_mem_ptr(bio.get(), &membuf)); + return string(membuf->data, membuf->length); +} + +int GetKuduKerberosPrincipalOidNid() { + InitializeOpenSSL(); + static std::once_flag flag; + static int nid; + std::call_once(flag, [&] () { + nid = OBJ_create(kKuduKerberosPrincipalOidStr, "kuduPrinc", "kuduKerberosPrincipal"); + CHECK_NE(nid, NID_undef) << "failed to create kuduPrinc oid: " << GetOpenSSLErrors(); + }); + return nid; +} + +X509* Cert::GetTopOfChainX509() const { + CHECK_GT(chain_len(), 0); + return sk_X509_value(data_.get(), 0); +} + +Status Cert::FromString(const std::string& data, DataFormat format) { + RETURN_NOT_OK(::kudu::security::FromString(data, format, &data_)); + if (sk_X509_num(data_.get()) < 1) { + return Status::RuntimeError("Certificate chain is empty. Expected at least one certificate."); + } + return Status::OK(); +} + +Status Cert::ToString(std::string* data, DataFormat format) const { + return ::kudu::security::ToString(data, format, data_.get()); +} + +Status Cert::FromFile(const std::string& fpath, DataFormat format) { + RETURN_NOT_OK(::kudu::security::FromFile(fpath, format, &data_)); + if (sk_X509_num(data_.get()) < 1) { + return Status::RuntimeError("Certificate chain is empty. 
Expected at least one certificate."); + } + return Status::OK(); +} + +string Cert::SubjectName() const { + return X509NameToString(X509_get_subject_name(GetTopOfChainX509())); +} + +string Cert::IssuerName() const { + return X509NameToString(X509_get_issuer_name(GetTopOfChainX509())); +} + +boost::optional<string> Cert::UserId() const { + SCOPED_OPENSSL_NO_PENDING_ERRORS; + X509_NAME* name = X509_get_subject_name(GetTopOfChainX509()); + char buf[1024]; + int len = X509_NAME_get_text_by_NID(name, NID_userId, buf, arraysize(buf)); + if (len < 0) return boost::none; + return string(buf, len); +} + +vector<string> Cert::Hostnames() const { + SCOPED_OPENSSL_NO_PENDING_ERRORS; + vector<string> result; + auto gens = ssl_make_unique(reinterpret_cast<GENERAL_NAMES*>(X509_get_ext_d2i( + GetTopOfChainX509(), NID_subject_alt_name, nullptr, nullptr))); + if (gens) { + for (int i = 0; i < sk_GENERAL_NAME_num(gens.get()); ++i) { + GENERAL_NAME* gen = sk_GENERAL_NAME_value(gens.get(), i); + if (gen->type != GEN_DNS) { + continue; + } + const ASN1_STRING* cstr = gen->d.dNSName; + if (cstr->type != V_ASN1_IA5STRING || cstr->data == nullptr) { + LOG(DFATAL) << "invalid DNS name in the SAN field"; + return {}; + } + result.emplace_back(reinterpret_cast<char*>(cstr->data), cstr->length); + } + } + return result; +} + +boost::optional<string> Cert::KuduKerberosPrincipal() const { + SCOPED_OPENSSL_NO_PENDING_ERRORS; + int idx = X509_get_ext_by_NID(GetTopOfChainX509(), GetKuduKerberosPrincipalOidNid(), -1); + if (idx < 0) return boost::none; + X509_EXTENSION* ext = X509_get_ext(GetTopOfChainX509(), idx); + ASN1_OCTET_STRING* octet_str = X509_EXTENSION_get_data(ext); + const unsigned char* octet_str_data = octet_str->data; + long len; // NOLINT + int tag, xclass; + if (ASN1_get_object(&octet_str_data, &len, &tag, &xclass, octet_str->length) != 0 || + tag != V_ASN1_UTF8STRING) { + LOG(DFATAL) << "invalid extension value in cert " << SubjectName(); + return boost::none; + } + + return 
string(reinterpret_cast<const char*>(octet_str_data), len); +} + +Status Cert::CheckKeyMatch(const PrivateKey& key) const { + SCOPED_OPENSSL_NO_PENDING_ERRORS; + OPENSSL_RET_NOT_OK(X509_check_private_key(GetTopOfChainX509(), key.GetRawData()), + "certificate does not match private key"); + return Status::OK(); +} + +Status Cert::GetServerEndPointChannelBindings(string* channel_bindings) const { + SCOPED_OPENSSL_NO_PENDING_ERRORS; + // Find the signature type of the certificate. This corresponds to the digest + // (hash) algorithm, and the public key type which signed the cert. + +#if OPENSSL_VERSION_NUMBER >= 0x10002000L + int signature_nid = X509_get_signature_nid(GetTopOfChainX509()); +#else + // Older version of OpenSSL appear not to have a public way to get the + // signature digest method from a certificate. Instead, we reach into the + // 'private' internals. + int signature_nid = OBJ_obj2nid(GetTopOfChainX509()->sig_alg->algorithm); +#endif + + // Retrieve the digest algorithm type. + int digest_nid; + int public_key_nid; + OBJ_find_sigid_algs(signature_nid, &digest_nid, &public_key_nid); + + // RFC 5929: if the certificate's signatureAlgorithm uses no hash functions or + // uses multiple hash functions, then this channel binding type's channel + // bindings are undefined at this time (updates to is channel binding type may + // occur to address this issue if it ever arises). + // + // TODO(dan): can the multiple hash function scenario actually happen? What + // does OBJ_find_sigid_algs do in that scenario? 
+ if (digest_nid == NID_undef) { + return Status::NotSupported("server certificate has no signature digest (hash) algorithm"); + } + + // RFC 5929: if the certificate's signatureAlgorithm uses a single hash + // function, and that hash function is either MD5 [RFC1321] or SHA-1 + // [RFC3174], then use SHA-256 [FIPS-180-3]; + if (digest_nid == NID_md5 || digest_nid == NID_sha1) { + digest_nid = NID_sha256; + } + + const EVP_MD* md = EVP_get_digestbynid(digest_nid); + OPENSSL_RET_IF_NULL(md, "digest for nid not found"); + + // Create a digest BIO. All data written to the BIO will be sent through the + // digest (hash) function. The digest BIO requires a null BIO to writethrough to. + auto null_bio = ssl_make_unique(BIO_new(BIO_s_null())); + auto md_bio = ssl_make_unique(BIO_new(BIO_f_md())); + OPENSSL_RET_NOT_OK(BIO_set_md(md_bio.get(), md), "failed to set digest for BIO"); + BIO_push(md_bio.get(), null_bio.get()); + + // Write the cert to the digest BIO. + RETURN_NOT_OK(ToBIO(md_bio.get(), DataFormat::DER, data_.get())); + + // Read the digest from the BIO and append it to 'channel_bindings'. + char buf[EVP_MAX_MD_SIZE]; + int digest_len = BIO_gets(md_bio.get(), buf, sizeof(buf)); + OPENSSL_RET_NOT_OK(digest_len, "failed to get cert digest from BIO"); + channel_bindings->assign(buf, digest_len); + return Status::OK(); +} + +void Cert::AdoptAndAddRefRawData(RawDataType* data) { + DCHECK_EQ(sk_X509_num(data), 1); + X509* cert = sk_X509_value(data, sk_X509_num(data) - 1); + + DCHECK(cert); +#if OPENSSL_VERSION_NUMBER < 0x10100000L + CHECK_GT(CRYPTO_add(&cert->references, 1, CRYPTO_LOCK_X509), 1) << "X509 use-after-free detected"; +#else + OPENSSL_CHECK_OK(X509_up_ref(cert)) << "X509 use-after-free detected: " << GetOpenSSLErrors(); +#endif + // We copy the STACK_OF() object, but the copy and the original both internally point to the + // same elements. + AdoptRawData(sk_X509_dup(data)); +} + +void Cert::AdoptX509(X509* cert) { + // Free current STACK_OF(X509). 
+ sk_X509_pop_free(data_.get(), X509_free); + // Allocate new STACK_OF(X509) and populate with 'cert'. + STACK_OF(X509)* sk = sk_X509_new_null(); + DCHECK(sk); + sk_X509_push(sk, cert); + AdoptRawData(sk); +} + +void Cert::AdoptAndAddRefX509(X509* cert) { +#if OPENSSL_VERSION_NUMBER < 0x10100000L + CHECK_GT(CRYPTO_add(&cert->references, 1, CRYPTO_LOCK_X509), 1) << "X509 use-after-free detected"; +#else + OPENSSL_CHECK_OK(X509_up_ref(cert)) << "X509 use-after-free detected: " << GetOpenSSLErrors(); +#endif + AdoptX509(cert); +} + +Status Cert::GetPublicKey(PublicKey* key) const { + SCOPED_OPENSSL_NO_PENDING_ERRORS; + EVP_PKEY* raw_key = X509_get_pubkey(GetTopOfChainX509()); + OPENSSL_RET_IF_NULL(raw_key, "unable to get certificate public key"); + key->AdoptRawData(raw_key); + return Status::OK(); +} + +Status CertSignRequest::FromString(const std::string& data, DataFormat format) { + return ::kudu::security::FromString(data, format, &data_); +} + +Status CertSignRequest::ToString(std::string* data, DataFormat format) const { + return ::kudu::security::ToString(data, format, data_.get()); +} + +Status CertSignRequest::FromFile(const std::string& fpath, DataFormat format) { + return ::kudu::security::FromFile(fpath, format, &data_); +} + +CertSignRequest CertSignRequest::Clone() const { + X509_REQ* cloned_req; +#if OPENSSL_VERSION_NUMBER < 0x10100000L + CHECK_GT(CRYPTO_add(&data_->references, 1, CRYPTO_LOCK_X509_REQ), 1) + << "X509_REQ use-after-free detected"; + cloned_req = GetRawData(); +#else + // With OpenSSL 1.1, data structure internals are hidden, and there doesn't + // seem to be a public method that increments data_'s refcount. 
+ cloned_req = X509_REQ_dup(GetRawData()); + CHECK(cloned_req != nullptr) + << "X509 allocation failure detected: " << GetOpenSSLErrors(); +#endif + + CertSignRequest clone; + clone.AdoptRawData(cloned_req); + return clone; +} + +Status CertSignRequest::GetPublicKey(PublicKey* key) const { + SCOPED_OPENSSL_NO_PENDING_ERRORS; + EVP_PKEY* raw_key = X509_REQ_get_pubkey(data_.get()); + OPENSSL_RET_IF_NULL(raw_key, "unable to get CSR public key"); + key->AdoptRawData(raw_key); + return Status::OK(); +} + +} // namespace security +} // namespace kudu