tqchen commented on a change in pull request #4281: [RUNTIME] Support C++ RPC
URL: https://github.com/apache/incubator-tvm/pull/4281#discussion_r344453875
 
 

 ##########
 File path: apps/cpp_rpc/rpc_server.cc
 ##########
 @@ -0,0 +1,361 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+/*!
+ * \file rpc_server.cc
+ * \brief RPC Server implementation.
+ */
+#include <tvm/runtime/registry.h>
+
+#if defined(__linux__) || defined(__ANDROID__)
+#include <sys/select.h>
+#include <sys/wait.h>
+#endif
+#include <set>
+#include <iostream>
+#include <future>
+#include <thread>
+#include <chrono>
+#include <string>
+
+#include "rpc_server.h"
+#include "rpc_env.h"
+#include "rpc_tracker_client.h"
+#include "../../src/runtime/rpc/rpc_session.h"
+#include "../../src/runtime/rpc/rpc_socket_impl.h"
+#include "../../src/common/socket.h"
+
+namespace tvm {
+namespace runtime {
+
+#if defined(__linux__) || defined(__ANDROID__)
+static pid_t waitPidEintr(int *status) {  // Blocks until a child is reaped; returns its pid, wait status is stored in *status.
+  pid_t pid = 0;
+  while ((pid = waitpid(-1, status, 0)) == -1) {  // pid argument -1: any child; the loop body runs only when waitpid fails
+    if (errno == EINTR) {  // call was interrupted by a signal: retry the wait
+      continue;
+    } else {
+      perror("waitpid");  // any other error is treated as fatal: report it, then abort the process
+      abort();
+    }
+  }
+  return pid;
+}
+#endif
+
+/*!
+ * \brief RPCServer RPC Server class.
+ * \param host The hostname of the server, Default=0.0.0.0
+ * \param port The port of the RPC, Default=9090
+ * \param port_end The end search port of the RPC, Default=9199
+ * \param tracker The address of RPC tracker in host:port format e.g. 10.77.1.234:9190 Default=""
+ * \param key The key used to identify the device type in tracker. Default=""
+ * \param custom_addr Custom IP Address to Report to RPC Tracker. Default=""
+ * \param is_proxy Whether to run in proxy mode. Default=False
+ */
+class RPCServer {
+ public:
+  /*!
+   * \brief Constructor.
+  */
+  RPCServer(const std::string &host,
+            int port,
+            int port_end,
+            const std::string &tracker_addr,
+            const std::string &key,
+            const std::string &custom_addr,
+            bool is_proxy) {
+    // Init the values
+    host_ = host;
+    port_ = port;
+    port_end_ = port_end;
+    tracker_addr_ = tracker_addr;
+    key_ = key;
+    custom_addr_ = custom_addr;
+    is_proxy_ = is_proxy;
+  }
+
+  /*!
+   * \brief Destructor.
+  */
+  ~RPCServer() {
+    // Free the resources
+    tracker_sock_.Close();
+    listen_sock_.Close();
+  }
+
+  /*!
+   * \brief Start Creates the RPC listen process and execution.
+  */
+  void Start() {
+      listen_sock_.Create();
+      my_port_ = listen_sock_.TryBindHost(host_, port_, port_end_);
+      LOG(INFO) << "bind to " << host_ << ":" << my_port_;
+      listen_sock_.Listen(1);
+      std::future<void> proc(std::async(std::launch::async, 
&RPCServer::ListenLoopProc, this));
+      proc.get();
+      // Close the listen socket
+      listen_sock_.Close();
+  }
+
+ private:
+  /*!
+   * \brief ListenLoopProc The listen process.
+   */
+  void ListenLoopProc() {
+    TrackerClient tracker(tracker_addr_, key_, custom_addr_);
+    while (true) {
+      common::TCPSocket conn;
+      common::SockAddr addr("0.0.0.0", 0);
+      std::string opts;
+      try {
+        // step 1: setup tracker and report to tracker
+        tracker.TryConnect();
+        // step 2: wait for in-coming connections
+        AcceptConnection(&tracker, &conn, &addr, &opts);
+      }
+      catch (const char* msg) {
+        LOG(WARNING) << "Socket exception: " << msg;
+        // close tracker resource
+        tracker.Close();
+        continue;
+      }
+      catch (std::exception& e) {
+        // Other errors
+        LOG(WARNING) << "Exception standard: " << e.what();
+        continue;
+      }
+
+      int timeout = GetTimeOutFromOpts(opts);
+      #if defined(__linux__) || defined(__ANDROID__)
+        // step 3: serving
+        if (timeout) {
+          const pid_t timer_pid = fork();
+          if (timer_pid == 0) {
+            // Timer process
+            sleep(timeout);
+            exit(0);
+          }
+
+          const pid_t worker_pid = fork();
+          if (worker_pid == 0) {
+            // Worker process
+            ServerLoopProc(conn, addr);
+            exit(0);
+          }
+
+          int status = 0;
+          const pid_t finished_first = waitPidEintr(&status);
+          if (finished_first == timer_pid) {
+            kill(worker_pid, SIGKILL);
+          } else if (finished_first == worker_pid) {
+            kill(timer_pid, SIGKILL);
+          } else {
+            LOG(INFO) << "Child pid=" << finished_first << " unexpected, but 
still continue.";
+          }
+
+          int status_second = 0;
+          waitPidEintr(&status_second);
+
+          // Logging.
+          if (finished_first == timer_pid) {
+            LOG(INFO) << "Child pid=" << worker_pid << " killed (timeout = " 
<< timeout
+                      << "), Process status =" << status_second;
+          } else if (finished_first == worker_pid) {
+            LOG(INFO) << "Child pid=" << timer_pid << " killed, Process status 
=" << status_second;
+          }
+        } else {
+          auto pid = fork();
+          if (pid == 0) {
+            ServerLoopProc(conn, addr);
+            exit(0);
+          }
+          // Wait for the result
+          int status = 0;
+          wait(&status);
+          LOG(INFO) << "Child pid=" << pid << " exited, Process status =" << 
status;
+        }
+      #else
+        // step 3: serving
+        std::future<void> proc(std::async(std::launch::async,
+                                          &RPCServer::ServerLoopProc, this, 
conn, addr));
+        // wait until server process finish or timeout
+        if (timeout) {
 
 Review comment:
   Please make the comparison explicit here: `if (timeout != 0)` instead of `if (timeout)`.

----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


With regards,
Apache Git Services

Reply via email to