This is an automated email from the ASF dual-hosted git repository.

wangdan pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/incubator-pegasus.git


The following commit(s) were added to refs/heads/master by this push:
     new adb886d3e fix(asio): fix the crash caused by early invalidated socket (#2111)
adb886d3e is described below

commit adb886d3e82676566afa6faabde3789b20ed120f
Author: Yingchun Lai <[email protected]>
AuthorDate: Wed Sep 11 11:54:27 2024 +0800

    fix(asio): fix the crash caused by early invalidated socket (#2111)
    
    Fix https://github.com/apache/incubator-pegasus/issues/307.
    
    This patch fixes the crash by lazily closing the socket only after all
    references to it have been released, which ensures thread safety.
    
    This patch also includes some clang-tidy fixes.
---
 .clang-tidy                          |  2 +-
 build_tools/clang_tidy.py            |  2 +-
 src/runtime/rpc/asio_rpc_session.cpp | 21 +++++++++++++++------
 src/runtime/rpc/asio_rpc_session.h   |  9 +++++----
 src/runtime/rpc/network.cpp          |  8 +++++++-
 src/runtime/rpc/network.h            |  6 ++----
 src/runtime/rpc/network.sim.h        | 20 ++++++++++----------
 7 files changed, 41 insertions(+), 27 deletions(-)

diff --git a/.clang-tidy b/.clang-tidy
index 1cdfca281..20ca366c0 100644
--- a/.clang-tidy
+++ b/.clang-tidy
@@ -18,7 +18,7 @@
 # 
https://releases.llvm.org/14.0.0/tools/clang/tools/extra/docs/clang-tidy/index.html
 
 CheckOptions: []
-Checks: 
'abseil-*,boost-*,bugprone-*,cert-*,clang-analyzer-*,concurrency-*,cppcoreguidelines-*,darwin-*,fuchsia-*,google-*,hicpp-*,linuxkernel-*,llvm-*,misc-*,modernize-*,performance-*,portability-*,readability-*,-cppcoreguidelines-pro-type-union-access,-llvm-include-order,-misc-definitions-in-headers,-modernize-use-trailing-return-type,-cppcoreguidelines-macro-usage,-modernize-replace-disallow-copy-and-assign-macro,-bugprone-macro-parentheses,-cppcoreguidelines-avoid-non-const-global-va
 [...]
+Checks: 
'abseil-*,boost-*,bugprone-*,cert-*,clang-analyzer-*,concurrency-*,cppcoreguidelines-*,darwin-*,fuchsia-*,google-*,hicpp-*,linuxkernel-*,llvm-*,misc-*,modernize-*,performance-*,portability-*,readability-*,-cppcoreguidelines-pro-type-union-access,-llvm-include-order,-misc-definitions-in-headers,-modernize-use-trailing-return-type,-cppcoreguidelines-macro-usage,-modernize-replace-disallow-copy-and-assign-macro,-bugprone-macro-parentheses,-cppcoreguidelines-avoid-non-const-global-va
 [...]
 ExtraArgs:
 ExtraArgsBefore: []
 FormatStyle: none
diff --git a/build_tools/clang_tidy.py b/build_tools/clang_tidy.py
index ed4f4d52a..2c3645d08 100755
--- a/build_tools/clang_tidy.py
+++ b/build_tools/clang_tidy.py
@@ -60,7 +60,7 @@ def run_tidy(sha="HEAD", is_rev_range=False):
                    "clang-tidy",
                    "-p0",
                    "-path", BUILD_PATH,
-                   
"-checks=-cppcoreguidelines-pro-type-union-access,-llvm-include-order,-misc-definitions-in-headers,-modernize-use-trailing-return-type,-cppcoreguidelines-macro-usage,-modernize-replace-disallow-copy-and-assign-macro,-bugprone-macro-parentheses,-cppcoreguidelines-avoid-non-const-global-variables,-fuchsia-statically-constructed-objects,-fuchsia-overloaded-operator,-bugprone-easily-swappable-parameters,-cppcoreguidelines-non-private-member-variables-in-classes,-misc-non-p
 [...]
+                   
"-checks=-cppcoreguidelines-pro-type-union-access,-llvm-include-order,-misc-definitions-in-headers,-modernize-use-trailing-return-type,-cppcoreguidelines-macro-usage,-modernize-replace-disallow-copy-and-assign-macro,-bugprone-macro-parentheses,-cppcoreguidelines-avoid-non-const-global-variables,-fuchsia-statically-constructed-objects,-fuchsia-overloaded-operator,-bugprone-easily-swappable-parameters,-cppcoreguidelines-non-private-member-variables-in-classes,-misc-non-p
 [...]
                    "-extra-arg=-language=c++",
                    "-extra-arg=-std=c++17",
                    "-extra-arg=-Ithirdparty/output/include"]
diff --git a/src/runtime/rpc/asio_rpc_session.cpp 
b/src/runtime/rpc/asio_rpc_session.cpp
index 39ac3e6c0..331d0253e 100644
--- a/src/runtime/rpc/asio_rpc_session.cpp
+++ b/src/runtime/rpc/asio_rpc_session.cpp
@@ -130,7 +130,7 @@ void asio_rpc_session::do_read(int read_next)
                 } else {
                     LOG_ERROR("asio read from {} failed: {}", _remote_addr, ec.message());
                 }
