Repository: kudu
Updated Branches:
  refs/heads/master 038c6bc05 -> 6ce61d6e6


KUDU-1918 Prevent hijacking of scanner IDs

This makes the scanner remember its RemoteUser, and ensures that when
continuing a scan, the new requestor matches the original requestor.

This prevents one user from somehow obtaining a scanner ID from another
and then "hijacking" the in-progress scan.

This restricts scans, checksum scans, and keep-alive requests.

Change-Id: Ic91fa0ca471bd674e35aa2f8de3806b88ad4b3b4
Reviewed-on: http://gerrit.cloudera.org:8080/6348
Tested-by: Kudu Jenkins
Reviewed-by: Alexey Serbin <[email protected]>


Project: http://git-wip-us.apache.org/repos/asf/kudu/repo
Commit: http://git-wip-us.apache.org/repos/asf/kudu/commit/e172df40
Tree: http://git-wip-us.apache.org/repos/asf/kudu/tree/e172df40
Diff: http://git-wip-us.apache.org/repos/asf/kudu/diff/e172df40

Branch: refs/heads/master
Commit: e172df405aef47b1339c9879b835baf69b539f8c
Parents: 038c6bc
Author: Andrew Wong <[email protected]>
Authored: Thu Nov 1 10:49:41 2018 -0700
Committer: Andrew Wong <[email protected]>
Committed: Tue Nov 6 20:55:31 2018 +0000

----------------------------------------------------------------------
 src/kudu/client/client-test.cc              | 65 +++++++++++++++++
 src/kudu/client/client.h                    |  1 +
 src/kudu/client/scanner-internal.cc         |  3 +
 src/kudu/client/scanner-internal.h          |  3 +
 src/kudu/tserver/scanners-test.cc           | 25 ++++---
 src/kudu/tserver/scanners.cc                | 36 +++++++---
 src/kudu/tserver/scanners.h                 | 39 +++++++----
 src/kudu/tserver/tablet_server-test-base.cc | 12 ++--
 src/kudu/tserver/tablet_server-test-base.h  |  4 ++
 src/kudu/tserver/tablet_server-test.cc      | 88 +++++++++++++++++++++++-
 src/kudu/tserver/tablet_service.cc          | 46 +++++++++----
 src/kudu/tserver/tserver.proto              |  3 +
 src/kudu/tserver/tserver_path_handlers.cc   |  3 +-
 13 files changed, 272 insertions(+), 56 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/kudu/blob/e172df40/src/kudu/client/client-test.cc
----------------------------------------------------------------------
diff --git a/src/kudu/client/client-test.cc b/src/kudu/client/client-test.cc
index efcdd68..10a4c6c 100644
--- a/src/kudu/client/client-test.cc
+++ b/src/kudu/client/client-test.cc
@@ -5714,5 +5714,70 @@ TEST_F(ClientTest, TestAuthenticationCredentialsRealUser) {
   KuduScanner scanner(client_table_.get());
   ASSERT_OK(ScanToStrings(&scanner, &rows));
 }
+
+// Test that clients that aren't authenticated as the appropriate user will be
+// unable to hijack a specific scanner ID.
+TEST_F(ClientTest, TestBlockScannerHijackingAttempts) {
+  const string kUser = "token-user";
+  const string kBadGuy = "bad-guy";
+  const string table_name = client_table_->name();
+  FLAGS_user_acl = Substitute("$0,$1", kUser, kBadGuy);
+  cluster_->ShutdownNodes(cluster::ClusterNodes::ALL);
+  ASSERT_OK(cluster_->StartSync());
+
+  // Insert some rows to the table.
+  NO_FATALS(InsertTestRows(client_table_.get(), FLAGS_test_scan_num_rows));
+
+  // First authenticate as a user and create a scanner for the existing table.
+  const auto get_table_as_user = [&] (const string& user, shared_ptr<KuduTable>* table) {
+    KuduClientBuilder client_builder;
+    string authn_creds;
+    AuthenticationCredentialsPB pb;
+    pb.set_real_user(user);
+    ASSERT_TRUE(pb.SerializeToString(&authn_creds));
+    client_builder.import_authentication_credentials(authn_creds);
+
+    // Create the client and table for the user.
+    shared_ptr<KuduClient> user_client;
+    ASSERT_OK(cluster_->CreateClient(&client_builder, &user_client));
+    ASSERT_OK(user_client->OpenTable(table_name, table));
+  };
+
+  shared_ptr<KuduTable> user_table;
+  shared_ptr<KuduTable> bad_guy_table;
+  NO_FATALS(get_table_as_user(kUser, &user_table));
+  NO_FATALS(get_table_as_user(kBadGuy, &bad_guy_table));
+
+  // Test both fault-tolerant scanners and non-fault-tolerant scanners.
+  for (bool fault_tolerance : { true, false }) {
+    // Scan the table as the user to get a scanner ID, and set up a malicious
+    // scanner that will try to hijack that scanner ID. Set an initial batch
+    // size of 0 so the calls to Open() don't buffer any rows.
+    KuduScanner user_scanner(user_table.get());
+    ASSERT_OK(user_scanner.SetBatchSizeBytes(0));
+    KuduScanner bad_guy_scanner(bad_guy_table.get());
+    ASSERT_OK(bad_guy_scanner.SetBatchSizeBytes(0));
+    if (fault_tolerance) {
+      ASSERT_OK(user_scanner.SetFaultTolerant());
+      ASSERT_OK(bad_guy_scanner.SetFaultTolerant());
+    }
+    ASSERT_OK(user_scanner.Open());
+    ASSERT_OK(bad_guy_scanner.Open());
+    const string scanner_id = user_scanner.data_->next_req_.scanner_id();
+    ASSERT_FALSE(scanner_id.empty());
+
+    // Now attempt to get that scanner id as a different user.
+    LOG(INFO) << Substitute("Attempting to extract data from $0 scan $1 as $2",
+        fault_tolerance ? "fault-tolerant" : "non-fault-tolerant", scanner_id, kBadGuy);
+    bad_guy_scanner.data_->next_req_.set_scanner_id(scanner_id);
+    bad_guy_scanner.data_->last_response_.set_has_more_results(true);
+    KuduScanBatch batch;
+    Status s = bad_guy_scanner.NextBatch(&batch);
+    ASSERT_TRUE(s.IsRemoteError());
+    ASSERT_STR_CONTAINS(s.ToString(), "Not authorized");
+    ASSERT_EQ(0, batch.NumRows());
+  }
+}
+
 } // namespace client
 } // namespace kudu

