This is an automated email from the ASF dual-hosted git repository.

laiyingchun pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/incubator-pegasus.git


The following commit(s) were added to refs/heads/master by this push:
     new 64414e601 feat(Ranger): Use Apache Ranger for ACL when replica 
performs nfs copy (#1452)
64414e601 is described below

commit 64414e601169da169c425fdcbcb7948c019b6d0a
Author: WHBANG <[email protected]>
AuthorDate: Fri Apr 21 16:48:06 2023 +0800

    feat(Ranger): Use Apache Ranger for ACL when replica performs nfs copy 
(#1452)
    
    https://github.com/apache/incubator-pegasus/issues/1054
    
    This patch adds ACL checks to the NFS copy performed by replicas.
    
    1. Added an optional `gpid` field (`pid`) to the request structures defined in `nfs.thrift`
    2. Perform the ACL check using the Ranger policy matched by `gpid`
    3. The registration of the NFS RPC handlers is moved to `replica_stub.cpp`; the
        original registration code is retained, which is convenient for testing
---
 src/nfs/nfs.thrift            |  2 ++
 src/nfs/nfs_client_impl.cpp   |  2 ++
 src/nfs/nfs_node.cpp          |  4 ++++
 src/nfs/nfs_node.h            | 18 +++++++++++++++++
 src/nfs/nfs_node_impl.cpp     | 25 +++++++++++++++++++++++-
 src/nfs/nfs_node_simple.h     | 15 ++++++++++++---
 src/nfs/nfs_server_impl.h     |  4 ++--
 src/nfs/test/main.cpp         |  6 ++++++
 src/replica/replica.cpp       |  5 ++---
 src/replica/replica.h         |  2 +-
 src/replica/replica_learn.cpp |  1 +
 src/replica/replica_stub.cpp  | 24 +++++++++++++++++++++++
 src/replica/replica_stub.h    | 45 +++++++++++++++++++++++++++++++++++++++----
 13 files changed, 139 insertions(+), 14 deletions(-)

diff --git a/src/nfs/nfs.thrift b/src/nfs/nfs.thrift
index f5c8245d8..3f0f96bbb 100644
--- a/src/nfs/nfs.thrift
+++ b/src/nfs/nfs.thrift
@@ -39,6 +39,7 @@ struct copy_request
     7: bool is_last;
     8: bool overwrite;
     9: optional string source_disk_tag;
+    10: optional dsn.gpid pid;
 }
 
 struct copy_response
@@ -58,6 +59,7 @@ struct get_file_size_request
     5: bool overwrite;
     6: optional string source_disk_tag;
     7: optional string dest_disk_tag;
+    8: optional dsn.gpid pid;
 }
 
 struct get_file_size_response