-                on_failure();
+                on_failure(false);
             } else {
                 _reader.mark_read(length);
 
@@ -151,7 +151,7 @@ void asio_rpc_session::do_read(int read_next)
 
                 if (read_next == -1) {
                     LOG_ERROR("asio read from {} failed", _remote_addr);
-                    on_failure();
+                    on_failure(false);
                 } else {
                     start_read_next(read_next);
                 }
@@ -197,16 +197,25 @@ asio_rpc_session::asio_rpc_session(asio_network_provider 
&net,
     set_options();
 }
 
-void asio_rpc_session::close()
+asio_rpc_session::~asio_rpc_session()
 {
+    // Because every async_* invoking adds the reference counter and releases the reference counter
+    // in corresponding callback, it's certain that the reference counter is zero in its
+    // destructor, which means there is no inflight invoking, then it's safe to close the socket.
+    asio_rpc_session::close();
+}
 
+void asio_rpc_session::close()
+{
     boost::system::error_code ec;
     _socket->shutdown(boost::asio::socket_base::shutdown_type::shutdown_both, ec);
-    if (ec)
+    if (ec) {
         LOG_WARNING("asio socket shutdown failed, error = {}", ec.message());
+    }
     _socket->close(ec);
-    if (ec)
+    if (ec) {
         LOG_WARNING("asio socket close failed, error = {}", ec.message());
+    }
 }
 
 void asio_rpc_session::connect()
@@ -222,7 +231,7 @@ void asio_rpc_session::connect()
 
                 set_options();
                 set_connected();
-                on_send_completed();
+                on_send_completed(0);
                 start_read_next();
             } else {
                 LOG_ERROR(
diff --git a/src/runtime/rpc/asio_rpc_session.h 
b/src/runtime/rpc/asio_rpc_session.h
index e3f5da4e2..7a7cda8b3 100644
--- a/src/runtime/rpc/asio_rpc_session.h
+++ b/src/runtime/rpc/asio_rpc_session.h
@@ -51,10 +51,14 @@ public:
                      message_parser_ptr &parser,
                      bool is_client);
 
-    ~asio_rpc_session() override = default;
+    ~asio_rpc_session() override;
 
     void send(uint64_t signature) override;
 
+    // The under layer socket will be invalidated after being closed.
+    //
+    // It's needed to prevent the '_socket' to be closed while the socket's async_* interfaces are
+    // in flight.
     void close() override;
 
     void connect() override;
@@ -69,9 +73,6 @@ private:
         }
     }
 
-private:
-    // boost::asio::socket is thread-unsafe, must use lock to prevent a
-    // reading/writing socket being modified or closed concurrently.
     std::shared_ptr<boost::asio::ip::tcp::socket> _socket;
 };
 
diff --git a/src/runtime/rpc/network.cpp b/src/runtime/rpc/network.cpp
index c572b19a8..28b6a731c 100644
--- a/src/runtime/rpc/network.cpp
+++ b/src/runtime/rpc/network.cpp
@@ -429,8 +429,14 @@ bool rpc_session::on_disconnected(bool is_write)
 
 void rpc_session::on_failure(bool is_write)
 {
+    // Just update the state machine here.
     if (on_disconnected(is_write)) {
-        close();
+        // The under layer socket may be used by async_* interfaces concurrently, it's not thread
+        // safe to invalidate the '_socket', it should be invalidated when the session is
+        // destroyed.
+        LOG_WARNING("disconnect to remote {}, the socket will be lazily closed when the session "
+                    "destroyed",
+                    _remote_addr);
     }
 }
 
diff --git a/src/runtime/rpc/network.h b/src/runtime/rpc/network.h
index 5a9bb0609..aca926743 100644
--- a/src/runtime/rpc/network.h
+++ b/src/runtime/rpc/network.h
@@ -274,8 +274,8 @@ public:
     // should always be called in lock
     bool unlink_message_for_send();
     virtual void send(uint64_t signature) = 0;
-    void on_send_completed(uint64_t signature = 0);
-    virtual void on_failure(bool is_write = false);
+    void on_send_completed(uint64_t signature);
+    virtual void on_failure(bool is_write);
 
 protected:
     ///
@@ -314,7 +314,6 @@ protected:
     uint64_t _message_sent;
     // ]
 
-protected:
     ///
     /// change status and check status
     ///
@@ -327,7 +326,6 @@ protected:
     void clear_send_queue(bool resend_msgs);
     bool on_disconnected(bool is_write);
 
-protected:
     // constant info
     connection_oriented_network &_net;
     dsn::rpc_address _remote_addr;
diff --git a/src/runtime/rpc/network.sim.h b/src/runtime/rpc/network.sim.h
index f7954afbf..1e2633344 100644
--- a/src/runtime/rpc/network.sim.h
+++ b/src/runtime/rpc/network.sim.h
@@ -50,15 +50,15 @@ public:
                        ::dsn::rpc_address remote_addr,
                        message_parser_ptr &parser);
 
-    virtual void connect();
+    void connect() override;
 
-    virtual void send(uint64_t signature) override;
+    void send(uint64_t signature) override;
 
-    virtual void do_read(int sz) override {}
+    void do_read(int sz) override {}
 
-    virtual void close() override {}
+    void close() override {}
 
-    virtual void on_failure(bool is_write = false) override {}
+    void on_failure(bool is_write) override {}
 };
 
 class sim_server_session : public rpc_session
@@ -69,15 +69,15 @@ public:
                        rpc_session_ptr &client,
                        message_parser_ptr &parser);
 
-    virtual void send(uint64_t signature) override;
+    void send(uint64_t signature) override;
 
-    virtual void connect() {}
+    void connect() override {}
 
-    virtual void do_read(int sz) override {}
+    void do_read(int sz) override {}
 
-    virtual void close() override {}
+    void close() override {}
 
-    virtual void on_failure(bool is_write = false) override {}
+    void on_failure(bool is_write) override {}
 
 private:
     rpc_session_ptr _client;


---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]

Reply via email to