Repository: kudu Updated Branches: refs/heads/master 038c6bc05 -> 6ce61d6e6
KUDU-1918 Prevent hijacking of scanner IDs This makes the scanner remember its RemoteUser, and ensures that when continuing a scan, the new requestor matches the original requestor. This prevents one user from somehow obtaining a scanner ID from another and then "hijacking" the in-progress scan. This restricts scans, checksum scans, and keep-alive requests. Change-Id: Ic91fa0ca471bd674e35aa2f8de3806b88ad4b3b4 Reviewed-on: http://gerrit.cloudera.org:8080/6348 Tested-by: Kudu Jenkins Reviewed-by: Alexey Serbin <[email protected]> Project: http://git-wip-us.apache.org/repos/asf/kudu/repo Commit: http://git-wip-us.apache.org/repos/asf/kudu/commit/e172df40 Tree: http://git-wip-us.apache.org/repos/asf/kudu/tree/e172df40 Diff: http://git-wip-us.apache.org/repos/asf/kudu/diff/e172df40 Branch: refs/heads/master Commit: e172df405aef47b1339c9879b835baf69b539f8c Parents: 038c6bc Author: Andrew Wong <[email protected]> Authored: Thu Nov 1 10:49:41 2018 -0700 Committer: Andrew Wong <[email protected]> Committed: Tue Nov 6 20:55:31 2018 +0000 ---------------------------------------------------------------------- src/kudu/client/client-test.cc | 65 +++++++++++++++++ src/kudu/client/client.h | 1 + src/kudu/client/scanner-internal.cc | 3 + src/kudu/client/scanner-internal.h | 3 + src/kudu/tserver/scanners-test.cc | 25 ++++--- src/kudu/tserver/scanners.cc | 36 +++++++--- src/kudu/tserver/scanners.h | 39 +++++++---- src/kudu/tserver/tablet_server-test-base.cc | 12 ++-- src/kudu/tserver/tablet_server-test-base.h | 4 ++ src/kudu/tserver/tablet_server-test.cc | 88 +++++++++++++++++++++++- src/kudu/tserver/tablet_service.cc | 46 +++++++++---- src/kudu/tserver/tserver.proto | 3 + src/kudu/tserver/tserver_path_handlers.cc | 3 +- 13 files changed, 272 insertions(+), 56 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/kudu/blob/e172df40/src/kudu/client/client-test.cc ---------------------------------------------------------------------- diff --git a/src/kudu/client/client-test.cc b/src/kudu/client/client-test.cc index efcdd68..10a4c6c 100644 --- a/src/kudu/client/client-test.cc +++ b/src/kudu/client/client-test.cc @@ -5714,5 +5714,70 @@ TEST_F(ClientTest, TestAuthenticationCredentialsRealUser) { KuduScanner scanner(client_table_.get()); ASSERT_OK(ScanToStrings(&scanner, &rows)); } + +// Test that clients that aren't authenticated as the appropriate user will be +// unable to hijack a specific scanner ID. +TEST_F(ClientTest, TestBlockScannerHijackingAttempts) { + const string kUser = "token-user"; + const string kBadGuy = "bad-guy"; + const string table_name = client_table_->name(); + FLAGS_user_acl = Substitute("$0,$1", kUser, kBadGuy); + cluster_->ShutdownNodes(cluster::ClusterNodes::ALL); + ASSERT_OK(cluster_->StartSync()); + + // Insert some rows to the table. + NO_FATALS(InsertTestRows(client_table_.get(), FLAGS_test_scan_num_rows)); + + // First authenticate as a user and create a scanner for the existing table. + const auto get_table_as_user = [&] (const string& user, shared_ptr<KuduTable>* table) { + KuduClientBuilder client_builder; + string authn_creds; + AuthenticationCredentialsPB pb; + pb.set_real_user(user); + ASSERT_TRUE(pb.SerializeToString(&authn_creds)); + client_builder.import_authentication_credentials(authn_creds); + + // Create the client and table for the user. + shared_ptr<KuduClient> user_client; + ASSERT_OK(cluster_->CreateClient(&client_builder, &user_client)); + ASSERT_OK(user_client->OpenTable(table_name, table)); + }; + + shared_ptr<KuduTable> user_table; + shared_ptr<KuduTable> bad_guy_table; + NO_FATALS(get_table_as_user(kUser, &user_table)); + NO_FATALS(get_table_as_user(kBadGuy, &bad_guy_table)); + + // Test both fault-tolerant scanners and non-fault-tolerant scanners. + for (bool fault_tolerance : { true, false }) { + // Scan the table as the user to get a scanner ID, and set up a malicious + // scanner that will try to hijack that scanner ID. Set an initial batch + // size of 0 so the calls to Open() don't buffer any rows. + KuduScanner user_scanner(user_table.get()); + ASSERT_OK(user_scanner.SetBatchSizeBytes(0)); + KuduScanner bad_guy_scanner(bad_guy_table.get()); + ASSERT_OK(bad_guy_scanner.SetBatchSizeBytes(0)); + if (fault_tolerance) { + ASSERT_OK(user_scanner.SetFaultTolerant()); + ASSERT_OK(bad_guy_scanner.SetFaultTolerant()); + } + ASSERT_OK(user_scanner.Open()); + ASSERT_OK(bad_guy_scanner.Open()); + const string scanner_id = user_scanner.data_->next_req_.scanner_id(); + ASSERT_FALSE(scanner_id.empty()); + + // Now attempt to get that scanner id as a different user. + LOG(INFO) << Substitute("Attempting to extract data from $0 scan $1 as $2", + fault_tolerance ? "fault-tolerant" : "non-fault-tolerant", scanner_id, kBadGuy); + bad_guy_scanner.data_->next_req_.set_scanner_id(scanner_id); + bad_guy_scanner.data_->last_response_.set_has_more_results(true); + KuduScanBatch batch; + Status s = bad_guy_scanner.NextBatch(&batch); + ASSERT_TRUE(s.IsRemoteError()); + ASSERT_STR_CONTAINS(s.ToString(), "Not authorized"); + ASSERT_EQ(0, batch.NumRows()); + } +} + } // namespace client } // namespace kudu http://git-wip-us.apache.org/repos/asf/kudu/blob/e172df40/src/kudu/client/client.h ---------------------------------------------------------------------- diff --git a/src/kudu/client/client.h b/src/kudu/client/client.h index 56c3f50..626df7d 100644 --- a/src/kudu/client/client.h +++ b/src/kudu/client/client.h @@ -2224,6 +2224,7 @@ class KUDU_EXPORT KuduScanner { class KUDU_NO_EXPORT Data; friend class KuduScanToken; + FRIEND_TEST(ClientTest, TestBlockScannerHijackingAttempts); FRIEND_TEST(ClientTest, TestScanCloseProxy); FRIEND_TEST(ClientTest, TestScanFaultTolerance); FRIEND_TEST(ClientTest, TestScanNoBlockCaching); http://git-wip-us.apache.org/repos/asf/kudu/blob/e172df40/src/kudu/client/scanner-internal.cc ---------------------------------------------------------------------- diff --git a/src/kudu/client/scanner-internal.cc b/src/kudu/client/scanner-internal.cc index 9ac1a86..d7c30bb 100644 --- a/src/kudu/client/scanner-internal.cc +++ b/src/kudu/client/scanner-internal.cc @@ -243,6 +243,9 @@ ScanRpcStatus KuduScanner::Data::AnalyzeResponse(const Status& rpc_status, case rpc::ErrorStatusPB::ERROR_UNAVAILABLE: return ScanRpcStatus{ ScanRpcStatus::SERVICE_UNAVAILABLE, rpc_status}; + case rpc::ErrorStatusPB::FATAL_UNAUTHORIZED: + return ScanRpcStatus{ + ScanRpcStatus::SCAN_NOT_AUTHORIZED, rpc_status}; default: return ScanRpcStatus{ScanRpcStatus::RPC_ERROR, rpc_status}; } http://git-wip-us.apache.org/repos/asf/kudu/blob/e172df40/src/kudu/client/scanner-internal.h ---------------------------------------------------------------------- diff --git a/src/kudu/client/scanner-internal.h b/src/kudu/client/scanner-internal.h index 0dddb40..5e7652b 100644 --- a/src/kudu/client/scanner-internal.h +++ b/src/kudu/client/scanner-internal.h @@ -89,6 +89,9 @@ struct ScanRpcStatus { // the token has expired. RPC_INVALID_AUTHENTICATION_TOKEN, + // The requestor was not authorized to make the request. + SCAN_NOT_AUTHORIZED, + // Another RPC-system error (e.g. NetworkError because the TS was down). RPC_ERROR, http://git-wip-us.apache.org/repos/asf/kudu/blob/e172df40/src/kudu/tserver/scanners-test.cc ---------------------------------------------------------------------- diff --git a/src/kudu/tserver/scanners-test.cc b/src/kudu/tserver/scanners-test.cc index 7a2084e..e2456c4 100644 --- a/src/kudu/tserver/scanners-test.cc +++ b/src/kudu/tserver/scanners-test.cc @@ -24,42 +24,51 @@ #include "kudu/gutil/gscoped_ptr.h" #include "kudu/gutil/ref_counted.h" +#include "kudu/rpc/remote_user.h" #include "kudu/tablet/tablet_replica.h" #include "kudu/tserver/scanner_metrics.h" #include "kudu/tserver/tserver.pb.h" #include "kudu/util/metrics.h" #include "kudu/util/monotime.h" +#include "kudu/util/status.h" +#include "kudu/util/test_macros.h" DECLARE_int32(scanner_ttl_ms); namespace kudu { +using rpc::RemoteUser; +using std::vector; using tablet::TabletReplica; namespace tserver { -using std::vector; +static const char* kUsername = "kudu-user"; TEST(ScannersTest, TestManager) { scoped_refptr<TabletReplica> null_replica(nullptr); ScannerManager mgr(nullptr); // Create two scanners, make sure their ids are different. + RemoteUser user; + user.SetUnauthenticated(kUsername); SharedScanner s1, s2; - mgr.NewScanner(null_replica, "", RowFormatFlags::NO_FLAGS, &s1); - mgr.NewScanner(null_replica, "", RowFormatFlags::NO_FLAGS, &s2); + mgr.NewScanner(null_replica, user, RowFormatFlags::NO_FLAGS, &s1); + mgr.NewScanner(null_replica, user, RowFormatFlags::NO_FLAGS, &s2); ASSERT_NE(s1->id(), s2->id()); // Check that they're both registered. SharedScanner result; - ASSERT_TRUE(mgr.LookupScanner(s1->id(), &result)); + TabletServerErrorPB::Code error_code; + ASSERT_OK(mgr.LookupScanner(s1->id(), kUsername, &error_code, &result)); ASSERT_EQ(result.get(), s1.get()); - ASSERT_TRUE(mgr.LookupScanner(s2->id(), &result)); + ASSERT_OK(mgr.LookupScanner(s2->id(), kUsername, &error_code, &result)); ASSERT_EQ(result.get(), s2.get()); // Check that looking up a bad scanner returns false. - ASSERT_FALSE(mgr.LookupScanner("xxx", &result)); + ASSERT_TRUE(mgr.LookupScanner("xxx", kUsername, &error_code, &result).IsNotFound()); + ASSERT_EQ(TabletServerErrorPB::SCANNER_EXPIRED, error_code); // Remove the scanners. ASSERT_TRUE(mgr.UnregisterScanner(s1->id())); @@ -75,8 +84,8 @@ TEST(ScannerTest, TestExpire) { MetricRegistry registry; ScannerManager mgr(METRIC_ENTITY_server.Instantiate(®istry, "test")); SharedScanner s1, s2; - mgr.NewScanner(null_replica, "", RowFormatFlags::NO_FLAGS, &s1); - mgr.NewScanner(null_replica, "", RowFormatFlags::NO_FLAGS, &s2); + mgr.NewScanner(null_replica, RemoteUser(), RowFormatFlags::NO_FLAGS, &s1); + mgr.NewScanner(null_replica, RemoteUser(), RowFormatFlags::NO_FLAGS, &s2); SleepFor(MonoDelta::FromMilliseconds(200)); s2->UpdateAccessTime(); mgr.RemoveExpiredScanners(); http://git-wip-us.apache.org/repos/asf/kudu/blob/e172df40/src/kudu/tserver/scanners.cc ---------------------------------------------------------------------- diff --git a/src/kudu/tserver/scanners.cc b/src/kudu/tserver/scanners.cc index 6fb9d77..60f69fb 100644 --- a/src/kudu/tserver/scanners.cc +++ b/src/kudu/tserver/scanners.cc @@ -34,6 +34,7 @@ #include "kudu/gutil/map-util.h" #include "kudu/gutil/stl_util.h" #include "kudu/gutil/strings/substitute.h" +#include "kudu/rpc/remote_user.h" #include "kudu/tablet/tablet.h" #include "kudu/tablet/tablet_metadata.h" #include "kudu/tablet/tablet_metrics.h" @@ -68,6 +69,7 @@ using strings::Substitute; namespace kudu { +using rpc::RemoteUser; using tablet::TabletReplica; namespace tserver { @@ -131,20 +133,16 @@ ScannerManager::ScannerMapStripe& ScannerManager::GetStripeByScannerId(const str } void ScannerManager::NewScanner(const scoped_refptr<TabletReplica>& tablet_replica, - const std::string& requestor_string, + const RemoteUser& remote_user, uint64_t row_format_flags, SharedScanner* scanner) { // Keep trying to generate a unique ID until we get one. bool success = false; while (!success) { - // TODO(KUDU-1918): are these UUIDs predictable? If so, we should - // probably generate random numbers instead, since we can safely - // just retry until we avoid a collision. Alternatively we could - // verify that the requestor userid does not change mid-scan. string id = oid_generator_.Next(); scanner->reset(new Scanner(id, tablet_replica, - requestor_string, + remote_user, metrics_.get(), row_format_flags)); @@ -154,10 +152,26 @@ void ScannerManager::NewScanner(const scoped_refptr<TabletReplica>& tablet_repli } } -bool ScannerManager::LookupScanner(const string& scanner_id, SharedScanner* scanner) { +Status ScannerManager::LookupScanner(const string& scanner_id, + const string& username, + TabletServerErrorPB::Code* error_code, + SharedScanner* scanner) { + SharedScanner ret; ScannerMapStripe& stripe = GetStripeByScannerId(scanner_id); shared_lock<RWMutex> l(stripe.lock_); - return FindCopy(stripe.scanners_by_id_, scanner_id, scanner); + bool found_scanner = FindCopy(stripe.scanners_by_id_, scanner_id, &ret); + if (!found_scanner) { + *error_code = TabletServerErrorPB::SCANNER_EXPIRED; + return Status::NotFound(Substitute("Scanner $0 not found (it may have expired)", + scanner_id)); + } + if (username != ret->remote_user().username()) { + *error_code = TabletServerErrorPB::NOT_AUTHORIZED; + return Status::NotAuthorized(Substitute("User $0 doesn't own scanner $1", + username, scanner_id)); + } + *scanner = std::move(ret); + return Status::OK(); } bool ScannerManager::UnregisterScanner(const string& scanner_id) { @@ -290,11 +304,11 @@ void ScannerManager::RecordCompletedScanUnlocked(ScanDescriptor descriptor) { const std::string Scanner::kNullTabletId = "null tablet"; Scanner::Scanner(string id, const scoped_refptr<TabletReplica>& tablet_replica, - string requestor_string, ScannerMetrics* metrics, + RemoteUser remote_user, ScannerMetrics* metrics, uint64_t row_format_flags) : id_(std::move(id)), tablet_replica_(tablet_replica), - requestor_string_(std::move(requestor_string)), + remote_user_(std::move(remote_user)), call_seq_id_(0), start_time_(MonoTime::Now()), metrics_(metrics), @@ -353,7 +367,7 @@ ScanDescriptor Scanner::descriptor() const { ScanDescriptor descriptor; descriptor.tablet_id = tablet_id(); descriptor.scanner_id = id(); - descriptor.requestor = requestor_string(); + descriptor.remote_user = remote_user(); descriptor.start_time = start_time_; for (const auto& column : client_projection_schema()->columns()) { http://git-wip-us.apache.org/repos/asf/kudu/blob/e172df40/src/kudu/tserver/scanners.h ---------------------------------------------------------------------- diff --git a/src/kudu/tserver/scanners.h b/src/kudu/tserver/scanners.h index 9588053..354ec90 100644 --- a/src/kudu/tserver/scanners.h +++ b/src/kudu/tserver/scanners.h @@ -14,8 +14,7 @@ // KIND, either express or implied. See the License for the // specific language governing permissions and limitations // under the License. -#ifndef KUDU_TSERVER_SCANNERS_H -#define KUDU_TSERVER_SCANNERS_H +#pragma once #include <cstddef> #include <cstdint> @@ -34,7 +33,9 @@ #include "kudu/gutil/gscoped_ptr.h" #include "kudu/gutil/macros.h" #include "kudu/gutil/ref_counted.h" +#include "kudu/rpc/remote_user.h" #include "kudu/tablet/tablet_replica.h" +#include "kudu/tserver/tserver.pb.h" #include "kudu/util/auto_release_pool.h" #include "kudu/util/condition_variable.h" #include "kudu/util/locks.h" @@ -55,9 +56,11 @@ class Thread; namespace tserver { class Scanner; + enum class ScanState; struct ScanDescriptor; struct ScannerMetrics; + typedef std::shared_ptr<Scanner> SharedScanner; // Manages the live scanners within a Tablet Server. @@ -76,15 +79,22 @@ class ScannerManager { // Starts the expired scanner removal thread. Status StartRemovalThread(); - // Create a new scanner with a unique ID, inserting it into the map. + // Create a new scanner with a unique ID, inserting it into the map. Further + // lookups for the scanner must provide the username associated with + // 'remote_user'. void NewScanner(const scoped_refptr<tablet::TabletReplica>& tablet_replica, - const std::string& requestor_string, + const rpc::RemoteUser& remote_user, uint64_t row_format_flags, SharedScanner* scanner); - // Lookup the given scanner by its ID. - // Returns true if the scanner is found successfully. - bool LookupScanner(const std::string& scanner_id, SharedScanner* scanner); + // Lookup the given scanner by its ID with the provided username, setting an + // appropriate error code. + // Returns NotFound if the scanner doesn't exist, or NotAuthorized if the + // scanner wasn't created by 'username'. + Status LookupScanner(const std::string& scanner_id, + const std::string& username, + TabletServerErrorPB::Code* error_code, + SharedScanner* scanner); // Unregister the given scanner by its ID. // Returns true if unregistered successfully. @@ -185,7 +195,7 @@ class Scanner { public: Scanner(std::string id, const scoped_refptr<tablet::TabletReplica>& tablet_replica, - std::string requestor_string, ScannerMetrics* metrics, + rpc::RemoteUser remote_user, ScannerMetrics* metrics, uint64_t row_format_flags); ~Scanner(); @@ -238,7 +248,7 @@ class Scanner { const scoped_refptr<tablet::TabletReplica>& tablet_replica() const { return tablet_replica_; } - const std::string& requestor_string() const { return requestor_string_; } + const rpc::RemoteUser& remote_user() const { return remote_user_; } // Returns the current call sequence ID of the scanner. uint32_t call_seq_id() const { @@ -322,9 +332,9 @@ class Scanner { // Tablet associated with the scanner. const scoped_refptr<tablet::TabletReplica> tablet_replica_; - // Information about the requestor. Populated from - // RpcContext::requestor_string(). - const std::string requestor_string_; + // The remote user making the request. Populated from the RemoteUser of the + // first request. + const rpc::RemoteUser remote_user_; // The last time that the scanner was accessed. MonoTime last_access_time_; @@ -392,8 +402,8 @@ struct ScanDescriptor { // The scanner ID. std::string scanner_id; - // The scan requestor. - std::string requestor; + // The user that made the first request. + rpc::RemoteUser remote_user; // The table name. std::string table_name; @@ -415,4 +425,3 @@ struct ScanDescriptor { } // namespace tserver } // namespace kudu -#endif http://git-wip-us.apache.org/repos/asf/kudu/blob/e172df40/src/kudu/tserver/tablet_server-test-base.cc ---------------------------------------------------------------------- diff --git a/src/kudu/tserver/tablet_server-test-base.cc b/src/kudu/tserver/tablet_server-test-base.cc index 924bf5e..59b26e9 100644 --- a/src/kudu/tserver/tablet_server-test-base.cc +++ b/src/kudu/tserver/tablet_server-test-base.cc @@ -458,6 +458,12 @@ void TabletServerTestBase::VerifyScanRequestFailure( } } +Status TabletServerTestBase::FillNewScanRequest(ReadMode read_mode, NewScanRequestPB* scan) const { + scan->set_tablet_id(kTabletId); + scan->set_read_mode(read_mode); + return SchemaToColumnPBs(schema_, scan->mutable_projected_columns()); +} + // Open a new scanner which scans all of the columns in the table. void TabletServerTestBase::OpenScannerWithAllColumns(ScanResponsePB* resp, ReadMode read_mode) { @@ -465,11 +471,7 @@ void TabletServerTestBase::OpenScannerWithAllColumns(ScanResponsePB* resp, RpcController rpc; // Set up a new request with no predicates, all columns. - const Schema& projection = schema_; - NewScanRequestPB* scan = req.mutable_new_scan_request(); - scan->set_tablet_id(kTabletId); - scan->set_read_mode(read_mode); - ASSERT_OK(SchemaToColumnPBs(projection, scan->mutable_projected_columns())); + ASSERT_OK(FillNewScanRequest(read_mode, req.mutable_new_scan_request())); req.set_call_seq_id(0); req.set_batch_size_bytes(0); // so it won't return data right away http://git-wip-us.apache.org/repos/asf/kudu/blob/e172df40/src/kudu/tserver/tablet_server-test-base.h ---------------------------------------------------------------------- diff --git a/src/kudu/tserver/tablet_server-test-base.h b/src/kudu/tserver/tablet_server-test-base.h index 9841021..b80dfac 100644 --- a/src/kudu/tserver/tablet_server-test-base.h +++ b/src/kudu/tserver/tablet_server-test-base.h @@ -120,6 +120,10 @@ class TabletServerTestBase : public KuduTest { void OpenScannerWithAllColumns(ScanResponsePB* resp, ReadMode read_mode = READ_LATEST); + // Fills out a new scan request on all of the columns in the table with the + // given read mode. + Status FillNewScanRequest(ReadMode read_mode, NewScanRequestPB* scan) const; + protected: static const char* kTableId; static const char* kTabletId; http://git-wip-us.apache.org/repos/asf/kudu/blob/e172df40/src/kudu/tserver/tablet_server-test.cc ---------------------------------------------------------------------- diff --git a/src/kudu/tserver/tablet_server-test.cc b/src/kudu/tserver/tablet_server-test.cc index 3a734ea..a12e99b 100644 --- a/src/kudu/tserver/tablet_server-test.cc +++ b/src/kudu/tserver/tablet_server-test.cc @@ -70,6 +70,7 @@ #include "kudu/rpc/messenger.h" #include "kudu/rpc/rpc_controller.h" #include "kudu/rpc/rpc_header.pb.h" +#include "kudu/rpc/user_credentials.h" #include "kudu/server/rpc_server.h" #include "kudu/server/server_base.pb.h" #include "kudu/server/server_base.proxy.h" @@ -1529,7 +1530,9 @@ TEST_F(TabletServerTest, TestReadLatest) { ASSERT_TRUE(!scanner_id.empty()); { SharedScanner junk; - ASSERT_TRUE(mini_server_->server()->scanner_manager()->LookupScanner(scanner_id, &junk)); + TabletServerErrorPB::Code error_code; + ASSERT_OK(mini_server_->server()->scanner_manager()->LookupScanner( + scanner_id, proxy_->user_credentials().real_user(), &error_code, &junk)); } // Ensure that the scanner shows up in the server and tablet's metrics. @@ -1552,7 +1555,10 @@ TEST_F(TabletServerTest, TestReadLatest) { // from the scanner manager. { SharedScanner junk; - ASSERT_FALSE(mini_server_->server()->scanner_manager()->LookupScanner(scanner_id, &junk)); + TabletServerErrorPB::Code error_code; + ASSERT_TRUE(mini_server_->server()->scanner_manager()->LookupScanner( + scanner_id, proxy_->user_credentials().real_user(), &error_code, &junk).IsNotFound()); + ASSERT_EQ(TabletServerErrorPB::SCANNER_EXPIRED, error_code); } // Ensure that the metrics have been updated now that the scanner is unregistered. @@ -3397,5 +3403,83 @@ TEST_F(TabletServerTest, TestKeysInRowsetMetadataPreventStartupSeeks) { restart_server_and_check_bytes_read(/*keys_in_rowset_meta=*/ true); } +// Test that each scanner can only be accessed by the user who created it. +TEST_F(TabletServerTest, TestScannerCheckMatchingUser) { + rpc::UserCredentials user; + user.set_real_user("good-guy"); + proxy_->set_user_credentials(user); + + InsertTestRowsDirect(0, 100); + ScanResponsePB resp; + NO_FATALS(OpenScannerWithAllColumns(&resp)); + const string& scanner_id = resp.scanner_id(); + ASSERT_TRUE(!scanner_id.empty()); + + // Now do a checksum scan as the user. + string checksum_scanner_id; + int64_t checksum_val; + { + ChecksumRequestPB checksum_req; + ChecksumResponsePB checksum_resp; + RpcController rpc; + ASSERT_OK(FillNewScanRequest(READ_LATEST, checksum_req.mutable_new_request())); + // Set a batch size of 0 so we don't return rows and can expect the scanner + // to remain alive. + checksum_req.set_batch_size_bytes(0); + ASSERT_OK(proxy_->Checksum(checksum_req, &checksum_resp, &rpc)); + SCOPED_TRACE(checksum_resp.DebugString()); + ASSERT_FALSE(checksum_resp.has_error()); + ASSERT_TRUE(checksum_resp.has_more_results()); + checksum_scanner_id = checksum_resp.scanner_id(); + checksum_val = checksum_resp.checksum(); + } + + constexpr auto verify_authz_error = [] (const Status& s) { + EXPECT_TRUE(s.IsRemoteError()) << s.ToString(); + ASSERT_STR_CONTAINS(s.ToString(), "Not authorized"); + }; + + for (const string& other : { "", "bad-guy" }) { + TabletServerServiceProxy bad_proxy( + client_messenger_, mini_server_->bound_rpc_addr(), + mini_server_->bound_rpc_addr().host()); + if (!other.empty()) { + rpc::UserCredentials other_user; + other_user.set_real_user(other); + bad_proxy.set_user_credentials(other_user); + } + // Other users and clients with no credentials will be bounced for scans, + // checksum scans, and keep-alive requests. + { + ScanRequestPB req; + RpcController rpc; + req.set_scanner_id(scanner_id); + Status s = bad_proxy.Scan(req, &resp, &rpc); + SCOPED_TRACE(resp.DebugString()); + NO_FATALS(verify_authz_error(s)); + } + { + ChecksumRequestPB req; + ContinueChecksumRequestPB* continue_req = req.mutable_continue_request(); + continue_req->set_scanner_id(checksum_scanner_id); + continue_req->set_previous_checksum(checksum_val); + ChecksumResponsePB resp; + RpcController rpc; + Status s = bad_proxy.Checksum(req, &resp, &rpc); + SCOPED_TRACE(resp.DebugString()); + NO_FATALS(verify_authz_error(s)); + } + for (const string& id : { scanner_id, checksum_scanner_id }) { + ScannerKeepAliveRequestPB req; + req.set_scanner_id(id); + ScannerKeepAliveResponsePB resp; + RpcController rpc; + Status s = bad_proxy.ScannerKeepAlive(req, &resp, &rpc); + SCOPED_TRACE(resp.DebugString()); + NO_FATALS(verify_authz_error(s)); + } + } +} + } // namespace tserver } // namespace kudu http://git-wip-us.apache.org/repos/asf/kudu/blob/e172df40/src/kudu/tserver/tablet_service.cc ---------------------------------------------------------------------- diff --git a/src/kudu/tserver/tablet_service.cc b/src/kudu/tserver/tablet_service.cc index da4002c..9b3177e 100644 --- a/src/kudu/tserver/tablet_service.cc +++ b/src/kudu/tserver/tablet_service.cc @@ -61,6 +61,7 @@ #include "kudu/gutil/ref_counted.h" #include "kudu/gutil/stringprintf.h" #include "kudu/gutil/strings/substitute.h" +#include "kudu/rpc/remote_user.h" #include "kudu/rpc/rpc_context.h" #include "kudu/rpc/rpc_header.pb.h" #include "kudu/rpc/rpc_sidecar.h" @@ -367,6 +368,12 @@ static void SetupErrorAndRespond(TabletServerErrorPB* error, const Status& s, TabletServerErrorPB::Code code, rpc::RpcContext* context) { + // Non-authorized errors will drop the connection. + if (code == TabletServerErrorPB::NOT_AUTHORIZED) { + DCHECK(s.IsNotAuthorized()); + context->RespondRpcFailure(rpc::ErrorStatusPB::FATAL_UNAUTHORIZED, s); + return; + } // Generic "service unavailable" errors will cause the client to retry later. if ((code == TabletServerErrorPB::UNKNOWN_ERROR || code == TabletServerErrorPB::THROTTLED) && s.IsServiceUnavailable()) { @@ -1287,14 +1294,23 @@ void TabletServiceImpl::ScannerKeepAlive(const ScannerKeepAliveRequestPB *req, rpc::RpcContext *context) { DCHECK(req->has_scanner_id()); SharedScanner scanner; - if (!server_->scanner_manager()->LookupScanner(req->scanner_id(), &scanner)) { - resp->mutable_error()->set_code(TabletServerErrorPB::SCANNER_EXPIRED); - Status s = Status::NotFound(Substitute("Scanner $0 not found (it may have expired)", - req->scanner_id())); + TabletServerErrorPB::Code error_code = TabletServerErrorPB::UNKNOWN_ERROR; + Status s = server_->scanner_manager()->LookupScanner(req->scanner_id(), + context->remote_user().username(), + &error_code, + &scanner); + if (!s.ok()) { StatusToPB(s, resp->mutable_error()->mutable_status()); LOG(INFO) << Substitute("ScannerKeepAlive: $0: remote=$1", s.ToString(), context->requestor_string()); - context->RespondSuccess(); + if (PREDICT_TRUE(s.IsNotFound())) { + resp->mutable_error()->set_code(error_code); + StatusToPB(s, resp->mutable_error()->mutable_status()); + context->RespondSuccess(); + return; + } + DCHECK(s.IsNotAuthorized()); + SetupErrorAndRespond(resp->mutable_error(), s, error_code, context); return; } scanner->UpdateAccessTime(); @@ -1545,7 +1561,7 @@ void TabletServiceImpl::Checksum(const ChecksumRequestPB* req, ScanResultChecksummer collector; bool has_more = false; - TabletServerErrorPB::Code error_code; + TabletServerErrorPB::Code error_code = TabletServerErrorPB::UNKNOWN_ERROR; if (req->has_new_request()) { scan_req.mutable_new_scan_request()->CopyFrom(req->new_request()); const NewScanRequestPB& new_req = req->new_request(); @@ -1814,7 +1830,7 @@ Status TabletServiceImpl::HandleNewScanRequest(TabletReplica* replica, SharedScanner scanner; server_->scanner_manager()->NewScanner(replica, - rpc_context->requestor_string(), + rpc_context->remote_user(), scan_pb.row_format_flags(), &scanner); TRACE("Created scanner $0 for tablet $1", scanner->id(), scanner->tablet_id()); @@ -2043,17 +2059,19 @@ Status TabletServiceImpl::HandleContinueScanRequest(const ScanRequestPB* req, // in case multiple RPCs hit the same scanner at the same time. Probably // just a trylock and fail the RPC if it contends. SharedScanner scanner; - if (!server_->scanner_manager()->LookupScanner(req->scanner_id(), &scanner)) { - if (batch_size_bytes == 0 && req->close_scanner()) { + TabletServerErrorPB::Code code = TabletServerErrorPB::UNKNOWN_ERROR; + Status s = server_->scanner_manager()->LookupScanner(req->scanner_id(), + rpc_context->remote_user().username(), + &code, + &scanner); + if (!s.ok()) { + if (s.IsNotFound() && batch_size_bytes == 0 && req->close_scanner()) { // Silently ignore any request to close a non-existent scanner. return Status::OK(); } - - *error_code = TabletServerErrorPB::SCANNER_EXPIRED; - Status s = Status::NotFound(Substitute("Scanner $0 not found (it may have expired)", - req->scanner_id())); LOG(INFO) << Substitute("Scan: $0: call sequence id=$1, remote=$2", s.ToString(), req->call_seq_id(), rpc_context->requestor_string()); + *error_code = code; return s; } @@ -2146,7 +2164,7 @@ Status TabletServiceImpl::HandleContinueScanRequest(const ScanRequestPB* req, scoped_refptr<TabletReplica> replica = scanner->tablet_replica(); shared_ptr<Tablet> tablet; TabletServerErrorPB::Code tablet_ref_error_code; - const Status s = GetTabletRef(replica, &tablet, &tablet_ref_error_code); + s = GetTabletRef(replica, &tablet, &tablet_ref_error_code); // If the tablet is not running, but the scan operation in progress // has reached this point, the tablet server has the necessary data to // send in response for the scan continuation request. http://git-wip-us.apache.org/repos/asf/kudu/blob/e172df40/src/kudu/tserver/tserver.proto ---------------------------------------------------------------------- diff --git a/src/kudu/tserver/tserver.proto b/src/kudu/tserver/tserver.proto index f9f8cef..afe1b71 100644 --- a/src/kudu/tserver/tserver.proto +++ b/src/kudu/tserver/tserver.proto @@ -96,6 +96,9 @@ message TabletServerErrorPB { // The tablet needs to be evicted and reassigned. TABLET_FAILED = 20; + + // The request is disallowed for the given user. + NOT_AUTHORIZED = 21; } // The error code. http://git-wip-us.apache.org/repos/asf/kudu/blob/e172df40/src/kudu/tserver/tserver_path_handlers.cc ---------------------------------------------------------------------- diff --git a/src/kudu/tserver/tserver_path_handlers.cc b/src/kudu/tserver/tserver_path_handlers.cc index 8fa0196..52ec87f 100644 --- a/src/kudu/tserver/tserver_path_handlers.cc +++ b/src/kudu/tserver/tserver_path_handlers.cc @@ -48,6 +48,7 @@ #include "kudu/gutil/strings/join.h" #include "kudu/gutil/strings/numbers.h" #include "kudu/gutil/strings/substitute.h" +#include "kudu/rpc/remote_user.h" #include "kudu/server/webui_util.h" #include "kudu/tablet/metadata.pb.h" #include "kudu/tablet/tablet.h" @@ -536,7 +537,7 @@ void ScanToJson(const ScanDescriptor& scan, EasyJson* json) { json->Set("scanner_id", scan.scanner_id); json->Set("state", ScanStateToString(scan.state)); json->Set("query", ScanQueryHtml(scan)); - json->Set("requestor", scan.requestor); + json->Set("requestor", scan.remote_user.username()); json->Set("duration", HumanReadableElapsedTime::ToShortString(duration.ToSeconds())); json->Set("time_since_start",