diff --git a/src/nfs/nfs_client_impl.cpp b/src/nfs/nfs_client_impl.cpp
index 9daefecd5..c0ced7b8c 100644
--- a/src/nfs/nfs_client_impl.cpp
+++ b/src/nfs/nfs_client_impl.cpp
@@ -138,6 +138,7 @@ void 
nfs_client_impl::begin_remote_copy(std::shared_ptr<remote_copy_request> &rc
     req->file_size_req.overwrite = rci->overwrite;
     req->file_size_req.__set_source_disk_tag(rci->source_disk_tag);
     req->file_size_req.__set_dest_disk_tag(rci->dest_disk_tag);
+    req->file_size_req.__set_pid(rci->pid);
     req->nfs_task = nfs_task;
     req->is_finished = false;
 
@@ -294,6 +295,7 @@ void nfs_client_impl::continue_copy()
                 copy_req.overwrite = ureq->file_size_req.overwrite;
                 copy_req.is_last = req->is_last;
                 
copy_req.__set_source_disk_tag(ureq->file_size_req.source_disk_tag);
+                copy_req.__set_pid(ureq->file_size_req.pid);
                 req->remote_copy_task =
                     async_nfs_copy(copy_req,
                                    [=](error_code err, copy_response &&resp) {
diff --git a/src/nfs/nfs_node.cpp b/src/nfs/nfs_node.cpp
index a5d30f039..0103b9a35 100644
--- a/src/nfs/nfs_node.cpp
+++ b/src/nfs/nfs_node.cpp
@@ -45,6 +45,7 @@ aio_task_ptr nfs_node::copy_remote_directory(const 
rpc_address &remote,
                                              const std::string &source_dir,
                                              const std::string &dest_disk_tag,
                                              const std::string &dest_dir,
+                                             const dsn::gpid &pid,
                                              bool overwrite,
                                              bool high_priority,
                                              task_code callback_code,
@@ -58,6 +59,7 @@ aio_task_ptr nfs_node::copy_remote_directory(const 
rpc_address &remote,
                              {},
                              dest_disk_tag,
                              dest_dir,
+                             pid,
                              overwrite,
                              high_priority,
                              callback_code,
@@ -72,6 +74,7 @@ aio_task_ptr nfs_node::copy_remote_files(const rpc_address 
&remote,
                                          const std::vector<std::string> &files,
                                          const std::string &dest_disk_tag,
                                          const std::string &dest_dir,
+                                         const dsn::gpid &pid,
                                          bool overwrite,
                                          bool high_priority,
                                          task_code callback_code,
@@ -88,6 +91,7 @@ aio_task_ptr nfs_node::copy_remote_files(const rpc_address 
&remote,
     rci->files = files;
     rci->dest_disk_tag = dest_disk_tag;
     rci->dest_dir = dest_dir;
+    rci->pid = pid;
     rci->overwrite = overwrite;
     rci->high_priority = high_priority;
     call(rci, cb);
diff --git a/src/nfs/nfs_node.h b/src/nfs/nfs_node.h
index 9f6949890..635562c66 100644
--- a/src/nfs/nfs_node.h
+++ b/src/nfs/nfs_node.h
@@ -31,6 +31,7 @@
 #include <vector>
 
 #include "aio/aio_task.h"
+#include "common/gpid.h"
 #include "runtime/api_task.h"
 #include "runtime/rpc/rpc_address.h"
 #include "runtime/task/task_code.h"
@@ -38,6 +39,14 @@
 
 namespace dsn {
 class task_tracker;
+namespace service {
+class copy_request;
+class copy_response;
+class get_file_size_request;
+class get_file_size_response;
+} // namespace service
+template <typename TResponse>
+class rpc_replier;
 
 struct remote_copy_request
 {
@@ -49,6 +58,7 @@ struct remote_copy_request
     std::string dest_dir;
     bool overwrite;
     bool high_priority;
+    dsn::gpid pid;
 };
 
 class nfs_node
@@ -62,6 +72,7 @@ public:
                                        const std::string &source_dir,
                                        const std::string &dest_disk_tag,
                                        const std::string &dest_dir,
+                                       const dsn::gpid &pid,
                                        bool overwrite,
                                        bool high_priority,
                                        task_code callback_code,
@@ -74,6 +85,7 @@ public:
                                    const std::vector<std::string> &files, // 
empty for all
                                    const std::string &dest_disk_tag,
                                    const std::string &dest_dir,
+                                   const dsn::gpid &pid,
                                    bool overwrite,
                                    bool high_priority,
                                    task_code callback_code,
@@ -91,6 +103,12 @@ public:
     virtual ~nfs_node() {}
     virtual error_code start() = 0;
     virtual error_code stop() = 0;
+    virtual void on_copy(const ::dsn::service::copy_request &request,
+                         ::dsn::rpc_replier<::dsn::service::copy_response> 
&reply) = 0;
+    virtual void
+    on_get_file_size(const ::dsn::service::get_file_size_request &request,
+                     
::dsn::rpc_replier<::dsn::service::get_file_size_response> &reply) = 0;
+    virtual void register_async_rpc_handler_for_test() = 0;
 
 protected:
     virtual void call(std::shared_ptr<remote_copy_request> rci, aio_task 
*callback) = 0;
diff --git a/src/nfs/nfs_node_impl.cpp b/src/nfs/nfs_node_impl.cpp
index 31123901d..387547fd8 100644
--- a/src/nfs/nfs_node_impl.cpp
+++ b/src/nfs/nfs_node_impl.cpp
@@ -43,8 +43,14 @@
 
 namespace dsn {
 class aio_task;
+template <typename TResponse>
+class rpc_replier;
 
 namespace service {
+class copy_request;
+class copy_response;
+class get_file_size_request;
+class get_file_size_response;
 
 nfs_node_simple::nfs_node_simple() : nfs_node()
 {
@@ -62,12 +68,16 @@ void 
nfs_node_simple::call(std::shared_ptr<remote_copy_request> rci, aio_task *c
 error_code nfs_node_simple::start()
 {
     _server = new nfs_service_impl();
-    _server->open_service();
 
     _client = new nfs_client_impl();
     return ERR_OK;
 }
 
+void nfs_node_simple::register_async_rpc_handler_for_test()
+{
+    _server->open_nfs_service_for_test();
+}
+
 error_code nfs_node_simple::stop()
 {
     delete _server;
@@ -78,5 +88,18 @@ error_code nfs_node_simple::stop()
 
     return ERR_OK;
 }
+
+void nfs_node_simple::on_copy(const copy_request &request, 
::dsn::rpc_replier<copy_response> &reply)
+{
+    _server->on_copy(request, reply);
+}
+
+void nfs_node_simple::on_get_file_size(
+    const ::dsn::service::get_file_size_request &request,
+    ::dsn::rpc_replier<::dsn::service::get_file_size_response> &reply)
+{
+    _server->on_get_file_size(request, reply);
+}
+
 } // namespace service
 } // namespace dsn
diff --git a/src/nfs/nfs_node_simple.h b/src/nfs/nfs_node_simple.h
index 2f078eab3..2376b1e34 100644
--- a/src/nfs/nfs_node_simple.h
+++ b/src/nfs/nfs_node_simple.h
@@ -50,11 +50,20 @@ public:
 
     virtual ~nfs_node_simple();
 
-    virtual void call(std::shared_ptr<remote_copy_request> rci, aio_task 
*callback) override;
+    void call(std::shared_ptr<remote_copy_request> rci, aio_task *callback) 
override;
 
-    virtual ::dsn::error_code start() override;
+    error_code start() override;
 
-    virtual error_code stop() override;
+    error_code stop() override;
+
+    void on_copy(const ::dsn::service::copy_request &request,
+                 ::dsn::rpc_replier<::dsn::service::copy_response> &reply) 
override;
+
+    void
+    on_get_file_size(const ::dsn::service::get_file_size_request &request,
+                     
::dsn::rpc_replier<::dsn::service::get_file_size_response> &reply) override;
+
+    void register_async_rpc_handler_for_test() override;
 
 private:
     nfs_service_impl *_server;
diff --git a/src/nfs/nfs_server_impl.h b/src/nfs/nfs_server_impl.h
index bdc83bdc9..9ba113404 100644
--- a/src/nfs/nfs_server_impl.h
+++ b/src/nfs/nfs_server_impl.h
@@ -56,7 +56,8 @@ public:
     nfs_service_impl();
     virtual ~nfs_service_impl() { _tracker.cancel_outstanding_tasks(); }
 
-    void open_service()
+    // The RPC handlers are actually registered in replica_stub.cpp; this registration is kept here for testing only
+    void open_nfs_service_for_test()
     {
         register_async_rpc_handler(RPC_NFS_COPY, "copy", 
&nfs_service_impl::on_copy);
         register_async_rpc_handler(
@@ -73,7 +74,6 @@ public:
         _nfs_max_send_rate_megabytes_cmd.reset();
     }
 
-protected:
     // RPC_NFS_V2_NFS_COPY
     virtual void on_copy(const copy_request &request, 
::dsn::rpc_replier<copy_response> &reply);
     // RPC_NFS_V2_NFS_GET_FILE_SIZE
diff --git a/src/nfs/test/main.cpp b/src/nfs/test/main.cpp
index c1b0ab546..445c4732f 100644
--- a/src/nfs/test/main.cpp
+++ b/src/nfs/test/main.cpp
@@ -34,6 +34,7 @@
 #include <vector>
 
 #include "aio/aio_task.h"
+#include "common/gpid.h"
 #include "nfs/nfs_node.h"
 #include "runtime/app_model.h"
 #include "runtime/rpc/rpc_address.h"
@@ -57,6 +58,8 @@ TEST(nfs, basic)
 {
     std::unique_ptr<dsn::nfs_node> nfs(dsn::nfs_node::create());
     nfs->start();
+    nfs->register_async_rpc_handler_for_test();
+    dsn::gpid fake_pid = gpid(1, 0);
 
     utils::filesystem::remove_path("nfs_test_dir");
     utils::filesystem::remove_path("nfs_test_dir_copy");
@@ -81,6 +84,7 @@ TEST(nfs, basic)
                                                      files,
                                                      "default",
                                                      "nfs_test_dir",
+                                                     fake_pid,
                                                      false,
                                                      false,
                                                      LPC_AIO_TEST_NFS,
@@ -122,6 +126,7 @@ TEST(nfs, basic)
                                                      files,
                                                      "default",
                                                      "nfs_test_dir",
+                                                     fake_pid,
                                                      true,
                                                      false,
                                                      LPC_AIO_TEST_NFS,
@@ -152,6 +157,7 @@ TEST(nfs, basic)
                                                          "nfs_test_dir",
                                                          "default",
                                                          "nfs_test_dir_copy",
+                                                         fake_pid,
                                                          false,
                                                          false,
                                                          LPC_AIO_TEST_NFS,
diff --git a/src/replica/replica.cpp b/src/replica/replica.cpp
index 5c423a7dc..7e130bf07 100644
--- a/src/replica/replica.cpp
+++ b/src/replica/replica.cpp
@@ -597,10 +597,9 @@ error_code replica::store_app_info(app_info &info, const 
std::string &path)
     return err;
 }
 
-bool replica::access_controller_allowed(message_ex *msg, ranger::access_type 
req_type) const
+bool replica::access_controller_allowed(message_ex *msg, const 
ranger::access_type &ac_type) const
 {
-    return !_access_controller->is_enable_ranger_acl() ||
-           _access_controller->allowed(msg, req_type);
+    return !_access_controller->is_enable_ranger_acl() || 
_access_controller->allowed(msg, ac_type);
 }
 
 } // namespace replication
diff --git a/src/replica/replica.h b/src/replica/replica.h
index 17d2338e4..2e8108371 100644
--- a/src/replica/replica.h
+++ b/src/replica/replica.h
@@ -528,7 +528,7 @@ private:
     bool is_data_corrupted() const { return _data_corrupted; }
 
     // use Apache Ranger for replica access control
-    bool access_controller_allowed(message_ex *msg, ranger::access_type 
req_type) const;
+    bool access_controller_allowed(message_ex *msg, const ranger::access_type 
&ac_type) const;
 
 private:
     friend class ::dsn::replication::test::test_checker;
diff --git a/src/replica/replica_learn.cpp b/src/replica/replica_learn.cpp
index d3bbfe638..589e3e28d 100644
--- a/src/replica/replica_learn.cpp
+++ b/src/replica/replica_learn.cpp
@@ -912,6 +912,7 @@ void replica::on_learn_reply(error_code err, learn_request 
&&req, learn_response
             resp.state.files,
             get_replica_disk_tag(),
             learn_dir,
+            get_gpid(),
             true, // overwrite
             high_priority,
             LPC_REPLICATION_COPY_REMOTE_FILES,
diff --git a/src/replica/replica_stub.cpp b/src/replica/replica_stub.cpp
index 5fc7bf439..d5cf1f0bc 100644
--- a/src/replica/replica_stub.cpp
+++ b/src/replica/replica_stub.cpp
@@ -62,6 +62,7 @@
 #include "mutation.h"
 #include "mutation_log.h"
 #include "nfs/nfs_node.h"
+#include "nfs_types.h"
 #include "perf_counter/perf_counter.h"
 #include "replica.h"
 #include "replica/duplication/replica_follower.h"
@@ -93,6 +94,7 @@
 #elif defined(DSN_USE_JEMALLOC)
 #include "utils/je_ctl.h"
 #endif
+#include "nfs/nfs_code_definition.h"
 #include "remote_cmd/remote_command.h"
 #include "utils/fail_point.h"
 
@@ -1257,6 +1259,23 @@ void replica_stub::on_add_new_disk(add_new_disk_rpc rpc)
     }
 }
 
+void replica_stub::on_nfs_copy(const ::dsn::service::copy_request &request,
+                               
::dsn::rpc_replier<::dsn::service::copy_response> &reply)
+{
+    if (check_status_and_authz_with_reply(request, reply, 
ranger::access_type::kWrite)) {
+        _nfs->on_copy(request, reply);
+    }
+}
+
+void replica_stub::on_nfs_get_file_size(
+    const ::dsn::service::get_file_size_request &request,
+    ::dsn::rpc_replier<::dsn::service::get_file_size_response> &reply)
+{
+    if (check_status_and_authz_with_reply(request, reply, 
ranger::access_type::kWrite)) {
+        _nfs->on_get_file_size(request, reply);
+    }
+}
+
 void replica_stub::on_prepare(dsn::message_ex *request)
 {
     gpid id;
@@ -2464,6 +2483,11 @@ void replica_stub::open_service()
     register_rpc_handler_with_rpc_holder(
         RPC_ADD_NEW_DISK, "add_new_disk", &replica_stub::on_add_new_disk);
 
+    // nfs
+    register_async_rpc_handler(dsn::service::RPC_NFS_COPY, "copy", 
&replica_stub::on_nfs_copy);
+    register_async_rpc_handler(
+        dsn::service::RPC_NFS_GET_FILE_SIZE, "get_file_size", 
&replica_stub::on_nfs_get_file_size);
+
     register_ctrl_command();
 }
 
diff --git a/src/replica/replica_stub.h b/src/replica/replica_stub.h
index edcbbca71..12f17a1cf 100644
--- a/src/replica/replica_stub.h
+++ b/src/replica/replica_stub.h
@@ -60,8 +60,10 @@
 #include "replica.h"
 #include "replica/mutation_log.h"
 #include "replica_admin_types.h"
+#include "runtime/ranger/access_type.h"
 #include "runtime/rpc/rpc_address.h"
 #include "runtime/rpc/rpc_holder.h"
+#include "runtime/security/access_controller.h"
 #include "runtime/serverlet.h"
 #include "runtime/task/task.h"
 #include "runtime/task/task_code.h"
@@ -75,10 +77,12 @@ namespace dsn {
 class command_deregister;
 class message_ex;
 class nfs_node;
-
-namespace security {
-class access_controller;
-} // namespace security
+namespace service {
+class copy_request;
+class copy_response;
+class get_file_size_request;
+class get_file_size_response;
+} // namespace service
 
 namespace replication {
 class configuration_query_by_node_response;
@@ -270,6 +274,39 @@ public:
 
     fs_manager *get_fs_manager() { return &_fs_manager; }
 
+    template <typename TReqType, typename TRespType>
+    bool check_status_and_authz_with_reply(const TReqType &request,
+                                           ::dsn::rpc_replier<TRespType> 
&reply,
+                                           const ::dsn::ranger::access_type 
&ac_type) const
+    {
+        if (!_access_controller->is_enable_ranger_acl()) {
+            return true;
+        }
+        const auto &pid = request.pid;
+        replica_ptr rep = get_replica(pid);
+
+        if (!rep) {
+            TRespType resp;
+            resp.error = ERR_OBJECT_NOT_FOUND;
+            reply(resp);
+            return false;
+        }
+        dsn::message_ex *msg = reply.response_message();
+        if (!rep->access_controller_allowed(msg, ac_type)) {
+            TRespType resp;
+            resp.error = ERR_ACL_DENY;
+            reply(resp);
+            return false;
+        }
+        return true;
+    }
+
+    void on_nfs_copy(const ::dsn::service::copy_request &request,
+                     ::dsn::rpc_replier<::dsn::service::copy_response> &reply);
+
+    void on_nfs_get_file_size(const ::dsn::service::get_file_size_request 
&request,
+                              
::dsn::rpc_replier<::dsn::service::get_file_size_response> &reply);
+
 private:
     enum replica_node_state
     {


---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]

Reply via email to