http://git-wip-us.apache.org/repos/asf/kudu/blob/e172df40/src/kudu/client/client.h
----------------------------------------------------------------------
diff --git a/src/kudu/client/client.h b/src/kudu/client/client.h
index 56c3f50..626df7d 100644
--- a/src/kudu/client/client.h
+++ b/src/kudu/client/client.h
@@ -2224,6 +2224,7 @@ class KUDU_EXPORT KuduScanner {
   class KUDU_NO_EXPORT Data;
 
   friend class KuduScanToken;
+  FRIEND_TEST(ClientTest, TestBlockScannerHijackingAttempts);
   FRIEND_TEST(ClientTest, TestScanCloseProxy);
   FRIEND_TEST(ClientTest, TestScanFaultTolerance);
   FRIEND_TEST(ClientTest, TestScanNoBlockCaching);

http://git-wip-us.apache.org/repos/asf/kudu/blob/e172df40/src/kudu/client/scanner-internal.cc
----------------------------------------------------------------------
diff --git a/src/kudu/client/scanner-internal.cc b/src/kudu/client/scanner-internal.cc
index 9ac1a86..d7c30bb 100644
--- a/src/kudu/client/scanner-internal.cc
+++ b/src/kudu/client/scanner-internal.cc
@@ -243,6 +243,9 @@ ScanRpcStatus KuduScanner::Data::AnalyzeResponse(const Status& rpc_status,
         case rpc::ErrorStatusPB::ERROR_UNAVAILABLE:
           return ScanRpcStatus{
               ScanRpcStatus::SERVICE_UNAVAILABLE, rpc_status};
+        case rpc::ErrorStatusPB::FATAL_UNAUTHORIZED:
+          return ScanRpcStatus{
+              ScanRpcStatus::SCAN_NOT_AUTHORIZED, rpc_status};
         default:
           return ScanRpcStatus{ScanRpcStatus::RPC_ERROR, rpc_status};
       }

http://git-wip-us.apache.org/repos/asf/kudu/blob/e172df40/src/kudu/client/scanner-internal.h
----------------------------------------------------------------------
diff --git a/src/kudu/client/scanner-internal.h b/src/kudu/client/scanner-internal.h
index 0dddb40..5e7652b 100644
--- a/src/kudu/client/scanner-internal.h
+++ b/src/kudu/client/scanner-internal.h
@@ -89,6 +89,9 @@ struct ScanRpcStatus {
     // the token has expired.
     RPC_INVALID_AUTHENTICATION_TOKEN,
 
+    // The requestor was not authorized to make the request.
+    SCAN_NOT_AUTHORIZED,
+
     // Another RPC-system error (e.g. NetworkError because the TS was down).
     RPC_ERROR,
 

http://git-wip-us.apache.org/repos/asf/kudu/blob/e172df40/src/kudu/tserver/scanners-test.cc
----------------------------------------------------------------------
diff --git a/src/kudu/tserver/scanners-test.cc b/src/kudu/tserver/scanners-test.cc
index 7a2084e..e2456c4 100644
--- a/src/kudu/tserver/scanners-test.cc
+++ b/src/kudu/tserver/scanners-test.cc
@@ -24,42 +24,51 @@
 
 #include "kudu/gutil/gscoped_ptr.h"
 #include "kudu/gutil/ref_counted.h"
+#include "kudu/rpc/remote_user.h"
 #include "kudu/tablet/tablet_replica.h"
 #include "kudu/tserver/scanner_metrics.h"
 #include "kudu/tserver/tserver.pb.h"
 #include "kudu/util/metrics.h"
 #include "kudu/util/monotime.h"
+#include "kudu/util/status.h"
+#include "kudu/util/test_macros.h"
 
 DECLARE_int32(scanner_ttl_ms);
 
 namespace kudu {
 
+using rpc::RemoteUser;
+using std::vector;
 using tablet::TabletReplica;
 
 namespace tserver {
 
-using std::vector;
+static const char* kUsername = "kudu-user";
 
 TEST(ScannersTest, TestManager) {
   scoped_refptr<TabletReplica> null_replica(nullptr);
   ScannerManager mgr(nullptr);
 
   // Create two scanners, make sure their ids are different.
+  RemoteUser user;
+  user.SetUnauthenticated(kUsername);
   SharedScanner s1, s2;
-  mgr.NewScanner(null_replica, "", RowFormatFlags::NO_FLAGS, &s1);
-  mgr.NewScanner(null_replica, "", RowFormatFlags::NO_FLAGS, &s2);
+  mgr.NewScanner(null_replica, user, RowFormatFlags::NO_FLAGS, &s1);
+  mgr.NewScanner(null_replica, user, RowFormatFlags::NO_FLAGS, &s2);
   ASSERT_NE(s1->id(), s2->id());
 
   // Check that they're both registered.
   SharedScanner result;
-  ASSERT_TRUE(mgr.LookupScanner(s1->id(), &result));
+  TabletServerErrorPB::Code error_code;
+  ASSERT_OK(mgr.LookupScanner(s1->id(), kUsername, &error_code, &result));
   ASSERT_EQ(result.get(), s1.get());
 
-  ASSERT_TRUE(mgr.LookupScanner(s2->id(), &result));
+  ASSERT_OK(mgr.LookupScanner(s2->id(), kUsername, &error_code, &result));
   ASSERT_EQ(result.get(), s2.get());
 
   // Check that looking up a bad scanner returns false.
-  ASSERT_FALSE(mgr.LookupScanner("xxx", &result));
+  ASSERT_TRUE(mgr.LookupScanner("xxx", kUsername, &error_code, &result).IsNotFound());
+  ASSERT_EQ(TabletServerErrorPB::SCANNER_EXPIRED, error_code);
 
   // Remove the scanners.
   ASSERT_TRUE(mgr.UnregisterScanner(s1->id()));
@@ -75,8 +84,8 @@ TEST(ScannerTest, TestExpire) {
   MetricRegistry registry;
   ScannerManager mgr(METRIC_ENTITY_server.Instantiate(&registry, "test"));
   SharedScanner s1, s2;
-  mgr.NewScanner(null_replica, "", RowFormatFlags::NO_FLAGS, &s1);
-  mgr.NewScanner(null_replica, "", RowFormatFlags::NO_FLAGS, &s2);
+  mgr.NewScanner(null_replica, RemoteUser(), RowFormatFlags::NO_FLAGS, &s1);
+  mgr.NewScanner(null_replica, RemoteUser(), RowFormatFlags::NO_FLAGS, &s2);
   SleepFor(MonoDelta::FromMilliseconds(200));
   s2->UpdateAccessTime();
   mgr.RemoveExpiredScanners();

http://git-wip-us.apache.org/repos/asf/kudu/blob/e172df40/src/kudu/tserver/scanners.cc
----------------------------------------------------------------------
diff --git a/src/kudu/tserver/scanners.cc b/src/kudu/tserver/scanners.cc
index 6fb9d77..60f69fb 100644
--- a/src/kudu/tserver/scanners.cc
+++ b/src/kudu/tserver/scanners.cc
@@ -34,6 +34,7 @@
 #include "kudu/gutil/map-util.h"
 #include "kudu/gutil/stl_util.h"
 #include "kudu/gutil/strings/substitute.h"
+#include "kudu/rpc/remote_user.h"
 #include "kudu/tablet/tablet.h"
 #include "kudu/tablet/tablet_metadata.h"
 #include "kudu/tablet/tablet_metrics.h"
@@ -68,6 +69,7 @@ using strings::Substitute;
 
 namespace kudu {
 
+using rpc::RemoteUser;
 using tablet::TabletReplica;
 
 namespace tserver {
@@ -131,20 +133,16 @@ ScannerManager::ScannerMapStripe& ScannerManager::GetStripeByScannerId(const str
 }
 
 void ScannerManager::NewScanner(const scoped_refptr<TabletReplica>& tablet_replica,
-                                const std::string& requestor_string,
+                                const RemoteUser& remote_user,
                                 uint64_t row_format_flags,
                                 SharedScanner* scanner) {
   // Keep trying to generate a unique ID until we get one.
   bool success = false;
   while (!success) {
-    // TODO(KUDU-1918): are these UUIDs predictable? If so, we should
-    // probably generate random numbers instead, since we can safely
-    // just retry until we avoid a collision. Alternatively we could
-    // verify that the requestor userid does not change mid-scan.
     string id = oid_generator_.Next();
     scanner->reset(new Scanner(id,
                                tablet_replica,
-                               requestor_string,
+                               remote_user,
                                metrics_.get(),
                                row_format_flags));
 
@@ -154,10 +152,26 @@ void ScannerManager::NewScanner(const scoped_refptr<TabletReplica>& tablet_repli
   }
 }
 
-bool ScannerManager::LookupScanner(const string& scanner_id, SharedScanner* scanner) {
+Status ScannerManager::LookupScanner(const string& scanner_id,
+                                     const string& username,
+                                     TabletServerErrorPB::Code* error_code,
+                                     SharedScanner* scanner) {
+  SharedScanner ret;
   ScannerMapStripe& stripe = GetStripeByScannerId(scanner_id);
   shared_lock<RWMutex> l(stripe.lock_);
-  return FindCopy(stripe.scanners_by_id_, scanner_id, scanner);
+  bool found_scanner = FindCopy(stripe.scanners_by_id_, scanner_id, &ret);
+  if (!found_scanner) {
+    *error_code = TabletServerErrorPB::SCANNER_EXPIRED;
+    return Status::NotFound(Substitute("Scanner $0 not found (it may have expired)",
+                                       scanner_id));
+  }
+  if (username != ret->remote_user().username()) {
+    *error_code = TabletServerErrorPB::NOT_AUTHORIZED;
+    return Status::NotAuthorized(Substitute("User $0 doesn't own scanner $1",
+                                            username, scanner_id));
+  }
+  *scanner = std::move(ret);
+  return Status::OK();
 }
 
 bool ScannerManager::UnregisterScanner(const string& scanner_id) {
@@ -290,11 +304,11 @@ void ScannerManager::RecordCompletedScanUnlocked(ScanDescriptor descriptor) {
 const std::string Scanner::kNullTabletId = "null tablet";
 
 Scanner::Scanner(string id, const scoped_refptr<TabletReplica>& tablet_replica,
-                 string requestor_string, ScannerMetrics* metrics,
+                 RemoteUser remote_user, ScannerMetrics* metrics,
                  uint64_t row_format_flags)
     : id_(std::move(id)),
       tablet_replica_(tablet_replica),
-      requestor_string_(std::move(requestor_string)),
+      remote_user_(std::move(remote_user)),
       call_seq_id_(0),
       start_time_(MonoTime::Now()),
       metrics_(metrics),
@@ -353,7 +367,7 @@ ScanDescriptor Scanner::descriptor() const {
   ScanDescriptor descriptor;
   descriptor.tablet_id = tablet_id();
   descriptor.scanner_id = id();
-  descriptor.requestor = requestor_string();
+  descriptor.remote_user = remote_user();
   descriptor.start_time = start_time_;
 
   for (const auto& column : client_projection_schema()->columns()) {

http://git-wip-us.apache.org/repos/asf/kudu/blob/e172df40/src/kudu/tserver/scanners.h
----------------------------------------------------------------------
diff --git a/src/kudu/tserver/scanners.h b/src/kudu/tserver/scanners.h
index 9588053..354ec90 100644
--- a/src/kudu/tserver/scanners.h
+++ b/src/kudu/tserver/scanners.h
@@ -14,8 +14,7 @@
 // KIND, either express or implied.  See the License for the
 // specific language governing permissions and limitations
 // under the License.
-#ifndef KUDU_TSERVER_SCANNERS_H
-#define KUDU_TSERVER_SCANNERS_H
+#pragma once
 
 #include <cstddef>
 #include <cstdint>
@@ -34,7 +33,9 @@
 #include "kudu/gutil/gscoped_ptr.h"
 #include "kudu/gutil/macros.h"
 #include "kudu/gutil/ref_counted.h"
+#include "kudu/rpc/remote_user.h"
 #include "kudu/tablet/tablet_replica.h"
+#include "kudu/tserver/tserver.pb.h"
 #include "kudu/util/auto_release_pool.h"
 #include "kudu/util/condition_variable.h"
 #include "kudu/util/locks.h"
@@ -55,9 +56,11 @@ class Thread;
 namespace tserver {
 
 class Scanner;
+
 enum class ScanState;
 struct ScanDescriptor;
 struct ScannerMetrics;
+
 typedef std::shared_ptr<Scanner> SharedScanner;
 
 // Manages the live scanners within a Tablet Server.
@@ -76,15 +79,22 @@ class ScannerManager {
   // Starts the expired scanner removal thread.
   Status StartRemovalThread();
 
-  // Create a new scanner with a unique ID, inserting it into the map.
+  // Create a new scanner with a unique ID, inserting it into the map. Further
+  // lookups for the scanner must provide the username associated with
+  // 'remote_user'.
   void NewScanner(const scoped_refptr<tablet::TabletReplica>& tablet_replica,
-                  const std::string& requestor_string,
+                  const rpc::RemoteUser& remote_user,
                   uint64_t row_format_flags,
                   SharedScanner* scanner);
 
-  // Lookup the given scanner by its ID.
-  // Returns true if the scanner is found successfully.
-  bool LookupScanner(const std::string& scanner_id, SharedScanner* scanner);
+  // Lookup the given scanner by its ID with the provided username, setting an
+  // appropriate error code.
+  // Returns NotFound if the scanner doesn't exist, or NotAuthorized if the
+  // scanner wasn't created by 'username'.
+  Status LookupScanner(const std::string& scanner_id,
+                       const std::string& username,
+                       TabletServerErrorPB::Code* error_code,
+                       SharedScanner* scanner);
 
   // Unregister the given scanner by its ID.
   // Returns true if unregistered successfully.
@@ -185,7 +195,7 @@ class Scanner {
  public:
   Scanner(std::string id,
           const scoped_refptr<tablet::TabletReplica>& tablet_replica,
-          std::string requestor_string, ScannerMetrics* metrics,
+          rpc::RemoteUser remote_user, ScannerMetrics* metrics,
           uint64_t row_format_flags);
   ~Scanner();
 
@@ -238,7 +248,7 @@ class Scanner {
 
   const scoped_refptr<tablet::TabletReplica>& tablet_replica() const { return tablet_replica_; }
 
-  const std::string& requestor_string() const { return requestor_string_; }
+  const rpc::RemoteUser& remote_user() const { return remote_user_; }
 
   // Returns the current call sequence ID of the scanner.
   uint32_t call_seq_id() const {
@@ -322,9 +332,9 @@ class Scanner {
   // Tablet associated with the scanner.
   const scoped_refptr<tablet::TabletReplica> tablet_replica_;
 
-  // Information about the requestor. Populated from
-  // RpcContext::requestor_string().
-  const std::string requestor_string_;
+  // The remote user making the request. Populated from the RemoteUser of the
+  // first request.
+  const rpc::RemoteUser remote_user_;
 
   // The last time that the scanner was accessed.
   MonoTime last_access_time_;
@@ -392,8 +402,8 @@ struct ScanDescriptor {
   // The scanner ID.
   std::string scanner_id;
 
-  // The scan requestor.
-  std::string requestor;
+  // The user that made the first request.
+  rpc::RemoteUser remote_user;
 
   // The table name.
   std::string table_name;
@@ -415,4 +425,3 @@ struct ScanDescriptor {
 } // namespace tserver
 } // namespace kudu
 
-#endif

http://git-wip-us.apache.org/repos/asf/kudu/blob/e172df40/src/kudu/tserver/tablet_server-test-base.cc
----------------------------------------------------------------------
diff --git a/src/kudu/tserver/tablet_server-test-base.cc b/src/kudu/tserver/tablet_server-test-base.cc
index 924bf5e..59b26e9 100644
--- a/src/kudu/tserver/tablet_server-test-base.cc
+++ b/src/kudu/tserver/tablet_server-test-base.cc
@@ -458,6 +458,12 @@ void TabletServerTestBase::VerifyScanRequestFailure(
   }
 }
 
+Status TabletServerTestBase::FillNewScanRequest(ReadMode read_mode, NewScanRequestPB* scan) const {
+  scan->set_tablet_id(kTabletId);
+  scan->set_read_mode(read_mode);
+  return SchemaToColumnPBs(schema_, scan->mutable_projected_columns());
+}
+
 // Open a new scanner which scans all of the columns in the table.
 void TabletServerTestBase::OpenScannerWithAllColumns(ScanResponsePB* resp,
                                                      ReadMode read_mode) {
@@ -465,11 +471,7 @@ void TabletServerTestBase::OpenScannerWithAllColumns(ScanResponsePB* resp,
   RpcController rpc;
 
   // Set up a new request with no predicates, all columns.
-  const Schema& projection = schema_;
-  NewScanRequestPB* scan = req.mutable_new_scan_request();
-  scan->set_tablet_id(kTabletId);
-  scan->set_read_mode(read_mode);
-  ASSERT_OK(SchemaToColumnPBs(projection, scan->mutable_projected_columns()));
+  ASSERT_OK(FillNewScanRequest(read_mode, req.mutable_new_scan_request()));
   req.set_call_seq_id(0);
   req.set_batch_size_bytes(0); // so it won't return data right away
 

http://git-wip-us.apache.org/repos/asf/kudu/blob/e172df40/src/kudu/tserver/tablet_server-test-base.h
----------------------------------------------------------------------
diff --git a/src/kudu/tserver/tablet_server-test-base.h b/src/kudu/tserver/tablet_server-test-base.h
index 9841021..b80dfac 100644
--- a/src/kudu/tserver/tablet_server-test-base.h
+++ b/src/kudu/tserver/tablet_server-test-base.h
@@ -120,6 +120,10 @@ class TabletServerTestBase : public KuduTest {
   void OpenScannerWithAllColumns(ScanResponsePB* resp,
                                  ReadMode read_mode = READ_LATEST);
 
+  // Fills out a new scan request on all of the columns in the table with the
+  // given read mode.
+  Status FillNewScanRequest(ReadMode read_mode, NewScanRequestPB* scan) const;
+
  protected:
   static const char* kTableId;
   static const char* kTabletId;

http://git-wip-us.apache.org/repos/asf/kudu/blob/e172df40/src/kudu/tserver/tablet_server-test.cc
----------------------------------------------------------------------
diff --git a/src/kudu/tserver/tablet_server-test.cc b/src/kudu/tserver/tablet_server-test.cc
index 3a734ea..a12e99b 100644
--- a/src/kudu/tserver/tablet_server-test.cc
+++ b/src/kudu/tserver/tablet_server-test.cc
@@ -70,6 +70,7 @@
 #include "kudu/rpc/messenger.h"
 #include "kudu/rpc/rpc_controller.h"
 #include "kudu/rpc/rpc_header.pb.h"
+#include "kudu/rpc/user_credentials.h"
 #include "kudu/server/rpc_server.h"
 #include "kudu/server/server_base.pb.h"
 #include "kudu/server/server_base.proxy.h"
@@ -1529,7 +1530,9 @@ TEST_F(TabletServerTest, TestReadLatest) {
   ASSERT_TRUE(!scanner_id.empty());
   {
     SharedScanner junk;
-    ASSERT_TRUE(mini_server_->server()->scanner_manager()->LookupScanner(scanner_id, &junk));
+    TabletServerErrorPB::Code error_code;
+    ASSERT_OK(mini_server_->server()->scanner_manager()->LookupScanner(
+        scanner_id, proxy_->user_credentials().real_user(), &error_code, &junk));
   }
 
   // Ensure that the scanner shows up in the server and tablet's metrics.
@@ -1552,7 +1555,10 @@ TEST_F(TabletServerTest, TestReadLatest) {
   // from the scanner manager.
   {
     SharedScanner junk;
-    ASSERT_FALSE(mini_server_->server()->scanner_manager()->LookupScanner(scanner_id, &junk));
+    TabletServerErrorPB::Code error_code;
+    ASSERT_TRUE(mini_server_->server()->scanner_manager()->LookupScanner(
+        scanner_id, proxy_->user_credentials().real_user(), &error_code, &junk).IsNotFound());
+    ASSERT_EQ(TabletServerErrorPB::SCANNER_EXPIRED, error_code);
   }
 
   // Ensure that the metrics have been updated now that the scanner is unregistered.
@@ -3397,5 +3403,83 @@ TEST_F(TabletServerTest, TestKeysInRowsetMetadataPreventStartupSeeks) {
   restart_server_and_check_bytes_read(/*keys_in_rowset_meta=*/ true);
 }
 
+// Test that each scanner can only be accessed by the user who created it.
+TEST_F(TabletServerTest, TestScannerCheckMatchingUser) {
+  rpc::UserCredentials user;
+  user.set_real_user("good-guy");
+  proxy_->set_user_credentials(user);
+
+  InsertTestRowsDirect(0, 100);
+  ScanResponsePB resp;
+  NO_FATALS(OpenScannerWithAllColumns(&resp));
+  const string& scanner_id = resp.scanner_id();
+  ASSERT_TRUE(!scanner_id.empty());
+
+  // Now do a checksum scan as the user.
+  string checksum_scanner_id;
+  int64_t checksum_val;
+  {
+    ChecksumRequestPB checksum_req;
+    ChecksumResponsePB checksum_resp;
+    RpcController rpc;
+    ASSERT_OK(FillNewScanRequest(READ_LATEST, checksum_req.mutable_new_request()));
+    // Set a batch size of 0 so we don't return rows and can expect the scanner
+    // to remain alive.
+    checksum_req.set_batch_size_bytes(0);
+    ASSERT_OK(proxy_->Checksum(checksum_req, &checksum_resp, &rpc));
+    SCOPED_TRACE(checksum_resp.DebugString());
+    ASSERT_FALSE(checksum_resp.has_error());
+    ASSERT_TRUE(checksum_resp.has_more_results());
+    checksum_scanner_id = checksum_resp.scanner_id();
+    checksum_val = checksum_resp.checksum();
+  }
+
+  constexpr auto verify_authz_error = [] (const Status& s) {
+    EXPECT_TRUE(s.IsRemoteError()) << s.ToString();
+    ASSERT_STR_CONTAINS(s.ToString(), "Not authorized");
+  };
+
+  for (const string& other : { "", "bad-guy" }) {
+    TabletServerServiceProxy bad_proxy(
+        client_messenger_, mini_server_->bound_rpc_addr(),
+        mini_server_->bound_rpc_addr().host());
+    if (!other.empty()) {
+      rpc::UserCredentials other_user;
+      other_user.set_real_user(other);
+      bad_proxy.set_user_credentials(other_user);
+    }
+    // Other users and clients with no credentials will be bounced for scans,
+    // checksum scans, and keep-alive requests.
+    {
+      ScanRequestPB req;
+      RpcController rpc;
+      req.set_scanner_id(scanner_id);
+      Status s = bad_proxy.Scan(req, &resp, &rpc);
+      SCOPED_TRACE(resp.DebugString());
+      NO_FATALS(verify_authz_error(s));
+    }
+    {
+      ChecksumRequestPB req;
+      ContinueChecksumRequestPB* continue_req = req.mutable_continue_request();
+      continue_req->set_scanner_id(checksum_scanner_id);
+      continue_req->set_previous_checksum(checksum_val);
+      ChecksumResponsePB resp;
+      RpcController rpc;
+      Status s = bad_proxy.Checksum(req, &resp, &rpc);
+      SCOPED_TRACE(resp.DebugString());
+      NO_FATALS(verify_authz_error(s));
+    }
+    for (const string& id : { scanner_id, checksum_scanner_id }) {
+      ScannerKeepAliveRequestPB req;
+      req.set_scanner_id(id);
+      ScannerKeepAliveResponsePB resp;
+      RpcController rpc;
+      Status s = bad_proxy.ScannerKeepAlive(req, &resp, &rpc);
+      SCOPED_TRACE(resp.DebugString());
+      NO_FATALS(verify_authz_error(s));
+    }
+  }
+}
+
 } // namespace tserver
 } // namespace kudu

http://git-wip-us.apache.org/repos/asf/kudu/blob/e172df40/src/kudu/tserver/tablet_service.cc
----------------------------------------------------------------------
diff --git a/src/kudu/tserver/tablet_service.cc b/src/kudu/tserver/tablet_service.cc
index da4002c..9b3177e 100644
--- a/src/kudu/tserver/tablet_service.cc
+++ b/src/kudu/tserver/tablet_service.cc
@@ -61,6 +61,7 @@
 #include "kudu/gutil/ref_counted.h"
 #include "kudu/gutil/stringprintf.h"
 #include "kudu/gutil/strings/substitute.h"
+#include "kudu/rpc/remote_user.h"
 #include "kudu/rpc/rpc_context.h"
 #include "kudu/rpc/rpc_header.pb.h"
 #include "kudu/rpc/rpc_sidecar.h"
@@ -367,6 +368,12 @@ static void SetupErrorAndRespond(TabletServerErrorPB* error,
                                  const Status& s,
                                  TabletServerErrorPB::Code code,
                                  rpc::RpcContext* context) {
+  // Non-authorized errors will drop the connection.
+  if (code == TabletServerErrorPB::NOT_AUTHORIZED) {
+    DCHECK(s.IsNotAuthorized());
+    context->RespondRpcFailure(rpc::ErrorStatusPB::FATAL_UNAUTHORIZED, s);
+    return;
+  }
   // Generic "service unavailable" errors will cause the client to retry later.
   if ((code == TabletServerErrorPB::UNKNOWN_ERROR ||
        code == TabletServerErrorPB::THROTTLED) && s.IsServiceUnavailable()) {
@@ -1287,14 +1294,23 @@ void TabletServiceImpl::ScannerKeepAlive(const ScannerKeepAliveRequestPB *req,
                                          rpc::RpcContext *context) {
   DCHECK(req->has_scanner_id());
   SharedScanner scanner;
-  if (!server_->scanner_manager()->LookupScanner(req->scanner_id(), &scanner)) {
-    resp->mutable_error()->set_code(TabletServerErrorPB::SCANNER_EXPIRED);
-    Status s = Status::NotFound(Substitute("Scanner $0 not found (it may have expired)",
-                                           req->scanner_id()));
+  TabletServerErrorPB::Code error_code = TabletServerErrorPB::UNKNOWN_ERROR;
+  Status s = server_->scanner_manager()->LookupScanner(req->scanner_id(),
+                                                       context->remote_user().username(),
+                                                       &error_code,
+                                                       &scanner);
+  if (!s.ok()) {
     StatusToPB(s, resp->mutable_error()->mutable_status());
     LOG(INFO) << Substitute("ScannerKeepAlive: $0: remote=$1",
                             s.ToString(), context->requestor_string());
-    context->RespondSuccess();
+    if (PREDICT_TRUE(s.IsNotFound())) {
+      resp->mutable_error()->set_code(error_code);
+      StatusToPB(s, resp->mutable_error()->mutable_status());
+      context->RespondSuccess();
+      return;
+    }
+    DCHECK(s.IsNotAuthorized());
+    SetupErrorAndRespond(resp->mutable_error(), s, error_code, context);
     return;
   }
   scanner->UpdateAccessTime();
@@ -1545,7 +1561,7 @@ void TabletServiceImpl::Checksum(const ChecksumRequestPB* req,
 
   ScanResultChecksummer collector;
   bool has_more = false;
-  TabletServerErrorPB::Code error_code;
+  TabletServerErrorPB::Code error_code = TabletServerErrorPB::UNKNOWN_ERROR;
   if (req->has_new_request()) {
     scan_req.mutable_new_scan_request()->CopyFrom(req->new_request());
     const NewScanRequestPB& new_req = req->new_request();
@@ -1814,7 +1830,7 @@ Status TabletServiceImpl::HandleNewScanRequest(TabletReplica* replica,
 
   SharedScanner scanner;
   server_->scanner_manager()->NewScanner(replica,
-                                         rpc_context->requestor_string(),
+                                         rpc_context->remote_user(),
                                          scan_pb.row_format_flags(),
                                          &scanner);
   TRACE("Created scanner $0 for tablet $1", scanner->id(), scanner->tablet_id());
@@ -2043,17 +2059,19 @@ Status TabletServiceImpl::HandleContinueScanRequest(const ScanRequestPB* req,
   // in case multiple RPCs hit the same scanner at the same time. Probably
   // just a trylock and fail the RPC if it contends.
   SharedScanner scanner;
-  if (!server_->scanner_manager()->LookupScanner(req->scanner_id(), &scanner)) {
-    if (batch_size_bytes == 0 && req->close_scanner()) {
+  TabletServerErrorPB::Code code = TabletServerErrorPB::UNKNOWN_ERROR;
+  Status s = server_->scanner_manager()->LookupScanner(req->scanner_id(),
+                                                       rpc_context->remote_user().username(),
+                                                       &code,
+                                                       &scanner);
+  if (!s.ok()) {
+    if (s.IsNotFound() && batch_size_bytes == 0 && req->close_scanner()) {
       // Silently ignore any request to close a non-existent scanner.
       return Status::OK();
     }
-
-    *error_code = TabletServerErrorPB::SCANNER_EXPIRED;
-    Status s = Status::NotFound(Substitute("Scanner $0 not found (it may have expired)",
-                                           req->scanner_id()));
     LOG(INFO) << Substitute("Scan: $0: call sequence id=$1, remote=$2",
                             s.ToString(), req->call_seq_id(), rpc_context->requestor_string());
+    *error_code = code;
     return s;
   }
 
@@ -2146,7 +2164,7 @@ Status TabletServiceImpl::HandleContinueScanRequest(const ScanRequestPB* req,
   scoped_refptr<TabletReplica> replica = scanner->tablet_replica();
   shared_ptr<Tablet> tablet;
   TabletServerErrorPB::Code tablet_ref_error_code;
-  const Status s = GetTabletRef(replica, &tablet, &tablet_ref_error_code);
+  s = GetTabletRef(replica, &tablet, &tablet_ref_error_code);
   // If the tablet is not running, but the scan operation in progress
   // has reached this point, the tablet server has the necessary data to
   // send in response for the scan continuation request.

http://git-wip-us.apache.org/repos/asf/kudu/blob/e172df40/src/kudu/tserver/tserver.proto
----------------------------------------------------------------------
diff --git a/src/kudu/tserver/tserver.proto b/src/kudu/tserver/tserver.proto
index f9f8cef..afe1b71 100644
--- a/src/kudu/tserver/tserver.proto
+++ b/src/kudu/tserver/tserver.proto
@@ -96,6 +96,9 @@ message TabletServerErrorPB {
 
     // The tablet needs to be evicted and reassigned.
     TABLET_FAILED = 20;
+
+    // The request is disallowed for the given user.
+    NOT_AUTHORIZED = 21;
   }
 
   // The error code.

http://git-wip-us.apache.org/repos/asf/kudu/blob/e172df40/src/kudu/tserver/tserver_path_handlers.cc
----------------------------------------------------------------------
diff --git a/src/kudu/tserver/tserver_path_handlers.cc b/src/kudu/tserver/tserver_path_handlers.cc
index 8fa0196..52ec87f 100644
--- a/src/kudu/tserver/tserver_path_handlers.cc
+++ b/src/kudu/tserver/tserver_path_handlers.cc
@@ -48,6 +48,7 @@
 #include "kudu/gutil/strings/join.h"
 #include "kudu/gutil/strings/numbers.h"
 #include "kudu/gutil/strings/substitute.h"
+#include "kudu/rpc/remote_user.h"
 #include "kudu/server/webui_util.h"
 #include "kudu/tablet/metadata.pb.h"
 #include "kudu/tablet/tablet.h"
@@ -536,7 +537,7 @@ void ScanToJson(const ScanDescriptor& scan, EasyJson* json) {
   json->Set("scanner_id", scan.scanner_id);
   json->Set("state", ScanStateToString(scan.state));
   json->Set("query", ScanQueryHtml(scan));
-  json->Set("requestor", scan.requestor);
+  json->Set("requestor", scan.remote_user.username());
 
   json->Set("duration", HumanReadableElapsedTime::ToShortString(duration.ToSeconds()));
   json->Set("time_since_start",

Reply via email to