This is an automated email from the ASF dual-hosted git repository.
tqchen pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/tvm.git
The following commit(s) were added to refs/heads/main by this push:
new 9831966c8e [CPP_RPC] Bugfix race conditions and enhance print infos
(#19778)
9831966c8e is described below
commit 9831966c8e50cee80bed156b3f2520e56ce7e380
Author: Balint Cristian <[email protected]>
AuthorDate: Mon Jun 15 14:45:26 2026 +0300
[CPP_RPC] Bugfix race conditions and enhance print infos (#19778)
This fixes several race conditions and improves the printed info:
* Protect against a race condition when ```list``` or ```remove``` is
attempted on a nonexistent file path
* ```ListDir``` returns gracefully when the folder does not exist, instead
of throwing a (blocking) exception
* ~Make sure ```TVM_FFI_THROW``` also calls ```_exit()```, so it does not
block the child (until timeout)~
* Enhance printed info during ```tvm_rpc``` with concise & trackable
status updates
* Print the time spent in the RPC session; this may help users set the
timeout to a safer bound
This significantly speeds up remote tuning, which was previously
stalled by timeouts.
---
#### Issue description:
If a worker child process times out, it gets killed (as expected, based on
the timeout) and leaves cache files behind. That's why a new session
performs a cleanup first. But when the previous child process
finished normally (meaning it already cleaned up after itself), the new
cleanup hangs because of the (blocking) exception thrown on the
"nonexistent directory" error!
In this vicious circle we (almost) always end up with hung child workers
that need to be killed on timeout.
---
#### Before
* The child worker (almost) never exited on its own; it was always killed
(on timeout):
```
[02:52:48] {..}/rpc_server.cc:310: Connection success 192.168.1.2:39460
[02:52:48] {..}/rpc_env.cc:149: Load module from
/tmp//.cache/tvm_tmp_mod.tar.so ...
[02:52:50] {..}/rpc_server.cc:208: Child pid=3727459 killed (timeout = 2),
Process status = 15
[02:52:50] {..}/rpc_server.cc:239: Socket Connection Closed
```
#### After
* Depending on how the child exits, we get:
Worker child exits on its own (finishes normally, no timeout):
```
[11:58:27] {..}/rpc_server.cc:312: New session from 192.168.1.2:56030
[11:58:27] {..}/rpc_env.cc:123: Load module from
/tmp//.cache/tvm_tmp_mod.tar.so ...
[11:58:28] {..}/rpc_server.cc:337: Finished serving 192.168.1.2:56030
after 1.028 sec
[11:58:28] {..}/rpc_server.cc:212: Child pid=616766 finished, status = 0
[11:58:28] {..}/rpc_server.cc:241: End session with 192.168.1.2:56030
```
Worker child is terminated (unfinished task, due to timeout):
```
[11:23:11] {..}/rpc_server.cc:312: New session from 192.168.1.2:58152
[11:23:11] {..}/rpc_env.cc:123: Load module from
/tmp//.cache/tvm_tmp_mod.tar.so ...
[11:29:26] {..}/rpc_server.cc:208: Child pid=414284 killed (timeout = 15
sec), status = 15
[11:29:26] {..}/rpc_server.cc:241: End session with 192.168.1.2:58152
```
---
apps/cpp_rpc/rpc_env.cc | 53 ++++++++++++++++++++-------------------
apps/cpp_rpc/rpc_env.h | 33 ++++++++++++++++++++++++
apps/cpp_rpc/rpc_server.cc | 24 +++++++++++-------
apps/cpp_rpc/rpc_tracker_client.h | 2 +-
4 files changed, 76 insertions(+), 36 deletions(-)
diff --git a/apps/cpp_rpc/rpc_env.cc b/apps/cpp_rpc/rpc_env.cc
index cc2c5f798a..b0b1fe4064 100644
--- a/apps/cpp_rpc/rpc_env.cc
+++ b/apps/cpp_rpc/rpc_env.cc
@@ -68,32 +68,6 @@ std::string GenerateUntarCommand(const std::string&
tar_file, const std::string&
namespace tvm {
namespace runtime {
-/*!
- * \brief CleanDir Removes the files from the directory
- * \param dirname THe name of the directory
- */
-void CleanDir(const std::string& dirname);
-
-/*!
- * \brief ListDir get the list of files in a directory
- * \param dirname The root directory name
- * \return vector Files in directory.
- */
-std::vector<std::string> ListDir(const std::string& dirname);
-
-/*!
- * \brief build a shared library if necessary
- *
- * This function will automatically call
- * cc.create_shared if the path is in format .o or .tar
- * High level handling for .o and .tar file.
- * We support this to be consistent with RPC module load.
- * \param file_in The input file path.
- *
- * \return The name of the shared library.
- */
-std::string BuildSharedLibrary(std::string file_in);
-
RPCEnv::RPCEnv(const std::string& wd) {
if (wd != "") {
base_ = wd + "/.cache";
@@ -167,6 +141,7 @@ RPCEnv::RPCEnv(const std::string& wd) {
return ffi::Bytes(bin);
}));
}
+
/*!
* \brief GetPath To get the work path from packed function
* \param file_name The file name
@@ -177,11 +152,14 @@ std::string RPCEnv::GetPath(const std::string& file_name)
const {
// and does not create /.rpc/
return !file_name.empty() && file_name[0] == '/' ? file_name : base_ + "/" +
file_name;
}
+
/*!
* \brief Remove The RPC Environment cleanup function
*/
void RPCEnv::CleanUp() const {
CleanDir(base_);
+ if (!CheckPath(base_))
+ return;
const int ret = rmdir(base_.c_str());
if (ret != 0) {
LOG(WARNING) << "Remove directory " << base_ << " failed";
@@ -199,6 +177,9 @@ std::vector<std::string> ListDir(const std::string&
dirname) {
DIR* dp = opendir(dirname.c_str());
if (dp == nullptr) {
int errsv = errno;
+ if (errsv == ENOENT) {
+ return vec;
+ }
TVM_FFI_THROW(InternalError) << "ListDir " << dirname << " error: " <<
strerror(errsv);
}
dirent* d;
@@ -220,6 +201,9 @@ std::vector<std::string> ListDir(const std::string&
dirname) {
HANDLE handle = FindFirstFileA(pattern.c_str(), &fd);
if (handle == INVALID_HANDLE_VALUE) {
const int errsv = GetLastError();
+ if (errsv == ERROR_FILE_NOT_FOUND || errsv == ERROR_PATH_NOT_FOUND) {
+ return vec;
+ }
TVM_FFI_THROW(InternalError) << "ListDir " << dirname << " error: " <<
strerror(errsv);
}
do {
@@ -334,11 +318,28 @@ std::string BuildSharedLibrary(std::string file) {
return file_name;
}
+/*!
+ * \brief CheckPath Checks if a file or directory exists
+ * \param pathname Path of the file or directory to check
+ * \return True if stat()/GetFileAttributesA() succeeds on the path, false otherwise.
+ */
+bool CheckPath(const std::string& pathname) {
+#if defined(_WIN32)
+ DWORD attribs = GetFileAttributesA(pathname.c_str());
+ return (attribs != INVALID_FILE_ATTRIBUTES);
+#else
+ struct stat info;
+ return (stat(pathname.c_str(), &info) == 0);
+#endif
+}
+
/*!
* \brief CleanDir Removes the files from the directory
* \param dirname The name of the directory
*/
void CleanDir(const std::string& dirname) {
+ if (!CheckPath(dirname))
+ return;
auto files = ListDir(dirname);
for (const auto& filename : files) {
std::string file_path = dirname + "/";
diff --git a/apps/cpp_rpc/rpc_env.h b/apps/cpp_rpc/rpc_env.h
index a5d3f6957c..bd5e2f9422 100644
--- a/apps/cpp_rpc/rpc_env.h
+++ b/apps/cpp_rpc/rpc_env.h
@@ -31,6 +31,39 @@
namespace tvm {
namespace runtime {
+/*!
+ * \brief CheckPath Checks if a file or directory exists
+ * \param pathname Path of the file or directory to check
+ * \return True if the path exists, false otherwise.
+ */
+bool CheckPath(const std::string& pathname);
+
+/*!
+ * \brief CleanDir Removes the files from the directory
+ * \param dirname The name of the directory
+ */
+void CleanDir(const std::string& dirname);
+
+/*!
+ * \brief ListDir Get the list of files in a directory
+ * \param dirname The root directory name
+ * \return vector Files in directory.
+ */
+std::vector<std::string> ListDir(const std::string& dirname);
+
+/*!
+ * \brief build a shared library if necessary
+ *
+ * This function will automatically call
+ * cc.create_shared if the path is in format .o or .tar
+ * High level handling for .o and .tar file.
+ * We support this to be consistent with RPC module load.
+ * \param file_in The input file path.
+ *
+ * \return The name of the shared library.
+ */
+std::string BuildSharedLibrary(std::string file_in);
+
/*!
* \brief RPCEnv The RPC Environment parameters for c++ rpc server
*/
diff --git a/apps/cpp_rpc/rpc_server.cc b/apps/cpp_rpc/rpc_server.cc
index 9425e734c3..b601478f43 100644
--- a/apps/cpp_rpc/rpc_server.cc
+++ b/apps/cpp_rpc/rpc_server.cc
@@ -190,8 +190,8 @@ class RPCServer {
_exit(0);
}
- int status = 0;
- const pid_t finished_first = waitPidEintr(&status);
+ int status_first = 0;
+ const pid_t finished_first = waitPidEintr(&status_first);
if (finished_first == timer_pid) {
kill(worker_pid, SIGTERM);
} else if (finished_first == worker_pid) {
@@ -205,10 +205,12 @@ class RPCServer {
// Logging.
if (finished_first == timer_pid) {
- LOG(INFO) << "Child pid=" << worker_pid << " killed (timeout = " <<
timeout
- << "), Process status = " << status_second;
+ LOG(INFO) << "Child pid=" << worker_pid << " killed"
+ << " (timeout = " << timeout << " sec)"
+ << ", status = " << status_second;
} else if (finished_first == worker_pid) {
- LOG(INFO) << "Child pid=" << timer_pid << " killed, Process status =
" << status_second;
+ LOG(INFO) << "Child pid=" << worker_pid << " finished"
+ << ", status = "<< status_first;
}
} else {
auto pid = fork();
@@ -219,7 +221,7 @@ class RPCServer {
// Wait for the result
int status = 0;
wait(&status);
- LOG(INFO) << "Child pid=" << pid << " exited, Process status =" <<
status;
+ LOG(INFO) << "Child pid=" << pid << " exited, status =" << status;
}
#elif defined(WIN32)
auto start_time = high_resolution_clock::now();
@@ -236,7 +238,7 @@ class RPCServer {
ServerLoopProc(conn, addr, work_dir_);
#endif
// close from our side.
- LOG(INFO) << "Socket Connection Closed";
+ LOG(INFO) << "End session with " << addr.AsString();
conn.Close();
}
}
@@ -307,7 +309,7 @@ class RPCServer {
keylen = int(server_key.length());
TVM_FFI_ICHECK_EQ(conn.SendAll(&keylen, sizeof(keylen)),
sizeof(keylen));
TVM_FFI_ICHECK_EQ(conn.SendAll(server_key.c_str(), keylen), keylen);
- LOG(INFO) << "Connection success " << addr->AsString();
+ LOG(INFO) << "New session from " << addr->AsString();
#ifndef __ANDROID__
ssin >> *opts;
#else
@@ -327,9 +329,13 @@ class RPCServer {
static void ServerLoopProc(support::TCPSocket sock, support::SockAddr addr,
std::string work_dir) {
// Server loop
+ const auto s_time = std::chrono::high_resolution_clock::now();
const auto env = RPCEnv(work_dir);
RPCServerLoop(int(sock.sockfd));
- LOG(INFO) << "Finish serving " << addr.AsString();
+ const auto e_time = std::chrono::high_resolution_clock::now();
+ std::chrono::duration<double> elapsed = e_time - s_time;
+ LOG(INFO) << "Finished serving " << addr.AsString()
+ << " after " << elapsed.count() << " sec";
env.CleanUp();
}
diff --git a/apps/cpp_rpc/rpc_tracker_client.h
b/apps/cpp_rpc/rpc_tracker_client.h
index 03a8dcf3a5..4b1e36b70d 100644
--- a/apps/cpp_rpc/rpc_tracker_client.h
+++ b/apps/cpp_rpc/rpc_tracker_client.h
@@ -198,8 +198,8 @@ class TrackerClient {
support::SockAddr addr(tracker_addr_);
support::TCPSocket sock;
sock.Create();
- LOG(INFO) << "Tracker connecting to " << addr.AsString();
if (sock.Connect(addr)) {
+ LOG(INFO) << "Connected to tracker " << addr.AsString();
return sock;
}