chenBright commented on code in PR #3199:
URL: https://github.com/apache/brpc/pull/3199#discussion_r2726253108


##########
src/brpc/socket.cpp:
##########
@@ -2264,28 +2183,15 @@ int Socket::OnInputEvent(void* user_data, uint32_t 
events,
         // is just 1500~1700/s
         g_vars->neventthread << 1;
 
-        bthread_t tid;
         // transfer ownership as well, don't use s anymore!
-        Socket* const p = s.release();
+        Socket *const p = s.release();

Review Comment:
   No need to modify.



##########
src/brpc/rdma_transport.cpp:
##########
@@ -0,0 +1,236 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+#if BRPC_WITH_RDMA
+
+#include "brpc/rdma_transport.h"
+#include "brpc/tcp_transport.h"
+#include "brpc/rdma/rdma_endpoint.h"
+#include "brpc/rdma/rdma_helper.h"
+
+namespace brpc {
+DECLARE_bool(usercode_in_coroutine);
+DECLARE_bool(usercode_in_pthread);
+
+extern SocketVarsCollector *g_vars;
+
+void RdmaTransport::Init(Socket *socket, const SocketOptions &options) {
+    CHECK(_rdma_ep == NULL);
+    if (options.socket_mode == SOCKET_MODE_RDMA) {
+        _rdma_ep = new(std::nothrow)rdma::RdmaEndpoint(socket);
+        if (!_rdma_ep) {
+            const int saved_errno = errno;
+            PLOG(ERROR) << "Fail to create RdmaEndpoint";
+            socket->SetFailed(saved_errno, "Fail to create RdmaEndpoint: %s",
+                                                  berror(saved_errno));
+        }
+        _rdma_state = RDMA_UNKNOWN;
+    } else {
+        _rdma_state = RDMA_OFF;
+        socket->_socket_mode = SOCKET_MODE_TCP;
+    }
+    _socket = socket;
+    _default_connect = options.app_connect;
+    _on_edge_trigger = options.on_edge_triggered_events;
+    if (options.need_on_edge_trigger && _on_edge_trigger == NULL) {
+        _on_edge_trigger = rdma::RdmaEndpoint::OnNewDataFromTcp;
+    }
+    _tcp_transport = std::make_shared<TcpTransport>();
+    _tcp_transport->Init(socket, options);
+}
+
+void RdmaTransport::Release() {
+    if (_rdma_ep) {
+        delete _rdma_ep;
+        _rdma_ep = NULL;
+        _rdma_state = RDMA_UNKNOWN;
+    }
+}
+
+int RdmaTransport::Reset(int32_t expected_nref) {
+    if (_rdma_ep) {
+        _rdma_ep->Reset();
+        _rdma_state = RDMA_UNKNOWN;
+    }
+    return 0;
+}
+
+std::shared_ptr<AppConnect> RdmaTransport::Connect() {
+    if (_default_connect == nullptr) {
+        return  std::make_shared<rdma::RdmaConnect>();
+    }
+    return _default_connect;
+}
+
+int RdmaTransport::CutFromIOBuf(butil::IOBuf *buf) {
+    if (_rdma_ep && _rdma_state != RDMA_OFF) {
+        butil::IOBuf *data_arr[1] = {buf};
+        return _rdma_ep->CutFromIOBufList(data_arr, 1);
+    } else {
+        return _tcp_transport->CutFromIOBuf(buf);
+    }
+}
+
+ssize_t RdmaTransport::CutFromIOBufList(butil::IOBuf **buf, size_t ndata) {
+    if (_rdma_ep && _rdma_state != RDMA_OFF) {
+        return _rdma_ep->CutFromIOBufList(buf, ndata);
+    }
+    return butil::IOBuf::cut_multiple_into_file_descriptor(_socket->fd(), buf, 
ndata);

Review Comment:
   _tcp_transport->CutFromIOBufList?



##########
src/brpc/rdma_transport.cpp:
##########
@@ -0,0 +1,236 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+#if BRPC_WITH_RDMA
+
+#include "brpc/rdma_transport.h"
+#include "brpc/tcp_transport.h"
+#include "brpc/rdma/rdma_endpoint.h"
+#include "brpc/rdma/rdma_helper.h"
+
+namespace brpc {
+DECLARE_bool(usercode_in_coroutine);
+DECLARE_bool(usercode_in_pthread);
+
+extern SocketVarsCollector *g_vars;
+
+void RdmaTransport::Init(Socket *socket, const SocketOptions &options) {
+    CHECK(_rdma_ep == NULL);
+    if (options.socket_mode == SOCKET_MODE_RDMA) {
+        _rdma_ep = new(std::nothrow)rdma::RdmaEndpoint(socket);
+        if (!_rdma_ep) {
+            const int saved_errno = errno;
+            PLOG(ERROR) << "Fail to create RdmaEndpoint";
+            socket->SetFailed(saved_errno, "Fail to create RdmaEndpoint: %s",
+                                                  berror(saved_errno));
+        }
+        _rdma_state = RDMA_UNKNOWN;
+    } else {
+        _rdma_state = RDMA_OFF;
+        socket->_socket_mode = SOCKET_MODE_TCP;
+    }
+    _socket = socket;
+    _default_connect = options.app_connect;
+    _on_edge_trigger = options.on_edge_triggered_events;
+    if (options.need_on_edge_trigger && _on_edge_trigger == NULL) {
+        _on_edge_trigger = rdma::RdmaEndpoint::OnNewDataFromTcp;
+    }
+    _tcp_transport = std::make_shared<TcpTransport>();
+    _tcp_transport->Init(socket, options);
+}
+
+void RdmaTransport::Release() {
+    if (_rdma_ep) {
+        delete _rdma_ep;
+        _rdma_ep = NULL;
+        _rdma_state = RDMA_UNKNOWN;
+    }
+}
+
+int RdmaTransport::Reset(int32_t expected_nref) {
+    if (_rdma_ep) {
+        _rdma_ep->Reset();
+        _rdma_state = RDMA_UNKNOWN;
+    }
+    return 0;
+}
+
+std::shared_ptr<AppConnect> RdmaTransport::Connect() {
+    if (_default_connect == nullptr) {
+        return  std::make_shared<rdma::RdmaConnect>();
+    }
+    return _default_connect;
+}
+
+int RdmaTransport::CutFromIOBuf(butil::IOBuf *buf) {
+    if (_rdma_ep && _rdma_state != RDMA_OFF) {
+        butil::IOBuf *data_arr[1] = {buf};
+        return _rdma_ep->CutFromIOBufList(data_arr, 1);
+    } else {
+        return _tcp_transport->CutFromIOBuf(buf);
+    }
+}
+
+ssize_t RdmaTransport::CutFromIOBufList(butil::IOBuf **buf, size_t ndata) {
+    if (_rdma_ep && _rdma_state != RDMA_OFF) {
+        return _rdma_ep->CutFromIOBufList(buf, ndata);
+    }
+    return butil::IOBuf::cut_multiple_into_file_descriptor(_socket->fd(), buf, 
ndata);
+}
+
+int RdmaTransport::WaitEpollOut(butil::atomic<int> *_epollout_butex,
+                                    bool pollin, const timespec duetime) {
+    if (_rdma_state == RDMA_ON) {
+        const int expected_val = _epollout_butex
+                ->load(butil::memory_order_acquire);
+        CHECK(_rdma_ep != NULL);
+        if (!_rdma_ep->IsWritable()) {
+            g_vars->nwaitepollout << 1;
+            if (bthread::butex_wait(_epollout_butex, expected_val, &duetime) < 
0) {
+                if (errno != EAGAIN && errno != ETIMEDOUT) {
+                    const int saved_errno = errno;
+                    PLOG(WARNING) << "Fail to wait rdma window of " << _socket;
+                    _socket->SetFailed(saved_errno, "Fail to wait rdma window 
of %s: %s",
+                                                             
_socket->description().c_str(), berror(saved_errno));

Review Comment:
   This line needs to be formatted.



##########
src/brpc/socket_mode.h:
##########
@@ -0,0 +1,24 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+#ifndef BRPC_COMMON_H
+#define BRPC_COMMON_H

Review Comment:
   The name of Header Guard also needs to be changed.



##########
src/brpc/transport_factory.cpp:
##########
@@ -0,0 +1,52 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+#include "transport_factory.h"
+#include "brpc/tcp_transport.h"
+#include "brpc/rdma_transport.h"
+namespace brpc {
+int TransportFactory::ContextInitOrDie(SocketMode mode, bool serverOrNot, 
const void* _options) {
+    if (mode == SOCKET_MODE_TCP) {
+        return 0;
+    }
+#if BRPC_WITH_RDMA
+    else if (mode == SOCKET_MODE_RDMA) {
+        return RdmaTransport::ContextInitOrDie(serverOrNot, _options);
+    }
+#endif
+    else {
+        LOG(ERROR) << "unknown transport type  " << mode;
+        return 1;
+    }
+}
+
+std::shared_ptr<Transport> TransportFactory::CreateTransport(SocketMode mode) {

Review Comment:
   The lifecycle of a transport should be fairly clear, so is it necessary to 
use shared_ptr?



##########
src/brpc/transport.h:
##########
@@ -0,0 +1,66 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+#ifndef BRPC_TRANSPORT_H
+#define BRPC_TRANSPORT_H
+#include "brpc/input_messenger.h"
+#include "brpc/socket.h"
+#include "server.h"
+
+namespace brpc {
+using OnEdgeTrigger = std::function<void (Socket*)>;
+class Transport {
+    friend class TransportFactory;
+public:
+    static void* OnEdge(void* arg) {
+        // the enclosed Socket is valid and free to access inside this 
function.
+        SocketUniquePtr s(static_cast<Socket*>(arg));
+        const OnEdgeTrigger on_edge_trigger = 
s->_transport->GetOnEdgeTrigger();
+        on_edge_trigger(s.get());
+        return NULL;
+    }
+
+    static void* ProcessInputMessage(void* void_arg) {
+        InputMessageBase* msg = static_cast<InputMessageBase*>(void_arg);
+        msg->_process(msg);
+        return NULL;
+    }
+    virtual ~Transport() = default;
+    virtual void Init(Socket* socket, const SocketOptions& options) = 0;
+    virtual void Release() = 0;
+    virtual int Reset(int32_t expected_nref) = 0;
+    virtual std::shared_ptr<AppConnect> Connect() = 0;
+    virtual int CutFromIOBuf(butil::IOBuf* buf) = 0;
+    virtual ssize_t CutFromIOBufList(butil::IOBuf** buf, size_t ndata) = 0;
+    virtual int WaitEpollOut(butil::atomic<int>* _epollout_butex, bool pollin, 
const timespec duetime) = 0;
+    virtual void ProcessEvent(bthread_attr_t attr) = 0;
+    virtual void QueueMessage(InputMessageClosure& input_msg, int* 
num_bthread_created, bool last_msg) = 0;

Review Comment:
   The parameters of these interfaces are based only on the current 
implementation, which has significant limitations and cannot be extended. Using 
structs might offer better extensibility.
   
   For example, the CutFromIOBufList interface does not provide post send type 
information, making it impossible to implement RDMA one-side operations.



##########
src/brpc/tcp_transport.cpp:
##########
@@ -0,0 +1,94 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+#include "tcp_transport.h"
+namespace brpc {
+DECLARE_bool(usercode_in_coroutine);
+DECLARE_bool(usercode_in_pthread);
+
+extern SocketVarsCollector* g_vars;
+
+void TcpTransport::Init(Socket* socket, const SocketOptions& options) {
+    _socket = socket;
+    _default_connect = options.app_connect;
+    _on_edge_trigger = options.on_edge_triggered_events;
+    if (options.need_on_edge_trigger && _on_edge_trigger == NULL) {
+        _on_edge_trigger = InputMessenger::OnNewMessages;
+    }
+}
+
+void TcpTransport::Release(){}
+
+int TcpTransport::Reset(int32_t expected_nref) {
+    return 0;
+}
+
+int TcpTransport::CutFromIOBuf(butil::IOBuf* buf) {
+    return buf->cut_into_file_descriptor(_socket->fd());
+}
+
+std::shared_ptr<AppConnect> TcpTransport::Connect() {
+    return _default_connect;
+}
+
+ssize_t TcpTransport::CutFromIOBufList(butil::IOBuf** buf, size_t ndata) {
+    return butil::IOBuf::cut_multiple_into_file_descriptor(_socket->fd(), buf, 
ndata);
+}
+
+int TcpTransport::WaitEpollOut(butil::atomic<int>* _epollout_butex, bool 
pollin, const timespec duetime) {
+    g_vars->nwaitepollout << 1;
+    const int rc = _socket->WaitEpollOut(_socket->fd(), pollin, &duetime);
+    if (rc < 0 && errno != ETIMEDOUT) {
+        const int saved_errno = errno;
+        PLOG(WARNING) << "Fail to wait epollout of " << _socket;
+        _socket->SetFailed(saved_errno, "Fail to wait epollout of %s: %s",
+                                         _socket->description().c_str(), 
berror(saved_errno));
+        return 1;
+    }
+    return 0;
+}
+
+void TcpTransport::ProcessEvent(bthread_attr_t attr) {
+    bthread_t tid;
+    if (FLAGS_usercode_in_coroutine) {
+        OnEdge(_socket);
+    } else if (bthread_start_urgent(&tid, &attr, OnEdge, _socket) != 0) {
+        LOG(FATAL) << "Fail to start ProcessEvent";
+        OnEdge(_socket);
+    }
+}
+void TcpTransport::QueueMessage(InputMessageClosure& input_msg, int* 
num_bthread_created, bool last_msg) {

Review Comment:
   It appears that last_msg is not being used.



##########
src/brpc/transport_factory.cpp:
##########
@@ -0,0 +1,52 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+#include "transport_factory.h"
+#include "brpc/tcp_transport.h"
+#include "brpc/rdma_transport.h"
+namespace brpc {
+int TransportFactory::ContextInitOrDie(SocketMode mode, bool serverOrNot, 
const void* _options) {
+    if (mode == SOCKET_MODE_TCP) {
+        return 0;
+    }
+#if BRPC_WITH_RDMA
+    else if (mode == SOCKET_MODE_RDMA) {
+        return RdmaTransport::ContextInitOrDie(serverOrNot, _options);
+    }
+#endif
+    else {
+        LOG(ERROR) << "unknown transport type  " << mode;
+        return 1;
+    }
+}
+
+std::shared_ptr<Transport> TransportFactory::CreateTransport(SocketMode mode) {
+    if (mode == SOCKET_MODE_TCP) {
+        // 使用共享指针创建对象

Review Comment:
   English?



##########
src/brpc/transport_factory.h:
##########
@@ -0,0 +1,40 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+#ifndef BRPC_TRANSPORT_FACTORY_H
+#define BRPC_TRANSPORT_FACTORY_H
+
+#include "brpc/errno.pb.h"
+#include "brpc/socket_mode.h"
+#include "brpc/transport.h"
+
+#if BRPC_WITH_RDMA
+BAIDU_REGISTER_ERRNO(brpc::ERDMA, "RDMA verbs error");
+BAIDU_REGISTER_ERRNO(brpc::ERDMAMEM, "Memory not registered for RDMA");
+#endif
+
+namespace brpc {
+// TransportFactory to create transport instance with socket_mode {TCP, RDMA}
+class TransportFactory {

Review Comment:
   Is it necessary for TransportFactory to be a class since it only has static 
functions?



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]


---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]

Reply via email to