lidavidm commented on code in PR #1509:
URL: https://github.com/apache/arrow-adbc/pull/1509#discussion_r1476598196


##########
python/adbc_driver_manager/adbc_driver_manager/_blocking_impl.cc:
##########
@@ -0,0 +1,119 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+#include "_blocking_impl.h"
+
+#include <fcntl.h>
+#include <unistd.h>
+#include <csignal>
+#include <cstring>
+#include <mutex>
+#include <thread>
+
+namespace pyadbc_driver_manager {
+
+// This is somewhat derived from io_util.cc in arrow, but that implementation
+// isn't easily used outside of Arrow's monolith.
+namespace {
+static std::once_flag kSpawnThread;
+static std::thread kCancelThread;
+
+static std::mutex cancel_mutex;
+static void (*cancel_callback)(void*) = nullptr;
+static void* cancel_callback_data = nullptr;
+static int pipe[2];
+struct sigaction old_sigint;
+struct sigaction our_sigint;
+
+std::string MakePipe() {
+  int rc = 0;
+#if defined(__linux__) && defined(__GLIBC__)
+  rc = pipe2(pipe, O_CLOEXEC | O_NONBLOCK);
+#else
+  return "Unsupported platform";
+#endif
+
+  if (rc != 0) {
+    return std::strerror(errno);
+  }
+  return "";
+}
+
+void InterruptThread() {
+  while (true) {
+    char buf = 0;
+    ssize_t bytes_read = read(pipe[0], &buf, 1);
+    if (bytes_read < 0) {
+      if (errno == EINTR) continue;
+      // XXX: we failed reading from the pipe; warn?

Review Comment:
   Could add a once_flag for warnings



##########
python/adbc_driver_manager/adbc_driver_manager/_blocking_impl.cc:
##########
@@ -0,0 +1,119 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+#include "_blocking_impl.h"
+
+#include <fcntl.h>
+#include <unistd.h>
+#include <csignal>
+#include <cstring>
+#include <mutex>
+#include <thread>
+
+namespace pyadbc_driver_manager {
+
+// This is somewhat derived from io_util.cc in arrow, but that implementation
+// isn't easily used outside of Arrow's monolith.
+namespace {
+static std::once_flag kSpawnThread;
+static std::thread kCancelThread;
+
+static std::mutex cancel_mutex;
+static void (*cancel_callback)(void*) = nullptr;
+static void* cancel_callback_data = nullptr;
+static int pipe[2];
+struct sigaction old_sigint;
+struct sigaction our_sigint;
+
+std::string MakePipe() {
+  int rc = 0;
+#if defined(__linux__) && defined(__GLIBC__)
+  rc = pipe2(pipe, O_CLOEXEC | O_NONBLOCK);

Review Comment:
   Probably I want the write side to be nonblocking, but the read side to block...



##########
python/adbc_driver_manager/adbc_driver_manager/_lib.pyx:
##########
@@ -1481,3 +1483,47 @@ cdef class AdbcStatement(_AdbcHandle):
 cdef const CAdbcError* PyAdbcErrorFromArrayStream(
     CArrowArrayStream* stream, CAdbcStatusCode* status):
     return AdbcErrorFromArrayStream(stream, status)
+
+
+cdef extern from "_blocking_impl.h" nogil:
+    ctypedef void (*BlockingCallback)(void*) noexcept nogil
+    c_string CInitBlockingCallback"pyadbc_driver_manager::InitBlockingCallback"()
+    void CSetBlockingCallback"pyadbc_driver_manager::SetBlockingCallback"(BlockingCallback, void* data)
+    void CClearBlockingCallback"pyadbc_driver_manager::ClearBlockingCallback"()
+
+
+@functools.cache
+def _init_blocking_call():
+    error = bytes(CInitBlockingCallback()).decode("utf-8")
+    if error:
+        raise RuntimeError(error)
+
+
+def _blocking_call(func, args, kwargs, cancel):
+    """
+    Run functions that are expected to block with a native SIGINT handler.
+
+    Parameters
+    ----------
+    """
+    if threading.current_thread() is not threading.main_thread():
+        return func(*args, **kwargs)
+
+    _init_blocking_call()
+
+    # Set the callback for the background thread and save the signal handler
+    CSetBlockingCallback(&_handle_blocking_call, <void*>cancel)
+
+    try:
+        return func(*args, **kwargs)
+    finally:
+        # Restore the signal handler
+        CClearBlockingCallback()
+
+
+cdef void _handle_blocking_call(void* c_cancel) noexcept nogil:
+    # TODO: if this throws, we could save and restore the traceback later
+    # TODO: we could record that this was hit and raise a KeyboardInterrupt above

Review Comment:
   Not sure what Python users would prefer - currently it passes through the ADBC exception rather than raising KeyboardInterrupt.



##########
python/adbc_driver_manager/adbc_driver_manager/_lib.pyx:
##########
@@ -1481,3 +1483,47 @@ cdef class AdbcStatement(_AdbcHandle):
 cdef const CAdbcError* PyAdbcErrorFromArrayStream(
     CArrowArrayStream* stream, CAdbcStatusCode* status):
     return AdbcErrorFromArrayStream(stream, status)
+
+
+cdef extern from "_blocking_impl.h" nogil:
+    ctypedef void (*BlockingCallback)(void*) noexcept nogil
+    c_string CInitBlockingCallback"pyadbc_driver_manager::InitBlockingCallback"()
+    void CSetBlockingCallback"pyadbc_driver_manager::SetBlockingCallback"(BlockingCallback, void* data)
+    void CClearBlockingCallback"pyadbc_driver_manager::ClearBlockingCallback"()
+
+
+@functools.cache
+def _init_blocking_call():
+    error = bytes(CInitBlockingCallback()).decode("utf-8")
+    if error:
+        raise RuntimeError(error)
+
+
+def _blocking_call(func, args, kwargs, cancel):
+    """
+    Run functions that are expected to block with a native SIGINT handler.
+
+    Parameters
+    ----------
+    """
+    if threading.current_thread() is not threading.main_thread():
+        return func(*args, **kwargs)
+
+    _init_blocking_call()

Review Comment:
   Does the rest of the body need to be guarded by a mutex? Or are we going to trust the current_thread check?



##########
python/adbc_driver_manager/adbc_driver_manager/_lib.pyx:
##########
@@ -1481,3 +1483,47 @@ cdef class AdbcStatement(_AdbcHandle):
 cdef const CAdbcError* PyAdbcErrorFromArrayStream(
     CArrowArrayStream* stream, CAdbcStatusCode* status):
     return AdbcErrorFromArrayStream(stream, status)
+
+
+cdef extern from "_blocking_impl.h" nogil:
+    ctypedef void (*BlockingCallback)(void*) noexcept nogil
+    c_string CInitBlockingCallback"pyadbc_driver_manager::InitBlockingCallback"()
+    void CSetBlockingCallback"pyadbc_driver_manager::SetBlockingCallback"(BlockingCallback, void* data)
+    void CClearBlockingCallback"pyadbc_driver_manager::ClearBlockingCallback"()
+
+
+@functools.cache
+def _init_blocking_call():
+    error = bytes(CInitBlockingCallback()).decode("utf-8")
+    if error:
+        raise RuntimeError(error)

Review Comment:
   This should be a warning, not a hard error



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]

Reply via email to