This is an automated email from the ASF dual-hosted git repository. granthenke pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/kudu.git
commit 73182b0665e336e2c864235c4aceb75db081dfe2 Author: Andrew Wong <[email protected]> AuthorDate: Mon Apr 8 17:26:59 2019 -0700 master: use AuthzProvider to generate authz tokens This patch plugs the AuthzProvider into the master's GetTableSchema endpoint. This allows for privileges to be returned to clients upon calling OpenTable(). Change-Id: Ic5404d6437699bc6c7c8bb0e530b202109e8f166 Reviewed-on: http://gerrit.cloudera.org:8080/13013 Tested-by: Andrew Wong <[email protected]> Reviewed-by: Adar Dembo <[email protected]> Reviewed-by: Hao Hao <[email protected]> --- src/kudu/integration-tests/CMakeLists.txt | 1 + src/kudu/integration-tests/ts_sentry-itest.cc | 542 ++++++++++++++++++++++++++ src/kudu/master/catalog_manager.cc | 32 +- src/kudu/master/catalog_manager.h | 8 +- src/kudu/master/master_service.cc | 23 +- src/kudu/sentry/mini_sentry.cc | 3 + src/kudu/util/random_util.h | 7 +- 7 files changed, 581 insertions(+), 35 deletions(-) diff --git a/src/kudu/integration-tests/CMakeLists.txt b/src/kudu/integration-tests/CMakeLists.txt index fcd0b74..477231d 100644 --- a/src/kudu/integration-tests/CMakeLists.txt +++ b/src/kudu/integration-tests/CMakeLists.txt @@ -121,6 +121,7 @@ ADD_KUDU_TEST(token_signer-itest) ADD_KUDU_TEST(location_assignment-itest DATA_FILES ../scripts/assign-location.py) ADD_KUDU_TEST(ts_recovery-itest PROCESSORS 4) +ADD_KUDU_TEST(ts_sentry-itest NUM_SHARDS 2) ADD_KUDU_TEST(ts_tablet_manager-itest) ADD_KUDU_TEST(update_scan_delta_compact-test RUN_SERIAL true) ADD_KUDU_TEST(webserver-stress-itest RUN_SERIAL true) diff --git a/src/kudu/integration-tests/ts_sentry-itest.cc b/src/kudu/integration-tests/ts_sentry-itest.cc new file mode 100644 index 0000000..9c8b3be --- /dev/null +++ b/src/kudu/integration-tests/ts_sentry-itest.cc @@ -0,0 +1,542 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. + +#include <cstdlib> +#include <memory> +#include <string> +#include <thread> +#include <type_traits> +#include <unordered_map> +#include <unordered_set> +#include <utility> +#include <vector> + +#include <glog/logging.h> +#include <gtest/gtest.h> + +#include "kudu/client/client.h" +#include "kudu/client/scan_batch.h" +#include "kudu/client/schema.h" +#include "kudu/client/shared_ptr.h" +#include "kudu/client/write_op.h" +#include "kudu/common/common.pb.h" +#include "kudu/common/partial_row.h" +#include "kudu/gutil/map-util.h" +#include "kudu/gutil/stl_util.h" +#include "kudu/gutil/strings/join.h" +#include "kudu/gutil/strings/substitute.h" +#include "kudu/hms/hms_client.h" +#include "kudu/hms/mini_hms.h" +#include "kudu/integration-tests/hms_itest-base.h" +#include "kudu/master/sentry_authz_provider-test-base.h" +#include "kudu/mini-cluster/external_mini_cluster.h" +#include "kudu/security/test/mini_kdc.h" +#include "kudu/sentry/mini_sentry.h" +#include "kudu/sentry/sentry_client.h" +#include "kudu/sentry/sentry_policy_service_types.h" +#include "kudu/tablet/transactions/write_transaction.h" +#include "kudu/thrift/client.h" +#include "kudu/tools/data_gen_util.h" +#include "kudu/util/barrier.h" +#include "kudu/util/monotime.h" +#include "kudu/util/random.h" +#include "kudu/util/random_util.h" +#include "kudu/util/scoped_cleanup.h" +#include "kudu/util/status.h" +#include "kudu/util/test_macros.h" +#include "kudu/util/test_util.h" + +using kudu::client::sp::shared_ptr; +using kudu::client::KuduClient; +using kudu::client::KuduColumnSchema; +using kudu::client::KuduDelete; +using kudu::client::KuduInsert; +using kudu::client::KuduError; +using kudu::client::KuduUpdate; +using kudu::client::KuduScanner; +using kudu::client::KuduScanBatch; +using kudu::client::KuduSchema; +using kudu::client::KuduSchemaBuilder; +using kudu::client::KuduSession; +using kudu::client::KuduTable; +using kudu::client::KuduTableAlterer; +using kudu::client::KuduTableCreator; +using kudu::cluster::ExternalMiniClusterOptions; +using kudu::master::AlterRoleGrantPrivilege; +using kudu::master::CreateRoleAndAddToGroups; +using kudu::master::GetColumnPrivilege; +using kudu::master::GetDatabasePrivilege; +using kudu::master::GetTablePrivilege; +using kudu::sentry::SentryClient; +using kudu::tablet::WritePrivileges; +using kudu::tablet::WritePrivilegeType; +using kudu::tools::GenerateDataForRow; +using sentry::TSentryGrantOption; +using std::pair; +using std::string; +using std::thread; +using std::unique_ptr; +using std::unordered_map; +using std::unordered_set; +using std::vector; +using strings::Substitute; + +namespace kudu { + +namespace { + +// Encapsulates the set of read and write privileges granted to a user. This is +// used for easier composability of tests. +// +// Note: while full table scan privileges could also be included, leaving this +// out simplifies the below tests, which are aimed at testing functionality of +// privileges granted by authz tokens end-to-end; privilege-checking for +// different actions is tested in more depth elsewhere. +struct RWPrivileges { + // The set of write privileges a user may be granted for a table. + WritePrivileges table_write_privileges; + + // The set of column names that the user is authorized to scan. + unordered_set<string> column_scan_privileges; +}; + +const WritePrivileges kFullPrivileges = { + WritePrivilegeType::INSERT, + WritePrivilegeType::UPDATE, + WritePrivilegeType::DELETE, +}; + +// Returns a randomly generated set of read and write privileges, ensuring that +// it contains at least one read and one write privilege. +RWPrivileges GeneratePrivileges(const unordered_set<string>& all_cols, ThreadSafeRandom* prng) { + WritePrivilegeType write_privilege = + SelectRandomElement<WritePrivileges, WritePrivilegeType, ThreadSafeRandom>( + kFullPrivileges, prng); + vector<string> scan_privileges = + SelectRandomSubset<unordered_set<string>, string, ThreadSafeRandom>( + all_cols, /*min_to_return*/1, prng); + RWPrivileges privileges; + privileges.table_write_privileges = WritePrivileges({ write_privilege }); + privileges.column_scan_privileges = + unordered_set<string>(scan_privileges.begin(), scan_privileges.end()); + return privileges; +} + +// Returns the complentary set of privileges to 'orig_privileges'. This is +// useful for generating operations that should fail, if a user is granted the +// privileges in 'orig_privileges'. +RWPrivileges ComplementaryPrivileges(const unordered_set<string>& all_cols, + const RWPrivileges& orig_privileges) { + RWPrivileges privileges; + for (const auto& wp : kFullPrivileges) { + if (!ContainsKey(orig_privileges.table_write_privileges, wp)) { + InsertOrDie(&privileges.table_write_privileges, wp); + } + } + for (const auto& col : all_cols) { + if (!ContainsKey(orig_privileges.column_scan_privileges, col)) { + InsertOrDie(&privileges.column_scan_privileges, col); + } + } + return privileges; +} + +// Performs a write operation to 'table' that should be allowed based on the +// privileges in 'write_privileges', using 'prng' to determine the operation. +Status PerformWrite(const WritePrivileges& write_privileges, + ThreadSafeRandom* prng, + KuduTable* table) { + WritePrivilegeType op_type = + SelectRandomElement<WritePrivileges, WritePrivilegeType, ThreadSafeRandom>( + write_privileges, prng); + shared_ptr<KuduSession> session = table->client()->NewSession(); + const auto unwrap_session_error = [&session] (Status s) { + if (s.IsIOError()) { + vector<KuduError*> errors; + session->GetPendingErrors(&errors, nullptr); + ElementDeleter deleter(&errors); + CHECK_EQ(1, errors.size()); + return errors[0]->status(); + } + return s; + }; + // Note: we could test UPSERTs, but it complicates the logic, and UPSERTs are + // tested elsewhere anyway. + switch (op_type) { + case WritePrivilegeType::INSERT: { + unique_ptr<KuduInsert> ins(table->NewInsert()); + GenerateDataForRow(table->schema(), prng->Next32(), prng, ins->mutable_row()); + return unwrap_session_error(session->Apply(ins.release())); + } + break; + case WritePrivilegeType::UPDATE: { + unique_ptr<KuduUpdate> upd(table->NewUpdate()); + GenerateDataForRow(table->schema(), prng->Next32(), prng, upd->mutable_row()); + return unwrap_session_error(session->Apply(upd.release())); + } + break; + case WritePrivilegeType::DELETE: { + unique_ptr<KuduDelete> del(table->NewDelete()); + KuduPartialRow* row = del->mutable_row(); + RETURN_NOT_OK(row->SetInt32(0, prng->Next32())); + return unwrap_session_error(session->Apply(del.release())); + } + break; + } + return Status::OK(); +} + +// Performs a scan operation to 'table' that should be allowed if the user is +// granted scan privileges on all columns in 'columns'. If provided, uses +// 'prng' to select a subset of rows to scan; otherwise uses all columns. +Status PerformScan(const unordered_set<string>& columns, + ThreadSafeRandom* prng, + KuduTable* table) { + vector<string> cols_to_scan = prng ? + SelectRandomSubset<unordered_set<string>, string, ThreadSafeRandom>( + columns, /*min_to_return*/1, prng) : + vector<string>(columns.begin(), columns.end()); + KuduScanner scanner(table); + RETURN_NOT_OK(scanner.SetTimeoutMillis(30000)); + RETURN_NOT_OK(scanner.SetProjectedColumnNames(cols_to_scan)); + RETURN_NOT_OK(scanner.Open()); + while (scanner.HasMoreRows()) { + KuduScanBatch batch; + RETURN_NOT_OK(scanner.NextBatch(&batch)); + } + return Status::OK(); +} + +// Performs an action that should be allowed with the given set of +// privileges. +Status PerformAction(const RWPrivileges& privileges, + ThreadSafeRandom* prng, KuduTable* table) { + bool can_write = !privileges.table_write_privileges.empty(); + bool can_scan = !privileges.column_scan_privileges.empty(); + CHECK(can_write || can_scan); + // If the user can scan and write, flip a coin for what to do. Otherwise, + // just perform whichever it can. + bool should_write = (can_write && can_scan && rand() % 2 == 0) || + (can_write && !can_scan); + if (should_write) { + CHECK(can_write); + RETURN_NOT_OK(PerformWrite(privileges.table_write_privileges, prng, table)); + } else { + CHECK(can_scan); + RETURN_NOT_OK(PerformScan(privileges.column_scan_privileges, prng, table)); + } + return Status::OK(); +} + +} // anonymous namespace + +// These tests will use the HMS and Sentry, and thus, are very slow. +// SKIP_IF_SLOW_NOT_ALLOWED() should be the very first thing called in the body +// of every test based on this test class. +class TSSentryITest : public HmsITestBase { + public: + // Note: groups and users therein are statically provided to MiniSentry (see + // mini_sentry.cc). We expect Sentry to be aware of users "user[0-2]". + static constexpr int kNumUsers = 3; + static constexpr const char* kAdminGroup = "admin"; + + static constexpr int kNumTables = 3; + static constexpr int kNumColsPerTable = 3; + static constexpr const char* kDb = "db"; + static constexpr const char* kTablePrefix = "table"; + static constexpr const char* kAdminRole = "kudu-admin"; + + static constexpr int kAuthzTokenTTLSecs = 1; + static constexpr int kAuthzCacheTTLMultiplier = 3; + + void SetUp() override { + SKIP_IF_SLOW_NOT_ALLOWED(); + for (int u = 0; u < kNumUsers; u++) { + users_.emplace_back(Substitute("user$0", u)); + } + + ExternalMiniClusterOptions opts; + opts.enable_kerberos = true; + opts.enable_sentry = true; + opts.hms_mode = HmsMode::ENABLE_METASTORE_INTEGRATION; + // Set a low token timeout so we can ensure retries are working properly. + opts.extra_master_flags.emplace_back(Substitute("--authz_token_validity_seconds=$0", + kAuthzTokenTTLSecs)); + opts.extra_master_flags.emplace_back(Substitute("--sentry_privileges_cache_ttl_factor=$0", + kAuthzCacheTTLMultiplier)); + // In addition to our users, we will be using the "kudu" user to perform + // administrative tasks like creating tables. + opts.extra_master_flags.emplace_back( + Substitute("--user_acl=kudu,$0", JoinStrings(users_, ","))); + opts.extra_tserver_flags.emplace_back( + Substitute("--user_acl=$0", JoinStrings(users_, ","))); + opts.extra_tserver_flags.emplace_back("--tserver_enforce_access_control=true"); + NO_FATALS(StartClusterWithOpts(std::move(opts))); + ASSERT_OK(cluster_->kdc()->CreateUserPrincipal("kudu")); + ASSERT_OK(cluster_->kdc()->Kinit("kudu")); + + // Set up the HMS client so we can set up a database. + thrift::ClientOptions hms_opts; + hms_opts.enable_kerberos = true; + hms_opts.service_principal = "hive"; + hms_client_.reset(new hms::HmsClient(cluster_->hms()->address(), hms_opts)); + ASSERT_OK(hms_client_->Start()); + + // Set up the Sentry client so we can set up privileges. + thrift::ClientOptions sentry_opts; + sentry_opts.enable_kerberos = true; + sentry_opts.service_principal = "sentry"; + sentry_client_.reset(new SentryClient(cluster_->sentry()->address(), sentry_opts)); + ASSERT_OK(sentry_client_->Start()); + ASSERT_OK(CreateRoleAndAddToGroups(sentry_client_.get(), kAdminRole, kAdminGroup)); + ASSERT_OK(AlterRoleGrantPrivilege(sentry_client_.get(), kAdminRole, + GetDatabasePrivilege(kDb, "ALL", TSentryGrantOption::DISABLED))); + + // Create the database in the HMS. + ASSERT_OK(CreateDatabase(kDb)); + + // Create a client as the "kudu" user, who now has admin privileges. + ASSERT_OK(cluster_->CreateClient(nullptr, &admin_client_)); + + // Finally populate a set of column names to use for our tables. + for (int i = 0; i < kNumColsPerTable; i++) { + cols_.emplace_back(Substitute("col$0", i)); + } + } + + // Creates a table named 'table_ident' with 'kNumColsPerTable' columns. + Status CreateTable(const string& table_ident) { + KuduSchema schema; + KuduSchemaBuilder b; + auto iter = cols_.begin(); + b.AddColumn(*iter++)->Type(KuduColumnSchema::INT32)->NotNull()->PrimaryKey(); + while (iter < cols_.end()) { + b.AddColumn(*iter++)->Type(KuduColumnSchema::INT32); + } + RETURN_NOT_OK(b.Build(&schema)); + unique_ptr<KuduTableCreator> table_creator(client_->NewTableCreator()); + return table_creator->table_name(table_ident) + .schema(&schema) + .set_range_partition_columns({ "col0" }) + .num_replicas(1) + .Create(); + } + + void TearDown() override { + SKIP_IF_SLOW_NOT_ALLOWED(); + HmsITestBase::TearDown(); + } + + protected: + // A Sentry client with which to grant privileges. + unique_ptr<SentryClient> sentry_client_; + + // Kudu client with which to perform admin operations. + shared_ptr<KuduClient> admin_client_; + + // A list of users that may try to do things. + vector<string> users_; + + // A list of columns that each table should have. + vector<string> cols_; +}; + +// Tests authorizing read and write operations coming from multiple concurrent +// users for multiple tables. +TEST_F(TSSentryITest, TestReadsAndWrites) { + SKIP_IF_SLOW_NOT_ALLOWED(); + + // First, set up the tables. + vector<string> tables; + for (int i = 0; i < kNumTables; i++) { + string table_name = Substitute("$0$1", kTablePrefix, i); + ASSERT_OK(CreateTable(Substitute("$0.$1", kDb, table_name))); + tables.emplace_back(std::move(table_name)); + } + + // Keep track of the privileges that each user has been granted and not been + // granted per table. + typedef pair<RWPrivileges, RWPrivileges> GrantedNotGrantedPrivileges; + typedef unordered_map<string, GrantedNotGrantedPrivileges> TableNameToPrivileges; + unordered_map<string, TableNameToPrivileges> user_to_privileges; + + // Set up a bunch of clients for each user. + unordered_map<string, vector<shared_ptr<KuduClient>>> user_to_clients; + ThreadSafeRandom prng(SeedRandom()); + unordered_set<string> cols(cols_.begin(), cols_.end()); + static constexpr int kNumClientsPerUser = 4; + for (int i = 0; i < kNumUsers; i++) { + const string& user = users_[i]; + // Register the user with the KDC, and add a role to the user's group + // (provided to MiniSentry in mini_sentry.cc). + ASSERT_OK(cluster_->kdc()->CreateUserPrincipal(user)); + ASSERT_OK(cluster_->kdc()->Kinit(user)); + const string role = Substitute("role$0", i); + ASSERT_OK(CreateRoleAndAddToGroups(sentry_client_.get(), role, Substitute("group$0", i))); + + // Set up multiple clients for each user. + vector<shared_ptr<KuduClient>> clients; + for (int i = 0; i < kNumClientsPerUser; i++) { + shared_ptr<KuduClient> client; + ASSERT_OK(cluster_->CreateClient(nullptr, &client)); + clients.emplace_back(std::move(client)); + } + EmplaceOrDie(&user_to_clients, user, std::move(clients)); + + // Generate privileges for each user for every table, and grant the + // appropriate Sentry privileges. + TableNameToPrivileges table_to_privileges; + for (const string& table_name : tables) { + RWPrivileges granted_privileges = GeneratePrivileges(cols, &prng); + for (const auto& wp : granted_privileges.table_write_privileges) { + ASSERT_OK(AlterRoleGrantPrivilege(sentry_client_.get(), role, + GetTablePrivilege(kDb, table_name, WritePrivilegeToString(wp)))); + } + for (const auto& col : granted_privileges.column_scan_privileges) { + ASSERT_OK(AlterRoleGrantPrivilege(sentry_client_.get(), role, + GetColumnPrivilege(kDb, table_name, col, "SELECT"))); + } + RWPrivileges not_granted_privileges = ComplementaryPrivileges(cols, granted_privileges); + InsertOrDie(&table_to_privileges, table_name, + { std::move(granted_privileges), std::move(not_granted_privileges) }); + } + EmplaceOrDie(&user_to_privileges, user, std::move(table_to_privileges)); + } + + // In parallel, have each user's clients perform a series of operations on a + // table for some extended period of time (longer than the token timeout). Do + // this for a few tables for each client. + static constexpr int kNumOpPeriods = 3; + static const MonoDelta kPeriodTime = MonoDelta::FromSeconds(kAuthzTokenTTLSecs * 3); + vector<thread> threads; + Barrier b(kNumUsers * kNumClientsPerUser); + SCOPED_CLEANUP({ + for (auto& t : threads) { + t.join(); + } + }); + for (const string& user : users_) { + // Start a thread for every user that performs a bunch of operations. + const auto* const table_to_privileges = FindOrNull(user_to_privileges, user); + for (const auto& client_sp : FindOrDie(user_to_clients, user)) { + KuduClient* client = client_sp.get(); + threads.emplace_back([client, table_to_privileges, &b, &tables, &prng] { + b.Wait(); + // Perform a bunch of operations, switching back and forth between + // different tables to ensure a client uses the appropriate privileges. + for (int i = 0; i < kNumOpPeriods; i++) { + const auto& table_name = + SelectRandomElement<vector<string>, string, ThreadSafeRandom>(tables, &prng); + shared_ptr<KuduTable> table; + ASSERT_OK(client->OpenTable(Substitute("$0.$1", kDb, table_name), &table)); + const MonoTime end_time = MonoTime::Now() + kPeriodTime; + while (MonoTime::Now() < end_time) { + const auto& privileges = FindOrDie(*table_to_privileges, table_name); + const auto& granted_privileges = privileges.first; + const auto& non_granted_privileges = privileges.second; + // Perform a permitted operation. We might not get an OK status if + // e.g. we're inserting a row that already exists, but the operation + // should always be permitted. + Status s = PerformAction(granted_privileges, &prng, table.get()); + ASSERT_FALSE(s.IsNotAuthorized()) << s.ToString(); + ASSERT_STR_NOT_CONTAINS(s.ToString(), "not authorized"); + + // Now perform an operation based on the privileges we _don't_ have; + // this should always yield authorization errors. + s = PerformAction(non_granted_privileges, &prng, table.get()); + ASSERT_TRUE(s.IsRemoteError()) << s.ToString(); + ASSERT_STR_CONTAINS(s.ToString(), "not authorized"); + } + } + }); + } + } +} + +// Test for a couple of scenarios related to alter tables. +TEST_F(TSSentryITest, TestAlters) { + SKIP_IF_SLOW_NOT_ALLOWED(); + + static const string kTableName = "table"; + const string table_ident = Substitute("$0.$1", kDb, kTableName); + ASSERT_OK(CreateTable(table_ident)); + + const string user = "user0"; + ASSERT_OK(cluster_->kdc()->CreateUserPrincipal(user)); + ASSERT_OK(cluster_->kdc()->Kinit(user)); + const string role = "role0"; + ASSERT_OK(CreateRoleAndAddToGroups(sentry_client_.get(), role, "group0")); + + shared_ptr<KuduClient> user_client; + ASSERT_OK(cluster_->CreateClient(nullptr, &user_client)); + + // Note: we only need privileges on the metadata for OpenTable() calls. + // METADATA isn't a first-class Sentry privilege and won't get carried over + // on table rename, so we just grant INSERT privileges. + ASSERT_OK(AlterRoleGrantPrivilege(sentry_client_.get(), role, + GetTablePrivilege(kDb, kTableName, "INSERT"))); + + // First, grant privileges on a new column that doesn't yet exist. Once that + // column is created, we should be able to scan it immediately. + const string new_column = Substitute("col$0", kNumColsPerTable); + ASSERT_OK(AlterRoleGrantPrivilege(sentry_client_.get(), role, + GetColumnPrivilege(kDb, kTableName, new_column, "SELECT"))); + { + unique_ptr<KuduTableAlterer> table_alterer(client_->NewTableAlterer(table_ident)); + table_alterer->AddColumn(new_column)->Type(KuduColumnSchema::INT32); + ASSERT_OK(table_alterer->Alter()); + } + shared_ptr<KuduTable> table; + ASSERT_OK(user_client->OpenTable(table_ident, &table)); + ASSERT_OK(PerformScan({ new_column }, /*prng=*/nullptr, table.get())); + + // Now create another column and grant the user privileges for that column. + // Since privileges are cached, even though we've granted privileges, clients + // will use the cached privilege and not be authorized for a bit. + const string another_column = Substitute("col$0", kNumColsPerTable + 1); + ASSERT_OK(AlterRoleGrantPrivilege(sentry_client_.get(), role, + GetColumnPrivilege(kDb, kTableName, another_column, "SELECT"))); + { + unique_ptr<KuduTableAlterer> table_alterer(client_->NewTableAlterer(table_ident)); + table_alterer->AddColumn(another_column)->Type(KuduColumnSchema::INT32); + ASSERT_OK(table_alterer->Alter()); + } + ASSERT_OK(user_client->OpenTable(table_ident, &table)); + Status s = PerformScan({ another_column }, /*prng=*/nullptr, table.get()); + ASSERT_TRUE(s.IsRemoteError()) << s.ToString(); + ASSERT_STR_CONTAINS(s.ToString(), "not authorized"); + + // Wait the full duration of the cache TTL, and an additional full token TTL. + // This ensures that the client's token will expire we will get a new one + // with the most up-to-date privileges from Sentry. + SleepFor(MonoDelta::FromSeconds(kAuthzTokenTTLSecs * (1 + kAuthzCacheTTLMultiplier))); + ASSERT_OK(PerformScan({ another_column }, /*prng=*/nullptr, table.get())); + + // Now rename the table to something else. There shouldn't be any privileges + // cached for the newly-renamed table, so we should immediately be able to + // scan it. + const string new_table_ident = Substitute("$0.$1", kDb, "newtable"); + { + unique_ptr<KuduTableAlterer> table_alterer(client_->NewTableAlterer(table_ident)); + table_alterer->RenameTo(new_table_ident); + ASSERT_OK(table_alterer->Alter()); + } + ASSERT_OK(user_client->OpenTable(new_table_ident, &table)); + ASSERT_OK(PerformScan({ another_column }, nullptr, table.get())); +} + +} // namespace kudu diff --git a/src/kudu/master/catalog_manager.cc b/src/kudu/master/catalog_manager.cc index d59c45a..c5b9dcd 100644 --- a/src/kudu/master/catalog_manager.cc +++ b/src/kudu/master/catalog_manager.cc @@ -286,6 +286,7 @@ using kudu::rpc::RpcContext; using kudu::security::Cert; using kudu::security::DataFormat; using kudu::security::PrivateKey; +using kudu::security::TablePrivilegePB; using kudu::security::TokenSigner; using kudu::security::TokenSigningPrivateKey; using kudu::security::TokenSigningPrivateKeyPB; @@ -2725,7 +2726,8 @@ Status CatalogManager::IsAlterTableDone(const IsAlterTableDoneRequestPB* req, Status CatalogManager::GetTableSchema(const GetTableSchemaRequestPB* req, GetTableSchemaResponsePB* resp, - optional<const string&> user) { + optional<const string&> user, + const TokenSigner* token_signer) { leader_lock_.AssertAcquiredForReading(); RETURN_NOT_OK(CheckOnline()); @@ -2742,15 +2744,25 @@ Status CatalogManager::GetTableSchema(const GetTableSchemaRequestPB* req, &table, &l)); RETURN_NOT_OK(CheckIfTableDeletedOrNotRunning(&l, resp)); - if (l.data().pb.has_fully_applied_schema()) { - // An AlterTable is in progress; fully_applied_schema is the last - // schema that has reached every TS. - CHECK_EQ(SysTablesEntryPB::ALTERING, l.data().pb.state()); - resp->mutable_schema()->CopyFrom(l.data().pb.fully_applied_schema()); - } else { - // There's no AlterTable, the regular schema is "fully applied". - resp->mutable_schema()->CopyFrom(l.data().pb.schema()); - } + // If fully_applied_schema is set, use it, since an alter is in progress. + CHECK(!l.data().pb.has_fully_applied_schema() || + (l.data().pb.state() == SysTablesEntryPB::ALTERING)); + const SchemaPB& schema_pb = l.data().pb.has_fully_applied_schema() ? + l.data().pb.fully_applied_schema() : l.data().pb.schema(); + + if (token_signer && user) { + TablePrivilegePB table_privilege; + table_privilege.set_table_id(table->id()); + RETURN_NOT_OK( + SetupError(authz_provider_->FillTablePrivilegePB(l.data().name(), *user, schema_pb, + &table_privilege), + resp, MasterErrorPB::UNKNOWN_ERROR)); + security::SignedTokenPB authz_token; + RETURN_NOT_OK(token_signer->GenerateAuthzToken( + *user, std::move(table_privilege), &authz_token)); + *resp->mutable_authz_token() = std::move(authz_token); + } + resp->mutable_schema()->CopyFrom(schema_pb); resp->set_num_replicas(l.data().pb.num_replicas()); resp->set_table_id(table->id()); resp->mutable_partition_schema()->CopyFrom(l.data().pb.partition_schema()); diff --git a/src/kudu/master/catalog_manager.h b/src/kudu/master/catalog_manager.h index 88a0227..94dffc5 100644 --- a/src/kudu/master/catalog_manager.h +++ b/src/kudu/master/catalog_manager.h @@ -84,6 +84,7 @@ class RpcContext; namespace security { class Cert; class PrivateKey; +class TokenSigner; class TokenSigningPublicKeyPB; } // namespace security @@ -576,10 +577,13 @@ class CatalogManager : public tserver::TabletReplicaLookupIf { boost::optional<const std::string&> user); // Get the information about the specified table. If 'user' is provided, - // checks that the user is authorized to get such information. + // checks that the user is authorized to get such information. If a token + // signer is provided (e.g. authz token generation is enabled), an authz + // token will be attached to the response. Status GetTableSchema(const GetTableSchemaRequestPB* req, GetTableSchemaResponsePB* resp, - boost::optional<const std::string&> user); + boost::optional<const std::string&> user, + const security::TokenSigner* token_signer); // List all the running tables. If 'user' is provided, checks that the user // is authorized to get such information, otherwise, only list the tables that diff --git a/src/kudu/master/master_service.cc b/src/kudu/master/master_service.cc index bcb244f..5b44196 100644 --- a/src/kudu/master/master_service.cc +++ b/src/kudu/master/master_service.cc @@ -431,33 +431,14 @@ void MasterServiceImpl::GetTableSchema(const GetTableSchemaRequestPB* req, } Status s = server_->catalog_manager()->GetTableSchema( - req, resp, make_optional<const string&>(rpc->remote_user().username())); + req, resp, make_optional<const string&>(rpc->remote_user().username()), + FLAGS_master_support_authz_tokens ? server_->token_signer() : nullptr); CheckRespErrorOrSetUnknown(s, resp); if (resp->has_error()) { // If there was an application error, respond to the RPC. rpc->RespondSuccess(); return; } - - // TODO(awong): fill this token in with actual privileges from the - // appropriate AuthzProvider. For now, assume the user has all privileges - // for the table. - if (PREDICT_TRUE(FLAGS_master_support_authz_tokens)) { - SignedTokenPB authz_token; - TablePrivilegePB table_privilege; - table_privilege.set_table_id(resp->table_id()); - table_privilege.set_scan_privilege(true); - table_privilege.set_insert_privilege(true); - table_privilege.set_update_privilege(true); - table_privilege.set_delete_privilege(true); - s = server_->token_signer()->GenerateAuthzToken(rpc->remote_user().username(), - std::move(table_privilege), &authz_token); - if (!s.ok()) { - rpc->RespondFailure(s); - return; - } - *resp->mutable_authz_token() = std::move(authz_token); - } rpc->RespondSuccess(); } diff --git a/src/kudu/sentry/mini_sentry.cc b/src/kudu/sentry/mini_sentry.cc index caecfa9..cb3bbe1 100644 --- a/src/kudu/sentry/mini_sentry.cc +++ b/src/kudu/sentry/mini_sentry.cc @@ -329,6 +329,9 @@ test-admin=admin test-user=user kudu=admin joe-interloper="" +user0=group0 +user1=group1 +user2=group2 )"; RETURN_NOT_OK(WriteStringToFile(Env::Default(), kUsers, users_ini_path)); diff --git a/src/kudu/util/random_util.h b/src/kudu/util/random_util.h index 9dfc510..a607051 100644 --- a/src/kudu/util/random_util.h +++ b/src/kudu/util/random_util.h @@ -50,10 +50,13 @@ T SelectRandomElement(const Container& c, Rand* r) { } // Returns a randomly-selected subset from the container. +// +// The results are not stored in a randomized order: the order of results will +// match their order in the input collection. template <typename Container, typename T, typename Rand> std::vector<T> SelectRandomSubset(const Container& c, int min_to_return, Rand* r) { - CHECK_GT(c.size(), min_to_return); - int num_to_return = min_to_return + r->Uniform(c.size() - min_to_return); + CHECK_GE(c.size(), min_to_return); + int num_to_return = min_to_return + r->Uniform(1 + c.size() - min_to_return); std::vector<T> rand_list; ReservoirSample(c, num_to_return, std::set<T>{}, r, &rand_list); return rand_list;
