http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/c7db60aa/be/src/kudu/rpc/negotiation-test.cc ---------------------------------------------------------------------- diff --git a/be/src/kudu/rpc/negotiation-test.cc b/be/src/kudu/rpc/negotiation-test.cc new file mode 100644 index 0000000..68185bb --- /dev/null +++ b/be/src/kudu/rpc/negotiation-test.cc @@ -0,0 +1,1331 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. + +#include "kudu/rpc/rpc-test-base.h" + +#include <stdlib.h> +#include <sys/stat.h> + +#include <functional> +#include <memory> +#include <ostream> +#include <string> +#include <thread> + +#include <gflags/gflags.h> +#include <gtest/gtest.h> +#include <sasl/sasl.h> + +#include "kudu/gutil/gscoped_ptr.h" +#include "kudu/gutil/map-util.h" +#include "kudu/gutil/ref_counted.h" +#include "kudu/gutil/strings/join.h" +#include "kudu/gutil/strings/substitute.h" +#include "kudu/gutil/walltime.h" +#include "kudu/rpc/client_negotiation.h" +#include "kudu/rpc/constants.h" +#include "kudu/rpc/negotiation.h" +#include "kudu/rpc/server_negotiation.h" +#include "kudu/security/crypto.h" +#include "kudu/security/security-test-util.h" +#include "kudu/security/test/mini_kdc.h" +#include "kudu/security/tls_context.h" +#include "kudu/security/tls_socket.h" +#include "kudu/security/token_signer.h" +#include "kudu/security/token_signing_key.h" +#include "kudu/security/token_verifier.h" +#include "kudu/util/monotime.h" +#include "kudu/util/net/sockaddr.h" +#include "kudu/util/net/socket.h" +#include "kudu/util/subprocess.h" +#include "kudu/util/trace.h" +#include "kudu/util/user.h" + +// HACK: MIT Kerberos doesn't have any way of determining its version number, +// but the error messages in krb5-1.10 and earlier are broken due to +// a bug: http://krbdev.mit.edu/rt/Ticket/Display.html?id=6973 +// +// Since we don't have any way to explicitly figure out the version, we just +// look for this random macro which was added in 1.11 (the same version in which +// the above bug was fixed). +#ifndef KRB5_RESPONDER_QUESTION_PASSWORD +#define KRB5_VERSION_LE_1_10 +#endif + +DEFINE_bool(is_test_child, false, + "Used by tests which require clean processes. " + "See TestDisableInit."); +DECLARE_bool(rpc_encrypt_loopback_connections); +DECLARE_bool(rpc_trace_negotiation); + +using std::string; +using std::thread; +using std::unique_ptr; + +using kudu::security::Cert; +using kudu::security::PkiConfig; +using kudu::security::PrivateKey; +using kudu::security::SignedTokenPB; +using kudu::security::TlsContext; +using kudu::security::TokenSigner; +using kudu::security::TokenSigningPrivateKey; +using kudu::security::TokenVerifier; + +namespace kudu { +namespace rpc { + +// The negotiation configuration for a client or server endpoint. +struct EndpointConfig { + // The PKI configuration. + PkiConfig pki; + // The supported SASL mechanisms. + vector<SaslMechanism::Type> sasl_mechs; + // For the client, whether the client has the token. + // For the server, whether the server has the TSK. + bool token; + RpcEncryption encryption; +}; +std::ostream& operator<<(std::ostream& o, EndpointConfig config) { + auto bool_string = [] (bool b) { return b ? "true" : "false"; }; + o << "{pki: " << config.pki + << ", sasl-mechs: [" << JoinMapped(config.sasl_mechs, SaslMechanism::name_of, ", ") + << "], token: " << bool_string(config.token) + << ", encryption: "; + + switch (config.encryption) { + case RpcEncryption::DISABLED: o << "DISABLED"; break; + case RpcEncryption::OPTIONAL: o << "OPTIONAL"; break; + case RpcEncryption::REQUIRED: o << "REQUIRED"; break; + } + + o << "}"; + return o; +} + +// A description of a negotiation sequence, including client and server +// configuration, as well as expected results. +struct NegotiationDescriptor { + EndpointConfig client; + EndpointConfig server; + + bool use_test_socket; + + bool rpc_encrypt_loopback; + + // The expected client status from negotiating. + Status client_status; + // The expected server status from negotiating. + Status server_status; + + // The expected negotiated authentication type. + AuthenticationType negotiated_authn; + + // The expected SASL mechanism, if SASL authentication is negotiated. + SaslMechanism::Type negotiated_mech; + + // Whether the negotiation is expected to perform a TLS handshake. + bool tls_negotiated; +}; +std::ostream& operator<<(std::ostream& o, NegotiationDescriptor c) { + auto bool_string = [] (bool b) { return b ? "true" : "false"; }; + o << "{client: " << c.client + << ", server: " << c.server + << "}, rpc-encrypt-loopback: " << bool_string(c.rpc_encrypt_loopback); + return o; +} + +class NegotiationTestSocket : public Socket { + public: + // Return an arbitrary public IP + Status GetPeerAddress(Sockaddr *cur_addr) const override { + return cur_addr->ParseString("8.8.8.8:12345", 0); + } +}; + +class TestNegotiation : public RpcTestBase, + public ::testing::WithParamInterface<NegotiationDescriptor> { + public: + void SetUp() override { + RpcTestBase::SetUp(); + ASSERT_OK(SaslInit()); + } +}; + +TEST_P(TestNegotiation, TestNegotiation) { + NegotiationDescriptor desc = GetParam(); + + // Generate a trusted root certificate. + PrivateKey ca_key; + Cert ca_cert; + ASSERT_OK(GenerateSelfSignedCAForTests(&ca_key, &ca_cert)); + + // Create and configure a TLS context for each endpoint. + TlsContext client_tls_context; + TlsContext server_tls_context; + ASSERT_OK(client_tls_context.Init()); + ASSERT_OK(server_tls_context.Init()); + ASSERT_OK(ConfigureTlsContext(desc.client.pki, ca_cert, ca_key, &client_tls_context)); + ASSERT_OK(ConfigureTlsContext(desc.server.pki, ca_cert, ca_key, &server_tls_context)); + + FLAGS_rpc_encrypt_loopback_connections = desc.rpc_encrypt_loopback; + + // Generate an optional client token and server token verifier. + TokenSigner token_signer(60, 20, std::make_shared<TokenVerifier>()); + { + unique_ptr<TokenSigningPrivateKey> key; + ASSERT_OK(token_signer.CheckNeedKey(&key)); + // No keys are available yet, so should be able to add. + ASSERT_NE(nullptr, key.get()); + ASSERT_OK(token_signer.AddKey(std::move(key))); + } + TokenVerifier token_verifier; + boost::optional<SignedTokenPB> authn_token; + if (desc.client.token) { + authn_token = SignedTokenPB(); + security::TokenPB token; + token.set_expire_unix_epoch_seconds(WallTime_Now() + 60); + token.mutable_authn()->set_username("client-token"); + ASSERT_TRUE(token.SerializeToString(authn_token->mutable_token_data())); + ASSERT_OK(token_signer.SignToken(&*authn_token)); + } + if (desc.server.token) { + ASSERT_OK(token_verifier.ImportKeys(token_signer.verifier().ExportKeys())); + } + + // Create the listening socket, client socket, and server socket. + Socket listening_socket; + ASSERT_OK(listening_socket.Init(0)); + ASSERT_OK(listening_socket.BindAndListen(Sockaddr(), 1)); + Sockaddr server_addr; + ASSERT_OK(listening_socket.GetSocketAddress(&server_addr)); + + unique_ptr<Socket> client_socket(new Socket()); + ASSERT_OK(client_socket->Init(0)); + client_socket->Connect(server_addr); + + unique_ptr<Socket> server_socket(desc.use_test_socket ? + new NegotiationTestSocket() : + new Socket()); + + Sockaddr client_addr; + CHECK_OK(listening_socket.Accept(server_socket.get(), &client_addr, 0)); + + // Create and configure the client and server negotiation instances. + ClientNegotiation client_negotiation(std::move(client_socket), + &client_tls_context, + authn_token, + desc.client.encryption); + ServerNegotiation server_negotiation(std::move(server_socket), + &server_tls_context, + &token_verifier, + desc.server.encryption); + + // Set client and server SASL mechanisms. + MiniKdc kdc; + bool kdc_started = false; + auto start_kdc_once = [&] () { + if (!kdc_started) { + kdc_started = true; + RETURN_NOT_OK(kdc.Start()); + } + return Status::OK(); + }; + for (auto mech : desc.client.sasl_mechs) { + switch (mech) { + case SaslMechanism::INVALID: break; + case SaslMechanism::PLAIN: + ASSERT_OK(client_negotiation.EnablePlain("client-plain", "client-password")); + break; + case SaslMechanism::GSSAPI: + ASSERT_OK(start_kdc_once()); + ASSERT_OK(kdc.CreateUserPrincipal("client-gssapi")); + ASSERT_OK(kdc.Kinit("client-gssapi")); + ASSERT_OK(kdc.SetKrb5Environment()); + client_negotiation.set_server_fqdn("127.0.0.1"); + ASSERT_OK(client_negotiation.EnableGSSAPI()); + break; + } + } + for (auto mech : desc.server.sasl_mechs) { + switch (mech) { + case SaslMechanism::INVALID: break; + case SaslMechanism::PLAIN: + ASSERT_OK(server_negotiation.EnablePlain()); + break; + case SaslMechanism::GSSAPI: + ASSERT_OK(start_kdc_once()); + // Create the server principal and keytab. + string kt_path; + ASSERT_OK(kdc.CreateServiceKeytab("kudu/127.0.0.1", &kt_path)); + CHECK_ERR(setenv("KRB5_KTNAME", kt_path.c_str(), 1 /*replace*/)); + server_negotiation.set_server_fqdn("127.0.0.1"); + ASSERT_OK(server_negotiation.EnableGSSAPI()); + break; + } + } + + // Run the client/server negotiation. Because negotiation is blocking, it + // has to be done on separate threads. + Status client_status; + Status server_status; + thread client_thread([&] () { + scoped_refptr<Trace> t(new Trace()); + ADOPT_TRACE(t.get()); + client_status = client_negotiation.Negotiate(); + // Close the socket so that the server will not block forever on error. + client_negotiation.socket()->Close(); + + if (FLAGS_rpc_trace_negotiation || !client_status.ok()) { + string msg = Trace::CurrentTrace()->DumpToString(); + if (!client_status.ok()) { + LOG(WARNING) << "Failed client RPC negotiation. Client trace:\n" << msg; + } else { + LOG(INFO) << "RPC negotiation tracing enabled. Client trace:\n" << msg; + } + } + }); + thread server_thread([&] () { + scoped_refptr<Trace> t(new Trace()); + ADOPT_TRACE(t.get()); + server_status = server_negotiation.Negotiate(); + // Close the socket so that the client will not block forever on error. + server_negotiation.socket()->Close(); + + if (FLAGS_rpc_trace_negotiation || !server_status.ok()) { + string msg = Trace::CurrentTrace()->DumpToString(); + if (!server_status.ok()) { + LOG(WARNING) << "Failed server RPC negotiation. Server trace:\n" << msg; + } else { + LOG(INFO) << "RPC negotiation tracing enabled. Server trace:\n" << msg; + } + } + }); + client_thread.join(); + server_thread.join(); + + // Check the negotiation outcome against the expected outcome. + EXPECT_EQ(desc.client_status.CodeAsString(), client_status.CodeAsString()); + EXPECT_EQ(desc.server_status.CodeAsString(), server_status.CodeAsString()); + ASSERT_STR_MATCHES(client_status.ToString(), desc.client_status.ToString()); + ASSERT_STR_MATCHES(server_status.ToString(), desc.server_status.ToString()); + + if (client_status.ok()) { + EXPECT_TRUE(server_status.ok()); + + // Make sure the negotiations agree with the expected values. + EXPECT_EQ(desc.negotiated_authn, client_negotiation.negotiated_authn()); + EXPECT_EQ(desc.negotiated_mech, client_negotiation.negotiated_mechanism()); + EXPECT_EQ(desc.negotiated_authn, server_negotiation.negotiated_authn()); + EXPECT_EQ(desc.negotiated_mech, server_negotiation.negotiated_mechanism()); + EXPECT_EQ(desc.tls_negotiated, server_negotiation.tls_negotiated()); + EXPECT_EQ(desc.tls_negotiated, server_negotiation.tls_negotiated()); + + bool client_tls_socket = dynamic_cast<security::TlsSocket*>(client_negotiation.socket()); + bool server_tls_socket = dynamic_cast<security::TlsSocket*>(server_negotiation.socket()); + EXPECT_EQ(desc.rpc_encrypt_loopback, client_tls_socket); + EXPECT_EQ(desc.rpc_encrypt_loopback, server_tls_socket); + + // Check that the expected user subject is authenticated. + RemoteUser remote_user = server_negotiation.take_authenticated_user(); + switch (server_negotiation.negotiated_authn()) { + case AuthenticationType::SASL: + switch (server_negotiation.negotiated_mechanism()) { + case SaslMechanism::PLAIN: + EXPECT_EQ("client-plain", remote_user.username()); + break; + case SaslMechanism::GSSAPI: + EXPECT_EQ("client-gssapi", remote_user.username()); + EXPECT_EQ("[email protected]", remote_user.principal().value_or("")); + break; + case SaslMechanism::INVALID: LOG(FATAL) << "invalid mechanism negotiated"; + } + break; + case AuthenticationType::CERTIFICATE: { + // We expect the cert to be using the local username, because it hasn't + // logged in from any Keytab. + string expected; + CHECK_OK(GetLoggedInUser(&expected)); + EXPECT_EQ(expected, remote_user.username()); + EXPECT_FALSE(remote_user.principal()); + break; + } + case AuthenticationType::TOKEN: + EXPECT_EQ("client-token", remote_user.username()); + break; + case AuthenticationType::INVALID: LOG(FATAL) << "invalid authentication negotiated"; + } + } +} + +INSTANTIATE_TEST_CASE_P(NegotiationCombinations, + TestNegotiation, + ::testing::Values( + + // client: no authn/mechs + // server: no authn/mechs + NegotiationDescriptor { + EndpointConfig { + PkiConfig::NONE, + {}, + false, + RpcEncryption::OPTIONAL, + }, + EndpointConfig { + PkiConfig::NONE, + {}, + false, + RpcEncryption::OPTIONAL, + }, + false, + false, + Status::NotAuthorized(".*client is not configured with an authentication type"), + Status::NetworkError(""), + AuthenticationType::INVALID, + SaslMechanism::INVALID, + false, + }, + + // client: PLAIN + // server: no authn/mechs + NegotiationDescriptor { + EndpointConfig { + PkiConfig::NONE, + { SaslMechanism::PLAIN }, + false, + RpcEncryption::OPTIONAL, + }, + EndpointConfig { + PkiConfig::NONE, + {}, + false, + RpcEncryption::OPTIONAL, + }, + false, + false, + Status::NotAuthorized(".* server mechanism list is empty"), + Status::NotAuthorized(".* server mechanism list is empty"), + AuthenticationType::INVALID, + SaslMechanism::INVALID, + false, + }, + + // client: PLAIN + // server: PLAIN + NegotiationDescriptor { + EndpointConfig { + PkiConfig::NONE, + { SaslMechanism::PLAIN }, + false, + RpcEncryption::OPTIONAL + }, + EndpointConfig { + PkiConfig::NONE, + { SaslMechanism::PLAIN }, + false, + RpcEncryption::DISABLED, + }, + false, + false, + Status::OK(), + Status::OK(), + AuthenticationType::SASL, + SaslMechanism::PLAIN, + false, + }, + + // client: GSSAPI + // server: GSSAPI + NegotiationDescriptor { + EndpointConfig { + PkiConfig::NONE, + { SaslMechanism::GSSAPI }, + false, + RpcEncryption::OPTIONAL, + }, + EndpointConfig { + PkiConfig::NONE, + { SaslMechanism::GSSAPI }, + false, + RpcEncryption::DISABLED, + }, + false, + false, + Status::OK(), + Status::OK(), + AuthenticationType::SASL, + SaslMechanism::GSSAPI, + false, + }, + + // client: GSSAPI, PLAIN + // server: GSSAPI, PLAIN + NegotiationDescriptor { + EndpointConfig { + PkiConfig::NONE, + { SaslMechanism::GSSAPI, SaslMechanism::PLAIN }, + false, + RpcEncryption::OPTIONAL, + }, + EndpointConfig { + PkiConfig::NONE, + { SaslMechanism::GSSAPI, SaslMechanism::PLAIN }, + false, + RpcEncryption::DISABLED, + }, + false, + false, + Status::OK(), + Status::OK(), + AuthenticationType::SASL, + SaslMechanism::GSSAPI, + false, + }, + + // client: GSSAPI, PLAIN + // server: GSSAPI + NegotiationDescriptor { + EndpointConfig { + PkiConfig::NONE, + { SaslMechanism::GSSAPI, SaslMechanism::PLAIN }, + false, + RpcEncryption::OPTIONAL, + }, + EndpointConfig { + PkiConfig::NONE, + { SaslMechanism::GSSAPI }, + false, + RpcEncryption::DISABLED, + }, + false, + false, + Status::OK(), + Status::OK(), + AuthenticationType::SASL, + SaslMechanism::GSSAPI, + false, + }, + + // client: PLAIN + // server: GSSAPI + NegotiationDescriptor { + EndpointConfig { + PkiConfig::NONE, + { SaslMechanism::PLAIN }, + false, + RpcEncryption::OPTIONAL, + }, + EndpointConfig { + PkiConfig::NONE, + { SaslMechanism::GSSAPI }, + false, + RpcEncryption::DISABLED, + }, + false, + false, + Status::NotAuthorized(".*client does not have Kerberos enabled"), + Status::NetworkError(""), + AuthenticationType::INVALID, + SaslMechanism::INVALID, + false, + }, + + // client: GSSAPI, + // server: GSSAPI, self-signed cert + // loopback encryption + NegotiationDescriptor { + EndpointConfig { + PkiConfig::NONE, + { SaslMechanism::GSSAPI }, + false, + RpcEncryption::OPTIONAL, + }, + EndpointConfig { + PkiConfig::SELF_SIGNED, + { SaslMechanism::GSSAPI }, + false, + RpcEncryption::OPTIONAL, + }, + false, + true, + Status::OK(), + Status::OK(), + AuthenticationType::SASL, + SaslMechanism::GSSAPI, + true, + }, + + // client: GSSAPI, signed-cert + // server: GSSAPI, self-signed cert + // This tests that the server will not advertise CERTIFICATE authentication, + // since it doesn't have a trusted cert. + NegotiationDescriptor { + EndpointConfig { + PkiConfig::SIGNED, + { SaslMechanism::GSSAPI }, + false, + RpcEncryption::OPTIONAL, + }, + EndpointConfig { + PkiConfig::SELF_SIGNED, + { SaslMechanism::GSSAPI }, + false, + RpcEncryption::OPTIONAL, + }, + false, + false, + Status::OK(), + Status::OK(), + AuthenticationType::SASL, + SaslMechanism::GSSAPI, + true, + }, + + // client: PLAIN, + // server: PLAIN, self-signed cert + NegotiationDescriptor { + EndpointConfig { + PkiConfig::NONE, + { SaslMechanism::PLAIN }, + false, + RpcEncryption::OPTIONAL, + }, + EndpointConfig { + PkiConfig::SELF_SIGNED, + { SaslMechanism::PLAIN }, + false, + RpcEncryption::OPTIONAL, + }, + false, + false, + Status::OK(), + Status::OK(), + AuthenticationType::SASL, + SaslMechanism::PLAIN, + true, + }, + + // client: signed-cert + // server: signed-cert + NegotiationDescriptor { + EndpointConfig { + PkiConfig::SIGNED, + { SaslMechanism::GSSAPI }, + false, + RpcEncryption::OPTIONAL, + }, + EndpointConfig { + PkiConfig::SIGNED, + { SaslMechanism::GSSAPI }, + false, + RpcEncryption::OPTIONAL, + }, + false, + false, + Status::OK(), + Status::OK(), + AuthenticationType::CERTIFICATE, + SaslMechanism::INVALID, + true, + }, + + // client: token, trusted cert + // server: token, signed-cert, GSSAPI + NegotiationDescriptor { + EndpointConfig { + PkiConfig::TRUSTED, + { }, + true, + RpcEncryption::OPTIONAL, + }, + EndpointConfig { + PkiConfig::SIGNED, + { SaslMechanism::PLAIN }, + true, + RpcEncryption::OPTIONAL, + }, + false, + false, + Status::OK(), + Status::OK(), + AuthenticationType::TOKEN, + SaslMechanism::INVALID, + true, + }, + + // client: PLAIN, token + // server: PLAIN, token, signed cert + // Test that the client won't negotiate token authn if it doesn't have a + // trusted cert. We aren't expecting this to happen in practice (the + // token and trusted CA cert should come as a package). + NegotiationDescriptor { + EndpointConfig { + PkiConfig::NONE, + { SaslMechanism::PLAIN }, + true, + RpcEncryption::OPTIONAL, + }, + EndpointConfig { + PkiConfig::SIGNED, + { SaslMechanism::PLAIN }, + true, + RpcEncryption::OPTIONAL, + }, + false, + false, + Status::OK(), + Status::OK(), + AuthenticationType::SASL, + SaslMechanism::PLAIN, + true, + }, + + // client: PLAIN, GSSAPI, signed-cert, token + // server: PLAIN, GSSAPI, signed-cert, token + NegotiationDescriptor { + EndpointConfig { + PkiConfig::SIGNED, + { SaslMechanism::PLAIN, SaslMechanism::GSSAPI }, + true, + RpcEncryption::OPTIONAL, + }, + EndpointConfig { + PkiConfig::SIGNED, + { SaslMechanism::PLAIN, SaslMechanism::GSSAPI }, + true, + RpcEncryption::OPTIONAL, + }, + false, + false, + Status::OK(), + Status::OK(), + AuthenticationType::CERTIFICATE, + SaslMechanism::INVALID, + true, + }, + + // client: PLAIN, TLS disabled + // server: PLAIN, TLS required + NegotiationDescriptor { + EndpointConfig { + PkiConfig::NONE, + { SaslMechanism::PLAIN }, + false, + RpcEncryption::DISABLED, + }, + EndpointConfig { + PkiConfig::SIGNED, + { SaslMechanism::PLAIN }, + false, + RpcEncryption::REQUIRED, + }, + false, + false, + Status::NotAuthorized(".*client does not support required TLS encryption"), + Status::NotAuthorized(".*client does not support required TLS encryption"), + AuthenticationType::SASL, + SaslMechanism::PLAIN, + true, + }, + + // client: PLAIN, TLS required + // server: PLAIN, TLS disabled + NegotiationDescriptor { + EndpointConfig { + PkiConfig::NONE, + { SaslMechanism::PLAIN }, + false, + RpcEncryption::REQUIRED, + }, + EndpointConfig { + PkiConfig::SIGNED, + { SaslMechanism::PLAIN }, + false, + RpcEncryption::DISABLED, + }, + false, + false, + Status::NotAuthorized(".*server does not support required TLS encryption"), + Status::NetworkError(""), + AuthenticationType::SASL, + SaslMechanism::PLAIN, + true, + }, + + // client: GSSAPI, TLS required, externally-signed cert + // server: GSSAPI, TLS required, externally-signed cert + NegotiationDescriptor { + EndpointConfig { + PkiConfig::EXTERNALLY_SIGNED, + { SaslMechanism::GSSAPI }, + false, + RpcEncryption::REQUIRED, + }, + EndpointConfig { + PkiConfig::EXTERNALLY_SIGNED, + { SaslMechanism::GSSAPI }, + false, + RpcEncryption::REQUIRED, + }, + false, + false, + Status::OK(), + Status::OK(), + AuthenticationType::SASL, + SaslMechanism::GSSAPI, + true, + }, + + // client: GSSAPI, TLS optional, externally-signed cert + // server: GSSAPI, TLS required, signed cert + NegotiationDescriptor { + EndpointConfig { + PkiConfig::EXTERNALLY_SIGNED, + { SaslMechanism::GSSAPI }, + false, + RpcEncryption::OPTIONAL, + }, + EndpointConfig { + PkiConfig::SIGNED, + { SaslMechanism::GSSAPI }, + false, + RpcEncryption::REQUIRED, + }, + false, + false, + Status::OK(), + Status::OK(), + AuthenticationType::SASL, + SaslMechanism::GSSAPI, + true, + }, + + // client: GSSAPI, TLS required + // server: GSSAPI, TLS required, externally-signed cert + NegotiationDescriptor { + EndpointConfig { + PkiConfig::NONE, + { SaslMechanism::GSSAPI }, + false, + RpcEncryption::REQUIRED, + }, + EndpointConfig { + PkiConfig::EXTERNALLY_SIGNED, + { SaslMechanism::GSSAPI }, + false, + RpcEncryption::REQUIRED, + }, + false, + false, + Status::OK(), + Status::OK(), + AuthenticationType::SASL, + SaslMechanism::GSSAPI, + true, + }, + + // client: GSSAPI, PLAIN, TLS required, externally-signed cert + // server: PLAIN, TLS required, externally-signed cert + NegotiationDescriptor { + EndpointConfig { + PkiConfig::EXTERNALLY_SIGNED, + { SaslMechanism::GSSAPI, SaslMechanism::PLAIN }, + false, + RpcEncryption::REQUIRED, + }, + EndpointConfig { + PkiConfig::EXTERNALLY_SIGNED, + { SaslMechanism::PLAIN }, + false, + RpcEncryption::REQUIRED, + }, + false, + false, + Status::OK(), + Status::OK(), + AuthenticationType::SASL, + SaslMechanism::PLAIN, + true, + }, + + // client: GSSAPI, TLS disabled, signed cert + // server: GSSAPI, TLS required, externally-signed cert + NegotiationDescriptor { + EndpointConfig { + PkiConfig::SIGNED, + { SaslMechanism::GSSAPI }, + false, + RpcEncryption::DISABLED, + }, + EndpointConfig { + PkiConfig::EXTERNALLY_SIGNED, + { SaslMechanism::GSSAPI }, + false, + RpcEncryption::REQUIRED, + }, + false, + false, + Status::NotAuthorized(".*client does not support required TLS encryption"), + Status::NotAuthorized(".*client does not support required TLS encryption"), + AuthenticationType::SASL, + SaslMechanism::GSSAPI, + true, + }, + + // client: GSSAPI, TLS required, signed cert + // server: GSSAPI, TLS required, externally-signed cert + NegotiationDescriptor { + EndpointConfig { + PkiConfig::SIGNED, + { SaslMechanism::GSSAPI }, + false, + RpcEncryption::REQUIRED, + }, + EndpointConfig { + PkiConfig::EXTERNALLY_SIGNED, + { SaslMechanism::GSSAPI }, + false, + RpcEncryption::REQUIRED, + }, + false, + false, + Status::OK(), + Status::OK(), + AuthenticationType::SASL, + SaslMechanism::GSSAPI, + true, + }, + + // client: PLAIN + // server: PLAIN + // connection from public routable IP + NegotiationDescriptor { + EndpointConfig { + PkiConfig::NONE, + { SaslMechanism::PLAIN }, + false, + RpcEncryption::OPTIONAL + }, + EndpointConfig { + PkiConfig::NONE, + { SaslMechanism::PLAIN }, + false, + RpcEncryption::OPTIONAL + }, + true, + false, + Status::NotAuthorized(".*unencrypted connections from publicly routable IPs"), + Status::NotAuthorized(".*unencrypted connections from publicly routable IPs"), + AuthenticationType::SASL, + SaslMechanism::PLAIN, + false, + }, + + // client: GSSAPI, TLS required, externally-signed cert + // server: GSSAPI, TLS required, externally-signed cert + // connection from public routable IP + NegotiationDescriptor { + EndpointConfig { + PkiConfig::EXTERNALLY_SIGNED, + { SaslMechanism::GSSAPI }, + false, + RpcEncryption::REQUIRED, + }, + EndpointConfig { + PkiConfig::EXTERNALLY_SIGNED, + { SaslMechanism::GSSAPI }, + false, + RpcEncryption::REQUIRED, + }, + true, + // true as no longer a loopback connection. + true, + Status::OK(), + Status::OK(), + AuthenticationType::SASL, + SaslMechanism::GSSAPI, + true, + } +)); + +// A "Callable" that takes a socket for use with starting a thread. +// Can be used for ServerNegotiation or ClientNegotiation threads. +typedef std::function<void(unique_ptr<Socket>)> SocketCallable; + +// Call Accept() on the socket, then pass the connection to the server runner +static void RunAcceptingDelegator(Socket* acceptor, + const SocketCallable& server_runner) { + unique_ptr<Socket> conn(new Socket()); + Sockaddr remote; + CHECK_OK(acceptor->Accept(conn.get(), &remote, 0)); + server_runner(std::move(conn)); +} + +// Set up a socket and run a negotiation sequence. +static void RunNegotiationTest(const SocketCallable& server_runner, + const SocketCallable& client_runner) { + Socket server_sock; + CHECK_OK(server_sock.Init(0)); + ASSERT_OK(server_sock.BindAndListen(Sockaddr(), 1)); + Sockaddr server_bind_addr; + ASSERT_OK(server_sock.GetSocketAddress(&server_bind_addr)); + thread server(RunAcceptingDelegator, &server_sock, server_runner); + + unique_ptr<Socket> client_sock(new Socket()); + CHECK_OK(client_sock->Init(0)); + ASSERT_OK(client_sock->Connect(server_bind_addr)); + thread client(client_runner, std::move(client_sock)); + + LOG(INFO) << "Waiting for test threads to terminate..."; + client.join(); + LOG(INFO) << "Client thread terminated."; + + server.join(); + LOG(INFO) << "Server thread terminated."; +} + +//////////////////////////////////////////////////////////////////////////////// + +#ifndef __APPLE__ +template<class T> +using CheckerFunction = std::function<void(const Status&, T&)>; + +// Run GSSAPI negotiation from the server side. Runs +// 'post_check' after negotiation to verify the result. +static void RunGSSAPINegotiationServer(unique_ptr<Socket> socket, + const CheckerFunction<ServerNegotiation>& post_check) { + TlsContext tls_context; + CHECK_OK(tls_context.Init()); + TokenVerifier token_verifier; + ServerNegotiation server_negotiation(std::move(socket), &tls_context, + &token_verifier, RpcEncryption::OPTIONAL); + server_negotiation.set_server_fqdn("127.0.0.1"); + CHECK_OK(server_negotiation.EnableGSSAPI()); + post_check(server_negotiation.Negotiate(), server_negotiation); +} + +// Run GSSAPI negotiation from the client side. Runs +// 'post_check' after negotiation to verify the result. +static void RunGSSAPINegotiationClient(unique_ptr<Socket> conn, + const CheckerFunction<ClientNegotiation>& post_check) { + TlsContext tls_context; + CHECK_OK(tls_context.Init()); + ClientNegotiation client_negotiation(std::move(conn), &tls_context, + boost::none, RpcEncryption::OPTIONAL); + client_negotiation.set_server_fqdn("127.0.0.1"); + CHECK_OK(client_negotiation.EnableGSSAPI()); + post_check(client_negotiation.Negotiate(), client_negotiation); +} + +// Test invalid SASL negotiations using the GSSAPI (kerberos) mechanism over a socket. +// This test is ignored on macOS because the system Kerberos implementation +// (Heimdal) caches the non-existence of client credentials, which causes futher +// tests to fail. +TEST_F(TestNegotiation, TestGSSAPIInvalidNegotiation) { + MiniKdc kdc; + ASSERT_OK(kdc.Start()); + + // Try to negotiate with no krb5 credentials on either side. It should fail on both + // sides. + RunNegotiationTest( + std::bind(RunGSSAPINegotiationServer, std::placeholders::_1, + [](const Status& s, ServerNegotiation& server) { + // The client notices there are no credentials and + // doesn't send any failure message to the server. + // Instead, it just disconnects. + // + // TODO(todd): it might be preferable to have the server + // fail to start if it has no valid keytab. + CHECK(s.IsNetworkError()); + }), + std::bind(RunGSSAPINegotiationClient, std::placeholders::_1, + [](const Status& s, ClientNegotiation& client) { + CHECK(s.IsNotAuthorized()); +#ifndef KRB5_VERSION_LE_1_10 + CHECK_GT(s.ToString().find("No Kerberos credentials available"), 0); +#endif + })); + + + // Create the server principal and keytab. + string kt_path; + ASSERT_OK(kdc.CreateServiceKeytab("kudu/127.0.0.1", &kt_path)); + CHECK_ERR(setenv("KRB5_KTNAME", kt_path.c_str(), 1 /*replace*/)); + + // Try to negotiate with no krb5 credentials on the client. It should fail on both + // sides. + RunNegotiationTest( + std::bind(RunGSSAPINegotiationServer, std::placeholders::_1, + [](const Status& s, ServerNegotiation& server) { + // The client notices there are no credentials and + // doesn't send any failure message to the server. + // Instead, it just disconnects. + CHECK(s.IsNetworkError()); + }), + std::bind(RunGSSAPINegotiationClient, std::placeholders::_1, + [](const Status& s, ClientNegotiation& client) { + CHECK(s.IsNotAuthorized()); +#ifndef KRB5_VERSION_LE_1_10 + ASSERT_STR_MATCHES(s.ToString(), + "Not authorized: No Kerberos credentials available.*"); +#endif + })); + + // Create and kinit as a client user. + ASSERT_OK(kdc.CreateUserPrincipal("testuser")); + ASSERT_OK(kdc.Kinit("testuser")); + ASSERT_OK(kdc.SetKrb5Environment()); + + // Change the server's keytab file so that it has inappropriate + // credentials. + // Authentication should now fail. + ASSERT_OK(kdc.CreateServiceKeytab("otherservice/127.0.0.1", &kt_path)); + CHECK_ERR(setenv("KRB5_KTNAME", kt_path.c_str(), 1 /*replace*/)); + + RunNegotiationTest( + std::bind(RunGSSAPINegotiationServer, std::placeholders::_1, + [](const Status& s, ServerNegotiation& server) { + CHECK(s.IsNotAuthorized()); +#ifndef KRB5_VERSION_LE_1_10 + ASSERT_STR_CONTAINS(s.ToString(), + "No key table entry found matching kudu/127.0.0.1"); +#endif + }), + std::bind(RunGSSAPINegotiationClient, std::placeholders::_1, + [](const Status& s, ClientNegotiation& client) { + CHECK(s.IsNotAuthorized()); +#ifndef KRB5_VERSION_LE_1_10 + ASSERT_STR_CONTAINS(s.ToString(), + "No key table entry found matching kudu/127.0.0.1"); +#endif + })); +} +#endif + +#ifndef __APPLE__ +// Test that the pre-flight check for servers requiring Kerberos provides +// nice error messages for missing or bad keytabs. +// +// This is ignored on macOS because the system Kerberos implementation does not +// fail the preflight check when the keytab is inaccessible, probably because +// the preflight check passes a 0-length token. +TEST_F(TestNegotiation, TestPreflight) { + // Try pre-flight with no keytab. + Status s = ServerNegotiation::PreflightCheckGSSAPI(); + ASSERT_FALSE(s.ok()); +#ifndef KRB5_VERSION_LE_1_10 + ASSERT_STR_MATCHES(s.ToString(), "Key table file.*not found"); +#endif + // Try with a valid krb5 environment and keytab. + MiniKdc kdc; + ASSERT_OK(kdc.Start()); + ASSERT_OK(kdc.SetKrb5Environment()); + string kt_path; + ASSERT_OK(kdc.CreateServiceKeytab("kudu/127.0.0.1", &kt_path)); + CHECK_ERR(setenv("KRB5_KTNAME", kt_path.c_str(), 1 /*replace*/)); + + ASSERT_OK(ServerNegotiation::PreflightCheckGSSAPI()); + + // Try with an inaccessible keytab. + CHECK_ERR(chmod(kt_path.c_str(), 0000)); + s = ServerNegotiation::PreflightCheckGSSAPI(); + ASSERT_FALSE(s.ok()); +#ifndef KRB5_VERSION_LE_1_10 + ASSERT_STR_MATCHES(s.ToString(), "error accessing keytab: Permission denied"); +#endif + CHECK_ERR(unlink(kt_path.c_str())); + + // Try with a keytab that has the wrong credentials. + ASSERT_OK(kdc.CreateServiceKeytab("wrong-service/127.0.0.1", &kt_path)); + CHECK_ERR(setenv("KRB5_KTNAME", kt_path.c_str(), 1 /*replace*/)); + s = ServerNegotiation::PreflightCheckGSSAPI(); + ASSERT_FALSE(s.ok()); +#ifndef KRB5_VERSION_LE_1_10 + ASSERT_STR_MATCHES(s.ToString(), "No key table entry found matching kudu/.*"); +#endif +} +#endif + +//////////////////////////////////////////////////////////////////////////////// + +static void RunTimeoutExpectingServer(unique_ptr<Socket> socket) { + TlsContext tls_context; + CHECK_OK(tls_context.Init()); + TokenVerifier token_verifier; + ServerNegotiation server_negotiation(std::move(socket), &tls_context, + &token_verifier, RpcEncryption::OPTIONAL); + CHECK_OK(server_negotiation.EnablePlain()); + Status s = server_negotiation.Negotiate(); + ASSERT_TRUE(s.IsNetworkError()) << "Expected client to time out and close the connection. Got: " + << s.ToString(); +} + +static void RunTimeoutNegotiationClient(unique_ptr<Socket> sock) { + TlsContext tls_context; + CHECK_OK(tls_context.Init()); + ClientNegotiation client_negotiation(std::move(sock), &tls_context, + boost::none, RpcEncryption::OPTIONAL); + CHECK_OK(client_negotiation.EnablePlain("test", "test")); + MonoTime deadline = MonoTime::Now() - MonoDelta::FromMilliseconds(100L); + client_negotiation.set_deadline(deadline); + Status s = client_negotiation.Negotiate(); + ASSERT_TRUE(s.IsTimedOut()) << "Expected timeout! Got: " << s.ToString(); + CHECK_OK(client_negotiation.socket()->Shutdown(true, true)); +} + +// Ensure that the client times out. +TEST_F(TestNegotiation, TestClientTimeout) { + RunNegotiationTest(RunTimeoutExpectingServer, RunTimeoutNegotiationClient); +} + +//////////////////////////////////////////////////////////////////////////////// + +static void RunTimeoutNegotiationServer(unique_ptr<Socket> socket) { + TlsContext tls_context; + CHECK_OK(tls_context.Init()); + TokenVerifier token_verifier; + ServerNegotiation server_negotiation(std::move(socket), &tls_context, + &token_verifier, RpcEncryption::OPTIONAL); + CHECK_OK(server_negotiation.EnablePlain()); + MonoTime deadline = MonoTime::Now() - MonoDelta::FromMilliseconds(100L); + server_negotiation.set_deadline(deadline); + Status s = server_negotiation.Negotiate(); + ASSERT_TRUE(s.IsTimedOut()) << "Expected timeout! Got: " << s.ToString(); + CHECK_OK(server_negotiation.socket()->Close()); +} + +static void RunTimeoutExpectingClient(unique_ptr<Socket> socket) { + TlsContext tls_context; + CHECK_OK(tls_context.Init()); + ClientNegotiation client_negotiation(std::move(socket), &tls_context, + boost::none, RpcEncryption::OPTIONAL); + CHECK_OK(client_negotiation.EnablePlain("test", "test")); + Status s = client_negotiation.Negotiate(); + ASSERT_TRUE(s.IsNetworkError()) << "Expected server to time out and close the connection. Got: " + << s.ToString(); +} + +// Ensure that the server times out. +TEST_F(TestNegotiation, TestServerTimeout) { + RunNegotiationTest(RunTimeoutNegotiationServer, RunTimeoutExpectingClient); +} + +//////////////////////////////////////////////////////////////////////////////// + +// This suite of tests ensure that applications that embed the Kudu client are +// able to externally handle the initialization of SASL. See KUDU-1749 and +// IMPALA-4497 for context. +// +// The tests are a bit tricky because the initialization of SASL is static state +// that we can't easily clear/reset between test cases. So, each test invokes +// itself as a subprocess with the appropriate --gtest_filter line as well as a +// special flag to indicate that it is the test child running. +class TestDisableInit : public KuduTest { + protected: + // Run the lambda 'f' in a newly-started process, capturing its stderr + // into 'stderr'. + template<class TestFunc> + void DoTest(const TestFunc& f, string* stderr = nullptr) { + if (FLAGS_is_test_child) { + f(); + return; + } + + // Invoke the currently-running test case in a new subprocess. + string filter_flag = strings::Substitute("--gtest_filter=$0.$1", + CURRENT_TEST_CASE_NAME(), CURRENT_TEST_NAME()); + string executable_path; + CHECK_OK(env_->GetExecutablePath(&executable_path)); + string stdout; + Status s = Subprocess::Call({ executable_path, "test", filter_flag, "--is_test_child" }, + "" /* stdin */, + &stdout, + stderr); + ASSERT_TRUE(s.ok()) << "Test failed: " << stdout; + } +}; + +// Test disabling SASL but not actually properly initializing it before usage. +TEST_F(TestDisableInit, TestDisableSasl_NotInitialized) { + DoTest([]() { + CHECK_OK(DisableSaslInitialization()); + Status s = SaslInit(); + ASSERT_STR_CONTAINS(s.ToString(), "was disabled, but SASL was not externally initialized"); + }); +} + +// Test disabling SASL with proper initialization by some other app. +TEST_F(TestDisableInit, TestDisableSasl_Good) { + DoTest([]() { + rpc::internal::SaslSetMutex(); + sasl_client_init(NULL); + CHECK_OK(DisableSaslInitialization()); + ASSERT_OK(SaslInit()); + }); +} + +// Test a client which inits SASL itself but doesn't remember to disable Kudu's +// SASL initialization. +TEST_F(TestDisableInit, TestMultipleSaslInit) { + string stderr; + DoTest([]() { + rpc::internal::SaslSetMutex(); + sasl_client_init(NULL); + ASSERT_OK(SaslInit()); + }, &stderr); + // If we are the parent, we should see the warning from the child that it automatically + // skipped initialization because it detected that it was already initialized. + if (!FLAGS_is_test_child) { + ASSERT_STR_CONTAINS(stderr, "Skipping initialization"); + } +} + +// We are not able to detect mutexes not being set with the macOS version of libsasl. +#ifndef __APPLE__ +// Test disabling SASL but not remembering to initialize the SASL mutex support. This +// should succeed but generate a warning. +TEST_F(TestDisableInit, TestDisableSasl_NoMutexImpl) { + string stderr; + DoTest([]() { + sasl_client_init(NULL); + CHECK_OK(DisableSaslInitialization()); + ASSERT_OK(SaslInit()); + }, &stderr); + // If we are the parent, we should see the warning from the child. + if (!FLAGS_is_test_child) { + ASSERT_STR_CONTAINS(stderr, "not provided with a mutex implementation"); + } +} + +// Test a client which inits SASL itself but doesn't remember to disable Kudu's +// SASL initialization. +TEST_F(TestDisableInit, TestMultipleSaslInit_NoMutexImpl) { + string stderr; + DoTest([]() { + sasl_client_init(NULL); + ASSERT_OK(SaslInit()); + }, &stderr); + // If we are the parent, we should see the warning from the child that it automatically + // skipped initialization because it detected that it was already initialized. + if (!FLAGS_is_test_child) { + ASSERT_STR_CONTAINS(stderr, "Skipping initialization"); + ASSERT_STR_CONTAINS(stderr, "not provided with a mutex implementation"); + } +} +#endif + +} // namespace rpc +} // namespace kudu
http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/c7db60aa/be/src/kudu/rpc/negotiation.cc ---------------------------------------------------------------------- diff --git a/be/src/kudu/rpc/negotiation.cc b/be/src/kudu/rpc/negotiation.cc new file mode 100644 index 0000000..66a0112 --- /dev/null +++ b/be/src/kudu/rpc/negotiation.cc @@ -0,0 +1,317 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. + +#include "kudu/rpc/negotiation.h" + +#include <poll.h> +#include <sys/time.h> + +#include <memory> +#include <ostream> +#include <string> + +#include <gflags/gflags.h> +#include <glog/logging.h> + +#include "kudu/gutil/stringprintf.h" +#include "kudu/gutil/strings/substitute.h" +#include "kudu/rpc/blocking_ops.h" +#include "kudu/rpc/client_negotiation.h" +#include "kudu/rpc/connection.h" +#include "kudu/rpc/messenger.h" +#include "kudu/rpc/reactor.h" +#include "kudu/rpc/rpc_header.pb.h" +#include "kudu/rpc/sasl_common.h" +#include "kudu/rpc/server_negotiation.h" +#include "kudu/security/tls_context.h" +#include "kudu/util/errno.h" +#include "kudu/util/flag_tags.h" +#include "kudu/util/logging.h" +#include "kudu/util/status.h" +#include "kudu/util/trace.h" + +DEFINE_bool(rpc_trace_negotiation, false, + "If enabled, dump traces of all RPC negotiations to the log"); +TAG_FLAG(rpc_trace_negotiation, runtime); +TAG_FLAG(rpc_trace_negotiation, advanced); +TAG_FLAG(rpc_trace_negotiation, experimental); + +DEFINE_int32(rpc_negotiation_inject_delay_ms, 0, + "If enabled, injects the given number of milliseconds delay into " + "the RPC negotiation process on the server side."); +TAG_FLAG(rpc_negotiation_inject_delay_ms, unsafe); + +DECLARE_string(keytab_file); +DECLARE_string(rpc_certificate_file); + +DEFINE_bool(rpc_encrypt_loopback_connections, false, + "Whether to encrypt data transfer on RPC connections that stay within " + "a single host. Encryption here is likely to offer no additional " + "security benefit since only a local 'root' user could intercept the " + "traffic, and wire encryption does not suitably protect against such " + "an attacker."); +TAG_FLAG(rpc_encrypt_loopback_connections, advanced); + +using std::unique_ptr; +using strings::Substitute; + +namespace kudu { +namespace rpc { + +const char* AuthenticationTypeToString(AuthenticationType t) { + switch (t) { + case AuthenticationType::INVALID: return "INVALID"; break; + case AuthenticationType::SASL: return "SASL"; break; + case AuthenticationType::TOKEN: return "TOKEN"; break; + case AuthenticationType::CERTIFICATE: return "CERTIFICATE"; break; + } + return "<cannot reach here>"; +} + +std::ostream& operator<<(std::ostream& o, AuthenticationType authentication_type) { + return o << AuthenticationTypeToString(authentication_type); +} + +// Wait for the client connection to be established and become ready for writing. +static Status WaitForClientConnect(Socket* socket, const MonoTime& deadline) { + TRACE("Waiting for socket to connect"); + int fd = socket->GetFd(); + struct pollfd poll_fd; + poll_fd.fd = fd; + poll_fd.events = POLLOUT; + poll_fd.revents = 0; + + MonoTime now; + MonoDelta remaining; + while (true) { + now = MonoTime::Now(); + remaining = deadline - now; + DVLOG(4) << "Client waiting to connect for negotiation, time remaining until timeout deadline: " + << remaining.ToString(); + if (PREDICT_FALSE(remaining.ToNanoseconds() <= 0)) { + return Status::TimedOut("Timeout exceeded waiting to connect"); + } +#if defined(__linux__) + struct timespec ts; + remaining.ToTimeSpec(&ts); + int ready = ppoll(&poll_fd, 1, &ts, NULL); +#else + int ready = poll(&poll_fd, 1, remaining.ToMilliseconds()); +#endif + if (ready == -1) { + int err = errno; + if (err == EINTR) { + // We were interrupted by a signal, let's go again. + continue; + } else { + return Status::NetworkError("Error from ppoll() while waiting to connect", + ErrnoToString(err), err); + } + } else if (ready == 0) { + // Timeout exceeded. Loop back to the top to our impending doom. + continue; + } else { + // Success. + break; + } + } + + // Connect finished, but this doesn't mean that we connected successfully. + // Check the socket for an error. + int so_error = 0; + socklen_t socklen = sizeof(so_error); + int rc = getsockopt(fd, SOL_SOCKET, SO_ERROR, &so_error, &socklen); + if (rc != 0) { + return Status::NetworkError("Unable to check connected socket for errors", + ErrnoToString(errno), + errno); + } + if (so_error != 0) { + return Status::NetworkError("connect", ErrnoToString(so_error), so_error); + } + + return Status::OK(); +} + +// Disable / reset socket timeouts. +static Status DisableSocketTimeouts(Socket* socket) { + RETURN_NOT_OK(socket->SetSendTimeout(MonoDelta::FromNanoseconds(0L))); + RETURN_NOT_OK(socket->SetRecvTimeout(MonoDelta::FromNanoseconds(0L))); + return Status::OK(); +} + +// Perform client negotiation. We don't LOG() anything, we leave that to our caller. +static Status DoClientNegotiation(Connection* conn, + RpcAuthentication authentication, + RpcEncryption encryption, + MonoTime deadline, + unique_ptr<ErrorStatusPB>* rpc_error) { + const auto* messenger = conn->reactor_thread()->reactor()->messenger(); + // Prefer secondary credentials (such as authn token) if permitted by policy. + const auto authn_token = (conn->credentials_policy() == CredentialsPolicy::PRIMARY_CREDENTIALS) + ? boost::none : messenger->authn_token(); + ClientNegotiation client_negotiation(conn->release_socket(), + &messenger->tls_context(), + authn_token, + encryption); + + // Note that the fqdn is an IP address here: we've already lost whatever DNS + // name the client was attempting to use. Unless krb5 is configured with 'rdns + // = false', it will automatically take care of reversing this address to its + // canonical hostname to determine the expected server principal. + client_negotiation.set_server_fqdn(conn->remote().host()); + + if (authentication != RpcAuthentication::DISABLED) { + Status s = client_negotiation.EnableGSSAPI(); + if (!s.ok()) { + // If we can't enable GSSAPI, it's likely the client is just missing the + // appropriate SASL plugin. We don't want to require it to be installed + // if the user doesn't care about connecting to servers using Kerberos + // authentication. So, we'll just VLOG this here. If we try to connect + // to a server which requires Kerberos, we'll get a negotiation error + // at that point. + if (VLOG_IS_ON(1)) { + KLOG_FIRST_N(INFO, 1) << "Couldn't enable GSSAPI (Kerberos) SASL plugin: " + << s.message().ToString() + << ". This process will be unable to connect to " + << "servers requiring Kerberos authentication."; + } + + if (authentication == RpcAuthentication::REQUIRED && + !authn_token && + !messenger->tls_context().has_signed_cert()) { + return Status::InvalidArgument( + "Kerberos, token, or PKI certificate credentials must be provided in order to " + "require authentication for a client"); + } + } + } + + if (authentication != RpcAuthentication::REQUIRED) { + RETURN_NOT_OK(client_negotiation.EnablePlain(conn->local_user_credentials().real_user(), "")); + } + + client_negotiation.set_deadline(deadline); + + RETURN_NOT_OK(WaitForClientConnect(client_negotiation.socket(), deadline)); + RETURN_NOT_OK(client_negotiation.socket()->SetNonBlocking(false)); + RETURN_NOT_OK(client_negotiation.Negotiate(rpc_error)); + RETURN_NOT_OK(DisableSocketTimeouts(client_negotiation.socket())); + + // Transfer the negotiated socket and state back to the connection. + conn->adopt_socket(client_negotiation.release_socket()); + conn->set_remote_features(client_negotiation.take_server_features()); + + // Sanity check: if no authn token was supplied as user credentials, + // the negotiated authentication type cannot be AuthenticationType::TOKEN. + DCHECK(!(authn_token == boost::none && + client_negotiation.negotiated_authn() == AuthenticationType::TOKEN)); + + return Status::OK(); +} + +// Perform server negotiation. We don't LOG() anything, we leave that to our caller. +static Status DoServerNegotiation(Connection* conn, + RpcAuthentication authentication, + RpcEncryption encryption, + const MonoTime& deadline) { + if (authentication == RpcAuthentication::REQUIRED && + FLAGS_keytab_file.empty() && + FLAGS_rpc_certificate_file.empty()) { + return Status::InvalidArgument("RPC authentication (--rpc_authentication) may not be " + "required unless Kerberos (--keytab_file) or external PKI " + "(--rpc_certificate_file et al) are configured"); + } + + if (FLAGS_rpc_negotiation_inject_delay_ms > 0) { + LOG(WARNING) << "Injecting " << FLAGS_rpc_negotiation_inject_delay_ms + << "ms delay in negotiation"; + SleepFor(MonoDelta::FromMilliseconds(FLAGS_rpc_negotiation_inject_delay_ms)); + } + + // Create a new ServerNegotiation to handle the synchronous negotiation. + const auto* messenger = conn->reactor_thread()->reactor()->messenger(); + ServerNegotiation server_negotiation(conn->release_socket(), + &messenger->tls_context(), + &messenger->token_verifier(), + encryption); + + if (authentication != RpcAuthentication::DISABLED && !FLAGS_keytab_file.empty()) { + RETURN_NOT_OK(server_negotiation.EnableGSSAPI()); + } + if (authentication != RpcAuthentication::REQUIRED) { + RETURN_NOT_OK(server_negotiation.EnablePlain()); + } + + server_negotiation.set_deadline(deadline); + + RETURN_NOT_OK(server_negotiation.socket()->SetNonBlocking(false)); + + RETURN_NOT_OK(server_negotiation.Negotiate()); + RETURN_NOT_OK(DisableSocketTimeouts(server_negotiation.socket())); + + // Transfer the negotiated socket and state back to the connection. + conn->adopt_socket(server_negotiation.release_socket()); + conn->set_remote_features(server_negotiation.take_client_features()); + conn->set_remote_user(server_negotiation.take_authenticated_user()); + + return Status::OK(); +} + +void Negotiation::RunNegotiation(const scoped_refptr<Connection>& conn, + RpcAuthentication authentication, + RpcEncryption encryption, + MonoTime deadline) { + Status s; + unique_ptr<ErrorStatusPB> rpc_error; + if (conn->direction() == Connection::SERVER) { + s = DoServerNegotiation(conn.get(), authentication, encryption, deadline); + } else { + s = DoClientNegotiation(conn.get(), authentication, encryption, deadline, + &rpc_error); + } + + if (PREDICT_FALSE(!s.ok())) { + string msg = Substitute("$0 connection negotiation failed: $1", + conn->direction() == Connection::SERVER ? "Server" : "Client", + conn->ToString()); + s = s.CloneAndPrepend(msg); + } + TRACE("Negotiation complete: $0", s.ToString()); + + bool is_bad = !s.ok() && !( + (s.IsNetworkError() && s.posix_code() == ECONNREFUSED) || + s.IsNotAuthorized()); + + if (is_bad || FLAGS_rpc_trace_negotiation) { + string msg = Trace::CurrentTrace()->DumpToString(); + if (is_bad) { + LOG(WARNING) << "Failed RPC negotiation. Trace:\n" << msg; + } else { + LOG(INFO) << "RPC negotiation tracing enabled. Trace:\n" << msg; + } + } + + if (conn->direction() == Connection::SERVER && s.IsNotAuthorized()) { + LOG(WARNING) << "Unauthorized connection attempt: " << s.message().ToString(); + } + conn->CompleteNegotiation(std::move(s), std::move(rpc_error)); +} + + +} // namespace rpc +} // namespace kudu http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/c7db60aa/be/src/kudu/rpc/negotiation.h ---------------------------------------------------------------------- diff --git a/be/src/kudu/rpc/negotiation.h b/be/src/kudu/rpc/negotiation.h new file mode 100644 index 0000000..2ca459b --- /dev/null +++ b/be/src/kudu/rpc/negotiation.h @@ -0,0 +1,56 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. +#ifndef KUDU_RPC_NEGOTIATION_H +#define KUDU_RPC_NEGOTIATION_H + +#include <iosfwd> + +#include "kudu/gutil/ref_counted.h" +#include "kudu/util/monotime.h" + +namespace kudu { +namespace rpc { + +class Connection; +enum class RpcAuthentication; +enum class RpcEncryption; + +enum class AuthenticationType { + INVALID, + SASL, + TOKEN, + CERTIFICATE, +}; +const char* AuthenticationTypeToString(AuthenticationType t); + +std::ostream& operator<<(std::ostream& o, AuthenticationType authentication_type); + +class Negotiation { + public: + + // Perform negotiation for a connection (either server or client) + static void RunNegotiation(const scoped_refptr<Connection>& conn, + RpcAuthentication authentication, + RpcEncryption encryption, + MonoTime deadline); + private: + DISALLOW_IMPLICIT_CONSTRUCTORS(Negotiation); +}; + +} // namespace rpc +} // namespace kudu +#endif // KUDU_RPC_NEGOTIATION_H http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/c7db60aa/be/src/kudu/rpc/outbound_call.cc ---------------------------------------------------------------------- diff --git a/be/src/kudu/rpc/outbound_call.cc b/be/src/kudu/rpc/outbound_call.cc new file mode 100644 index 0000000..af03f1c --- /dev/null +++ b/be/src/kudu/rpc/outbound_call.cc @@ -0,0 +1,509 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. + +#include <algorithm> +#include <boost/functional/hash.hpp> +#include <gflags/gflags.h> +#include <memory> +#include <mutex> +#include <string> +#include <unordered_set> +#include <vector> + +#include "kudu/gutil/strings/substitute.h" +#include "kudu/gutil/walltime.h" +#include "kudu/rpc/outbound_call.h" +#include "kudu/rpc/constants.h" +#include "kudu/rpc/rpc_controller.h" +#include "kudu/rpc/rpc_introspection.pb.h" +#include "kudu/rpc/rpc_sidecar.h" +#include "kudu/rpc/serialization.h" +#include "kudu/rpc/transfer.h" +#include "kudu/util/flag_tags.h" +#include "kudu/util/kernel_stack_watchdog.h" + +// 100M cycles should be about 50ms on a 2Ghz box. This should be high +// enough that involuntary context switches don't trigger it, but low enough +// that any serious blocking behavior on the reactor would. +DEFINE_int64(rpc_callback_max_cycles, 100 * 1000 * 1000, + "The maximum number of cycles for which an RPC callback " + "should be allowed to run without emitting a warning." + " (Advanced debugging option)"); +TAG_FLAG(rpc_callback_max_cycles, advanced); +TAG_FLAG(rpc_callback_max_cycles, runtime); + +using std::unique_ptr; + +namespace kudu { +namespace rpc { + +using google::protobuf::Message; +using strings::Substitute; + +static const double kMicrosPerSecond = 1000000.0; + +/// +/// OutboundCall +/// + +OutboundCall::OutboundCall(const ConnectionId& conn_id, + const RemoteMethod& remote_method, + google::protobuf::Message* response_storage, + RpcController* controller, + ResponseCallback callback) + : state_(READY), + remote_method_(remote_method), + conn_id_(conn_id), + callback_(std::move(callback)), + controller_(DCHECK_NOTNULL(controller)), + response_(DCHECK_NOTNULL(response_storage)) { + DVLOG(4) << "OutboundCall " << this << " constructed with state_: " << StateName(state_) + << " and RPC timeout: " + << (controller->timeout().Initialized() ? controller->timeout().ToString() : "none"); + header_.set_call_id(kInvalidCallId); + remote_method.ToPB(header_.mutable_remote_method()); + start_time_ = MonoTime::Now(); + + if (!controller_->required_server_features().empty()) { + required_rpc_features_.insert(RpcFeatureFlag::APPLICATION_FEATURE_FLAGS); + } + + if (controller_->request_id_) { + header_.set_allocated_request_id(controller_->request_id_.release()); + } +} + +OutboundCall::~OutboundCall() { + DCHECK(IsFinished()); + DVLOG(4) << "OutboundCall " << this << " destroyed with state_: " << StateName(state_); +} + +Status OutboundCall::SerializeTo(vector<Slice>* slices) { + DCHECK_LT(0, request_buf_.size()) + << "Must call SetRequestPayload() before SerializeTo()"; + + const MonoDelta &timeout = controller_->timeout(); + if (timeout.Initialized()) { + header_.set_timeout_millis(timeout.ToMilliseconds()); + } + + for (uint32_t feature : controller_->required_server_features()) { + header_.add_required_feature_flags(feature); + } + + DCHECK_LE(0, sidecar_byte_size_); + serialization::SerializeHeader( + header_, sidecar_byte_size_ + request_buf_.size(), &header_buf_); + + slices->push_back(Slice(header_buf_)); + slices->push_back(Slice(request_buf_)); + for (const unique_ptr<RpcSidecar>& car : sidecars_) slices->push_back(car->AsSlice()); + return Status::OK(); +} + +void OutboundCall::SetRequestPayload(const Message& req, + vector<unique_ptr<RpcSidecar>>&& sidecars) { + DCHECK_EQ(-1, sidecar_byte_size_); + + sidecars_ = move(sidecars); + + // Compute total size of sidecar payload so that extra space can be reserved as part of + // the request body. + uint32_t message_size = req.ByteSize(); + sidecar_byte_size_ = 0; + for (const unique_ptr<RpcSidecar>& car: sidecars_) { + header_.add_sidecar_offsets(sidecar_byte_size_ + message_size); + sidecar_byte_size_ += car->AsSlice().size(); + } + + serialization::SerializeMessage(req, &request_buf_, sidecar_byte_size_, true); +} + +Status OutboundCall::status() const { + std::lock_guard<simple_spinlock> l(lock_); + return status_; +} + +const ErrorStatusPB* OutboundCall::error_pb() const { + std::lock_guard<simple_spinlock> l(lock_); + return error_pb_.get(); +} + +string OutboundCall::StateName(State state) { + switch (state) { + case READY: + return "READY"; + case ON_OUTBOUND_QUEUE: + return "ON_OUTBOUND_QUEUE"; + case SENDING: + return "SENDING"; + case SENT: + return "SENT"; + case NEGOTIATION_TIMED_OUT: + return "NEGOTIATION_TIMED_OUT"; + case TIMED_OUT: + return "TIMED_OUT"; + case FINISHED_NEGOTIATION_ERROR: + return "FINISHED_NEGOTIATION_ERROR"; + case FINISHED_ERROR: + return "FINISHED_ERROR"; + case FINISHED_SUCCESS: + return "FINISHED_SUCCESS"; + default: + LOG(DFATAL) << "Unknown state in OutboundCall: " << state; + return StringPrintf("UNKNOWN(%d)", state); + } +} + +void OutboundCall::set_state(State new_state) { + std::lock_guard<simple_spinlock> l(lock_); + set_state_unlocked(new_state); +} + +OutboundCall::State OutboundCall::state() const { + std::lock_guard<simple_spinlock> l(lock_); + return state_; +} + +void OutboundCall::set_state_unlocked(State new_state) { + // Sanity check state transitions. + DVLOG(3) << "OutboundCall " << this << " (" << ToString() << ") switching from " << + StateName(state_) << " to " << StateName(new_state); + switch (new_state) { + case ON_OUTBOUND_QUEUE: + DCHECK_EQ(state_, READY); + break; + case SENDING: + // Allow SENDING to be set idempotently so we don't have to specifically check + // whether the state is transitioning in the RPC code. + DCHECK(state_ == ON_OUTBOUND_QUEUE || state_ == SENDING); + break; + case SENT: + DCHECK_EQ(state_, SENDING); + break; + case NEGOTIATION_TIMED_OUT: + DCHECK(state_ == ON_OUTBOUND_QUEUE); + break; + case TIMED_OUT: + DCHECK(state_ == SENT || state_ == ON_OUTBOUND_QUEUE || state_ == SENDING); + break; + case FINISHED_SUCCESS: + DCHECK_EQ(state_, SENT); + break; + default: + // No sanity checks for others. + break; + } + + state_ = new_state; +} + +void OutboundCall::CallCallback() { + int64_t start_cycles = CycleClock::Now(); + { + SCOPED_WATCH_STACK(100); + callback_(); + // Clear the callback, since it may be holding onto reference counts + // via bound parameters. We do this inside the timer because it's possible + // the user has naughty destructors that block, and we want to account for that + // time here if they happen to run on this thread. + callback_ = NULL; + } + int64_t end_cycles = CycleClock::Now(); + int64_t wait_cycles = end_cycles - start_cycles; + if (PREDICT_FALSE(wait_cycles > FLAGS_rpc_callback_max_cycles)) { + double micros = static_cast<double>(wait_cycles) / base::CyclesPerSecond() + * kMicrosPerSecond; + + LOG(WARNING) << "RPC callback for " << ToString() << " blocked reactor thread for " + << micros << "us"; + } +} + +void OutboundCall::SetResponse(gscoped_ptr<CallResponse> resp) { + call_response_ = std::move(resp); + Slice r(call_response_->serialized_response()); + + if (call_response_->is_success()) { + // TODO: here we're deserializing the call response within the reactor thread, + // which isn't great, since it would block processing of other RPCs in parallel. + // Should look into a way to avoid this. + if (!response_->ParseFromArray(r.data(), r.size())) { + SetFailed(Status::IOError("invalid RPC response, missing fields", + response_->InitializationErrorString())); + return; + } + set_state(FINISHED_SUCCESS); + CallCallback(); + } else { + // Error + gscoped_ptr<ErrorStatusPB> err(new ErrorStatusPB()); + if (!err->ParseFromArray(r.data(), r.size())) { + SetFailed(Status::IOError("Was an RPC error but could not parse error response", + err->InitializationErrorString())); + return; + } + ErrorStatusPB* err_raw = err.release(); + SetFailed(Status::RemoteError(err_raw->message()), Phase::REMOTE_CALL, err_raw); + } +} + +void OutboundCall::SetQueued() { + set_state(ON_OUTBOUND_QUEUE); +} + +void OutboundCall::SetSending() { + set_state(SENDING); +} + +void OutboundCall::SetSent() { + set_state(SENT); + + // This method is called in the reactor thread, so free the header buf, + // which was also allocated from this thread. tcmalloc's thread caching + // behavior is a lot more efficient if memory is freed from the same thread + // which allocated it -- this lets it keep to thread-local operations instead + // of taking a mutex to put memory back on the global freelist. + delete [] header_buf_.release(); + + // request_buf_ is also done being used here, but since it was allocated by + // the caller thread, we would rather let that thread free it whenever it + // deletes the RpcController. +} + +void OutboundCall::SetFailed(const Status &status, + Phase phase, + ErrorStatusPB* err_pb) { + DCHECK(!status.ok()); + DCHECK(phase == Phase::CONNECTION_NEGOTIATION || phase == Phase::REMOTE_CALL); + { + std::lock_guard<simple_spinlock> l(lock_); + status_ = status; + if (err_pb) { + error_pb_.reset(err_pb); + } + set_state_unlocked(phase == Phase::CONNECTION_NEGOTIATION + ? FINISHED_NEGOTIATION_ERROR + : FINISHED_ERROR); + } + CallCallback(); +} + +void OutboundCall::SetTimedOut(Phase phase) { + static const char* kErrMsgNegotiation = + "connection negotiation to $1 for RPC $0 timed out after $2 ($3)"; + static const char* kErrMsgCall = "$0 RPC to $1 timed out after $2 ($3)"; + DCHECK(phase == Phase::CONNECTION_NEGOTIATION || phase == Phase::REMOTE_CALL); + + // We have to fetch timeout outside the lock to avoid a lock + // order inversion between this class and RpcController. + const MonoDelta timeout = controller_->timeout(); + { + std::lock_guard<simple_spinlock> l(lock_); + status_ = Status::TimedOut( + Substitute((phase == Phase::REMOTE_CALL) ? kErrMsgCall : kErrMsgNegotiation, + remote_method_.method_name(), + conn_id_.remote().ToString(), + timeout.ToString(), + StateName(state_))); + set_state_unlocked((phase == Phase::REMOTE_CALL) ? TIMED_OUT : NEGOTIATION_TIMED_OUT); + } + CallCallback(); +} + +bool OutboundCall::IsTimedOut() const { + std::lock_guard<simple_spinlock> l(lock_); + switch (state_) { + case NEGOTIATION_TIMED_OUT: // fall-through + case TIMED_OUT: + return true; + default: + return false; + } +} + +bool OutboundCall::IsNegotiationError() const { + std::lock_guard<simple_spinlock> l(lock_); + switch (state_) { + case FINISHED_NEGOTIATION_ERROR: // fall-through + case NEGOTIATION_TIMED_OUT: + return true; + default: + return false; + } +} + +bool OutboundCall::IsFinished() const { + std::lock_guard<simple_spinlock> l(lock_); + switch (state_) { + case READY: + case SENDING: + case ON_OUTBOUND_QUEUE: + case SENT: + return false; + case NEGOTIATION_TIMED_OUT: + case TIMED_OUT: + case FINISHED_NEGOTIATION_ERROR: + case FINISHED_ERROR: + case FINISHED_SUCCESS: + return true; + default: + LOG(FATAL) << "Unknown call state: " << state_; + return false; + } +} + +string OutboundCall::ToString() const { + return Substitute("RPC call $0 -> $1", remote_method_.ToString(), conn_id_.ToString()); +} + +void OutboundCall::DumpPB(const DumpRunningRpcsRequestPB& req, + RpcCallInProgressPB* resp) { + std::lock_guard<simple_spinlock> l(lock_); + resp->mutable_header()->CopyFrom(header_); + resp->set_micros_elapsed((MonoTime::Now() - start_time_).ToMicroseconds()); + + switch (state_) { + case READY: + // Don't bother setting a state for "READY" since we don't expose a call + // until it's at least on the queue of a connection. + break; + case ON_OUTBOUND_QUEUE: + resp->set_state(RpcCallInProgressPB::ON_OUTBOUND_QUEUE); + break; + case SENDING: + resp->set_state(RpcCallInProgressPB::SENDING); + break; + case SENT: + resp->set_state(RpcCallInProgressPB::SENT); + break; + case NEGOTIATION_TIMED_OUT: + resp->set_state(RpcCallInProgressPB::NEGOTIATION_TIMED_OUT); + break; + case TIMED_OUT: + resp->set_state(RpcCallInProgressPB::TIMED_OUT); + break; + case FINISHED_NEGOTIATION_ERROR: + resp->set_state(RpcCallInProgressPB::FINISHED_NEGOTIATION_ERROR); + break; + case FINISHED_ERROR: + resp->set_state(RpcCallInProgressPB::FINISHED_ERROR); + break; + case FINISHED_SUCCESS: + resp->set_state(RpcCallInProgressPB::FINISHED_SUCCESS); + break; + } +} + +/// +/// ConnectionId +/// + +ConnectionId::ConnectionId() {} + +ConnectionId::ConnectionId(const ConnectionId& other) { + DoCopyFrom(other); +} + +ConnectionId::ConnectionId(const Sockaddr& remote, UserCredentials user_credentials) { + remote_ = remote; + user_credentials_ = std::move(user_credentials); +} + +void ConnectionId::set_remote(const Sockaddr& remote) { + remote_ = remote; +} + +void ConnectionId::set_user_credentials(UserCredentials user_credentials) { + user_credentials_ = std::move(user_credentials); +} + +void ConnectionId::CopyFrom(const ConnectionId& other) { + DoCopyFrom(other); +} + +string ConnectionId::ToString() const { + // Does not print the password. + return StringPrintf("{remote=%s, user_credentials=%s}", + remote_.ToString().c_str(), + user_credentials_.ToString().c_str()); +} + +void ConnectionId::DoCopyFrom(const ConnectionId& other) { + remote_ = other.remote_; + user_credentials_ = other.user_credentials_; +} + +size_t ConnectionId::HashCode() const { + size_t seed = 0; + boost::hash_combine(seed, remote_.HashCode()); + boost::hash_combine(seed, user_credentials_.HashCode()); + return seed; +} + +bool ConnectionId::Equals(const ConnectionId& other) const { + return (remote() == other.remote() + && user_credentials().Equals(other.user_credentials())); +} + +size_t ConnectionIdHash::operator() (const ConnectionId& conn_id) const { + return conn_id.HashCode(); +} + +bool ConnectionIdEqual::operator() (const ConnectionId& cid1, const ConnectionId& cid2) const { + return cid1.Equals(cid2); +} + +/// +/// CallResponse +/// + +CallResponse::CallResponse() + : parsed_(false) { +} + +Status CallResponse::GetSidecar(int idx, Slice* sidecar) const { + DCHECK(parsed_); + if (idx < 0 || idx >= header_.sidecar_offsets_size()) { + return Status::InvalidArgument(strings::Substitute( + "Index $0 does not reference a valid sidecar", idx)); + } + *sidecar = sidecar_slices_[idx]; + return Status::OK(); +} + +Status CallResponse::ParseFrom(gscoped_ptr<InboundTransfer> transfer) { + CHECK(!parsed_); + RETURN_NOT_OK(serialization::ParseMessage(transfer->data(), &header_, + &serialized_response_)); + + // Use information from header to extract the payload slices. + RETURN_NOT_OK(RpcSidecar::ParseSidecars(header_.sidecar_offsets(), + serialized_response_, sidecar_slices_)); + + if (header_.sidecar_offsets_size() > 0) { + serialized_response_ = + Slice(serialized_response_.data(), header_.sidecar_offsets(0)); + } + + transfer_.swap(transfer); + parsed_ = true; + return Status::OK(); +} + +} // namespace rpc +} // namespace kudu
