This is an automated email from the ASF dual-hosted git repository.

zeroshade pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/arrow.git


The following commit(s) were added to refs/heads/main by this push:
     new 3b8ab8e80a GH-37364: [C++][GPU] Add CUDA impl of Device Event/Stream (#37365)
3b8ab8e80a is described below

commit 3b8ab8e80a3f37b975c9e8ab4aee8dc708b35d9d
Author: Matt Topol <[email protected]>
AuthorDate: Wed Aug 30 16:36:16 2023 -0400

    GH-37364: [C++][GPU] Add CUDA impl of Device Event/Stream (#37365)
    
    
    
    ### What changes are included in this PR?
    Adding `CudaDevice::SyncEvent` and `CudaDevice::Stream` implementations which provide more idiomatic handling of Events and Streams.
    
    ### Are these changes tested?
    Simple SyncEvent test added. More stream tests still being added.
    
    * Closes: #37364
    
    Authored-by: Matt Topol <[email protected]>
    Signed-off-by: Matt Topol <[email protected]>
---
 cpp/src/arrow/c/bridge_test.cc    |   4 +-
 cpp/src/arrow/device.h            |  46 ++++++++++++++---
 cpp/src/arrow/gpu/cuda_context.cc | 106 +++++++++++++++++++++++++++++++++++---
 cpp/src/arrow/gpu/cuda_context.h  |  97 ++++++++++++++++++++++++++++++++++
 cpp/src/arrow/gpu/cuda_test.cc    |  59 +++++++++++++++++++++
 5 files changed, 299 insertions(+), 13 deletions(-)

diff --git a/cpp/src/arrow/c/bridge_test.cc b/cpp/src/arrow/c/bridge_test.cc
index bd0e498a9f..2aedccc965 100644
--- a/cpp/src/arrow/c/bridge_test.cc
+++ b/cpp/src/arrow/c/bridge_test.cc
@@ -1222,7 +1222,9 @@ class MyDevice : public Device {
 
     virtual ~MySyncEvent() = default;
     Status Wait() override { return Status::OK(); }
-    Status Record(const Device::Stream&) override { return Status::OK(); }
+    Status Record(const Device::Stream&, const unsigned int) override {
+      return Status::OK();
+    }
   };
 
  protected:
diff --git a/cpp/src/arrow/device.h b/cpp/src/arrow/device.h
index 55037ac418..066ca7e32a 100644
--- a/cpp/src/arrow/device.h
+++ b/cpp/src/arrow/device.h
@@ -18,6 +18,7 @@
 #pragma once
 
 #include <cstdint>
+#include <functional>
 #include <memory>
 #include <string>
 
@@ -109,7 +110,11 @@ class ARROW_EXPORT Device : public 
std::enable_shared_from_this<Device>,
   /// should be trivially constructible from it's device-specific counterparts.
   class ARROW_EXPORT Stream {
    public:
-    virtual const void* get_raw() const { return NULLPTR; }
+    using release_fn_t = std::function<void(void*)>;
+
+    virtual ~Stream() = default;
+
+    virtual const void* get_raw() const { return stream_.get(); }
 
     /// \brief Make the stream wait on the provided event.
     ///
@@ -117,15 +122,42 @@ class ARROW_EXPORT Device : public 
std::enable_shared_from_this<Device>,
     /// event is completed without blocking the CPU.
     virtual Status WaitEvent(const SyncEvent&) = 0;
 
+    /// \brief Blocks the current thread until a stream's remaining tasks are completed
+    virtual Status Synchronize() const = 0;
+
    protected:
-    Stream() = default;
-    virtual ~Stream() = default;
+    explicit Stream(void* stream, release_fn_t release_stream)
+        : stream_{stream, release_stream} {}
+
+    std::unique_ptr<void, release_fn_t> stream_;
   };
 
+  virtual Result<std::shared_ptr<Stream>> MakeStream() { return NULLPTR; }
+
+  /// \brief Create a new device stream
+  ///
+  /// This should create the appropriate stream type for the device,
+  /// derived from Device::Stream to allow for stream ordered events
+  /// and memory allocations.
+  virtual Result<std::shared_ptr<Stream>> MakeStream(unsigned int flags) {
+    return NULLPTR;
+  }
+
+  /// @brief Wrap an existing device stream alongside a release function
+  ///
+  /// @param device_stream a pointer to the stream to wrap
+  /// @param release_fn a function to call during destruction, `nullptr` or
+  ///        a no-op function can be passed to indicate ownership is maintained
+  ///        externally
+  virtual Result<std::shared_ptr<Stream>> WrapStream(void* device_stream,
+                                                     Stream::release_fn_t 
release_fn) {
+    return NULLPTR;
+  }
+
   /// \brief EXPERIMENTAL: An object that provides event/stream sync primitives
   class ARROW_EXPORT SyncEvent {
    public:
-    using release_fn_t = void (*)(void*);
+    using release_fn_t = std::function<void(void*)>;
 
     virtual ~SyncEvent() = default;
 
@@ -134,9 +166,11 @@ class ARROW_EXPORT Device : public 
std::enable_shared_from_this<Device>,
     /// @brief Block until sync event is completed.
     virtual Status Wait() = 0;
 
+    inline Status Record(const Stream& st) { return Record(st, 0); }
+
     /// @brief Record the wrapped event on the stream so it triggers
     /// the event when the stream gets to that point in its queue.
-    virtual Status Record(const Stream&) = 0;
+    virtual Status Record(const Stream&, const unsigned int flags) = 0;
 
    protected:
     /// If creating this with a passed in event, the caller must ensure
@@ -225,7 +259,7 @@ class ARROW_EXPORT MemoryManager : public 
std::enable_shared_from_this<MemoryMan
 
   /// \brief Wrap an event into a SyncEvent.
   ///
-  /// @param sync_event passed in sync_event from the imported device array.
+  /// @param sync_event passed in sync_event (should be a pointer to the appropriate type)
   /// @param release_sync_event destructor to free sync_event. `nullptr` may be
   ///        passed to indicate that no destruction/freeing is necessary
   virtual Result<std::shared_ptr<Device::SyncEvent>> WrapDeviceSyncEvent(
diff --git a/cpp/src/arrow/gpu/cuda_context.cc 
b/cpp/src/arrow/gpu/cuda_context.cc
index 3e1af26cac..fb8935319e 100644
--- a/cpp/src/arrow/gpu/cuda_context.cc
+++ b/cpp/src/arrow/gpu/cuda_context.cc
@@ -26,11 +26,10 @@
 #include <utility>
 #include <vector>
 
-#include <cuda.h>
-
 #include "arrow/gpu/cuda_internal.h"
 #include "arrow/gpu/cuda_memory.h"
 #include "arrow/util/checked_cast.h"
+#include "arrow/util/logging.h"
 
 namespace arrow {
 
@@ -273,6 +272,35 @@ bool IsCudaDevice(const Device& device) {
   return device.type_name() == kCudaDeviceTypeName;
 }
 
+Result<std::shared_ptr<Device::Stream>> CudaDevice::MakeStream(unsigned int 
flags) {
+  ARROW_ASSIGN_OR_RAISE(auto context, GetContext());
+  ContextSaver 
set_temporary(reinterpret_cast<CUcontext>(context.get()->handle()));
+
+  CUstream stream;
+  CU_RETURN_NOT_OK("cuStreamCreate", cuStreamCreate(&stream, flags));
+  return std::shared_ptr<Device::Stream>(
+      new CudaDevice::Stream(context, new CUstream(stream), [](void* st) {
+        auto typed_stream = reinterpret_cast<CUstream*>(st);
+        // DCHECK_OK still evaluates its argument in release mode
+        // but in debug mode it'll also throw if it fails
+        DCHECK_OK(
+            internal::StatusFromCuda(cuStreamDestroy(*typed_stream), 
"cuStreamDestroy"));
+        delete typed_stream;
+      }));
+}
+
+Result<std::shared_ptr<Device::Stream>> CudaDevice::WrapStream(
+    void* stream, Device::Stream::release_fn_t release_fn) {
+  if (!release_fn) {
+    release_fn = [](void*) {};
+  }
+
+  auto cu_stream = reinterpret_cast<CUstream*>(stream);
+  ARROW_ASSIGN_OR_RAISE(auto context, GetContext());
+  return std::shared_ptr<Device::Stream>(
+      new CudaDevice::Stream(context, cu_stream, release_fn));
+}
+
 Result<std::shared_ptr<CudaDevice>> AsCudaDevice(const 
std::shared_ptr<Device>& device) {
   if (IsCudaDevice(*device)) {
     return checked_pointer_cast<CudaDevice>(device);
@@ -281,6 +309,48 @@ Result<std::shared_ptr<CudaDevice>> AsCudaDevice(const 
std::shared_ptr<Device>&
   }
 }
 
+Status CudaDevice::Stream::WaitEvent(const Device::SyncEvent& event) {
+  auto cuda_event =
+      checked_cast<const CudaDevice::SyncEvent*, const 
Device::SyncEvent*>(&event);
+  if (!cuda_event) {
+    return Status::Invalid("CudaDevice::Stream cannot Wait on non-cuda event");
+  }
+
+  auto cu_event = cuda_event->value();
+  if (!cu_event) {
+    return Status::Invalid("Cuda Stream cannot wait on null event");
+  }
+
+  ContextSaver 
set_temporary(reinterpret_cast<CUcontext>(context_.get()->handle()));
+  CU_RETURN_NOT_OK("cuStreamWaitEvent",
+                   cuStreamWaitEvent(value(), cu_event, 
CU_EVENT_WAIT_DEFAULT));
+  return Status::OK();
+}
+
+Status CudaDevice::Stream::Synchronize() const {
+  ContextSaver 
set_temporary(reinterpret_cast<CUcontext>(context_.get()->handle()));
+  CU_RETURN_NOT_OK("cuStreamSynchronize", cuStreamSynchronize(value()));
+  return Status::OK();
+}
+
+Status CudaDevice::SyncEvent::Wait() {
+  ContextSaver 
set_temporary(reinterpret_cast<CUcontext>(context_.get()->handle()));
+  CU_RETURN_NOT_OK("cuEventSynchronize", cuEventSynchronize(value()));
+  return Status::OK();
+}
+
+Status CudaDevice::SyncEvent::Record(const Device::Stream& st, const unsigned 
int flags) {
+  auto cuda_stream = checked_cast<const CudaDevice::Stream*, const 
Device::Stream*>(&st);
+  if (!cuda_stream) {
+    return Status::Invalid("CudaDevice::Event cannot record on non-cuda 
stream");
+  }
+
+  ContextSaver 
set_temporary(reinterpret_cast<CUcontext>(context_.get()->handle()));
+  CU_RETURN_NOT_OK("cuEventRecordWithFlags",
+                   cuEventRecordWithFlags(value(), cuda_stream->value(), 
flags));
+  return Status::OK();
+}
+
 // ----------------------------------------------------------------------
 // CudaMemoryManager implementation
 
@@ -293,11 +363,35 @@ std::shared_ptr<CudaDevice> 
CudaMemoryManager::cuda_device() const {
   return checked_pointer_cast<CudaDevice>(device_);
 }
 
+Result<std::shared_ptr<Device::SyncEvent>> 
CudaMemoryManager::MakeDeviceSyncEvent() {
+  ARROW_ASSIGN_OR_RAISE(auto context, cuda_device()->GetContext());
+  ContextSaver 
set_temporary(reinterpret_cast<CUcontext>(context.get()->handle()));
+
+  // TODO: event creation flags
+  CUevent ev;
+  CU_RETURN_NOT_OK("cuEventCreate", cuEventCreate(&ev, CU_EVENT_DEFAULT));
+
+  return std::shared_ptr<Device::SyncEvent>(
+      new CudaDevice::SyncEvent(context, new CUevent(ev), [](void* ev) {
+        auto typed_event = reinterpret_cast<CUevent*>(ev);
+        // DCHECK_OK still evaluates its argument in release mode
+        // but in debug mode it'll also throw if it fails
+        DCHECK_OK(
+            internal::StatusFromCuda(cuEventDestroy(*typed_event), 
"cuEventDestroy"));
+        delete typed_event;
+      }));
+}
+
 Result<std::shared_ptr<Device::SyncEvent>> 
CudaMemoryManager::WrapDeviceSyncEvent(
     void* sync_event, Device::SyncEvent::release_fn_t release_sync_event) {
-  return nullptr;
-  // auto ev = reinterpret_cast<CUstream*>(sync_event);
-  // return std::make_shared<CudaDeviceSync>(ev);
+  if (!release_sync_event) {
+    release_sync_event = [](void*) {};
+  }
+
+  auto ev = reinterpret_cast<CUevent*>(sync_event);
+  ARROW_ASSIGN_OR_RAISE(auto context, cuda_device()->GetContext());
+  return std::shared_ptr<Device::SyncEvent>(
+      new CudaDevice::SyncEvent(context, ev, release_sync_event));
 }
 
 Result<std::shared_ptr<io::RandomAccessFile>> 
CudaMemoryManager::GetBufferReader(
@@ -440,7 +534,7 @@ class CudaDeviceManager::Impl {
   Status AllocateHost(int device_number, int64_t nbytes, uint8_t** out) {
     RETURN_NOT_OK(CheckDeviceNum(device_number));
     ARROW_ASSIGN_OR_RAISE(auto ctx, GetContext(device_number));
-    ContextSaver set_temporary((CUcontext)(ctx.get()->handle()));
+    ContextSaver 
set_temporary(reinterpret_cast<CUcontext>(ctx.get()->handle()));
     CU_RETURN_NOT_OK("cuMemHostAlloc", 
cuMemHostAlloc(reinterpret_cast<void**>(out),
                                                       
static_cast<size_t>(nbytes),
                                                       
CU_MEMHOSTALLOC_PORTABLE));
diff --git a/cpp/src/arrow/gpu/cuda_context.h b/cpp/src/arrow/gpu/cuda_context.h
index 79a2ec9f97..e4d8482855 100644
--- a/cpp/src/arrow/gpu/cuda_context.h
+++ b/cpp/src/arrow/gpu/cuda_context.h
@@ -21,6 +21,8 @@
 #include <memory>
 #include <string>
 
+#include <cuda.h>
+
 #include "arrow/device.h"
 #include "arrow/result.h"
 #include "arrow/util/visibility.h"
@@ -140,6 +142,90 @@ class ARROW_EXPORT CudaDevice : public Device {
   /// \param[in] size The buffer size in bytes
   Result<std::shared_ptr<CudaHostBuffer>> AllocateHostBuffer(int64_t size);
 
+  /// \brief EXPERIMENTAL: Wrapper for CUstreams
+  ///
+  /// Does not *own* the CUstream object which must be separately constructed
+  /// and freed using cuStreamCreate and cuStreamDestroy (or equivalent).
+  /// Default construction will use the cuda default stream, and does not allow
+  /// construction from literal 0 or nullptr.
+  class ARROW_EXPORT Stream : public Device::Stream {
+   public:
+    ~Stream() = default;
+
+    [[nodiscard]] inline CUstream value() const noexcept {
+      if (!stream_) {
+        return CUstream{};
+      }
+      return *reinterpret_cast<CUstream*>(stream_.get());
+    }
+    operator CUstream() const noexcept { return value(); }
+
+    const void* get_raw() const noexcept override { return stream_.get(); }
+    Status WaitEvent(const Device::SyncEvent&) override;
+    Status Synchronize() const override;
+
+   protected:
+    friend class CudaDevice;
+
+    explicit Stream(std::shared_ptr<CudaContext> ctx, CUstream* st,
+                    Device::Stream::release_fn_t release_fn)
+        : Device::Stream(reinterpret_cast<void*>(st), release_fn),
+          context_{std::move(ctx)} {}
+
+    // disable construction from literal 0
+    explicit Stream(std::shared_ptr<CudaContext>, int,
+                    Device::Stream::release_fn_t) = delete;  // Prevent cast 
from 0
+    explicit Stream(std::shared_ptr<CudaContext>, std::nullptr_t,
+                    Device::Stream::release_fn_t) = delete;  // Prevent cast 
from nullptr
+
+   private:
+    std::shared_ptr<CudaContext> context_;
+  };
+
+  Result<std::shared_ptr<Device::Stream>> MakeStream() override { return 
MakeStream(0); }
+
+  /// \brief Create a CUstream wrapper in the current context
+  Result<std::shared_ptr<Device::Stream>> MakeStream(unsigned int flags) 
override;
+
+  /// @brief Wrap a pointer to an existing stream
+  ///
+  /// @param device_stream passed in stream (should be a CUstream*)
+  /// @param release_fn destructor to free the stream. `nullptr` may be passed
+  ///        to indicate there is no destruction/freeing necessary.
+  Result<std::shared_ptr<Device::Stream>> WrapStream(
+      void* device_stream, Stream::release_fn_t release_fn) override;
+
+  class ARROW_EXPORT SyncEvent : public Device::SyncEvent {
+   public:
+    [[nodiscard]] CUevent value() const {
+      if (sync_event_) {
+        return *static_cast<CUevent*>(sync_event_.get());
+      }
+      return CUevent{};
+    }
+    operator CUevent() const noexcept { return value(); }
+
+    /// @brief Block until the sync event is marked completed
+    Status Wait() override;
+
+    /// @brief Record the wrapped event on the stream
+    ///
+    /// Once the stream completes the tasks previously added to it,
+    /// it will trigger the event.
+    Status Record(const Device::Stream&, const unsigned int) override;
+
+   protected:
+    friend class CudaMemoryManager;
+
+    explicit SyncEvent(std::shared_ptr<CudaContext> ctx, CUevent* ev,
+                       Device::SyncEvent::release_fn_t release_ev)
+        : Device::SyncEvent(reinterpret_cast<void*>(ev), release_ev),
+          context_{std::move(ctx)} {}
+
+   private:
+    std::shared_ptr<CudaContext> context_;
+  };
+
  protected:
   struct Impl;
 
@@ -179,6 +265,17 @@ class ARROW_EXPORT CudaMemoryManager : public 
MemoryManager {
   /// having to cast the `device()` result.
   std::shared_ptr<CudaDevice> cuda_device() const;
 
+  /// \brief Creates a wrapped CUevent.
+  ///
+  /// Will call cuEventCreate and it will call cuEventDestroy internally
+  /// when the event is destructed.
+  Result<std::shared_ptr<Device::SyncEvent>> MakeDeviceSyncEvent() override;
+
+  /// \brief Wraps an existing event into a sync event.
+  ///
+  /// @param sync_event the event to wrap, must be a CUevent*
+  /// @param release_sync_event a function to call during destruction, `nullptr` or
+  ///        a no-op function can be passed to indicate ownership is maintained externally
   Result<std::shared_ptr<Device::SyncEvent>> WrapDeviceSyncEvent(
       void* sync_event, Device::SyncEvent::release_fn_t release_sync_event) 
override;
 
diff --git a/cpp/src/arrow/gpu/cuda_test.cc b/cpp/src/arrow/gpu/cuda_test.cc
index 6d392213e2..c39dbe28e8 100644
--- a/cpp/src/arrow/gpu/cuda_test.cc
+++ b/cpp/src/arrow/gpu/cuda_test.cc
@@ -39,9 +39,11 @@
 namespace arrow {
 
 using internal::checked_cast;
+using internal::checked_pointer_cast;
 
 namespace cuda {
 
+using internal::ContextSaver;
 using internal::StatusFromCuda;
 
 #define ASSERT_CUDA_OK(expr) 
ASSERT_OK(::arrow::cuda::internal::StatusFromCuda((expr)))
@@ -213,6 +215,63 @@ TEST_F(TestCudaDevice, Copy) {
   }
 }
 
+TEST_F(TestCudaDevice, CreateSyncEvent) {
+  ASSERT_OK_AND_ASSIGN(auto ev, mm_->MakeDeviceSyncEvent());
+  ASSERT_TRUE(ev);
+  auto cuda_ev = checked_pointer_cast<CudaDevice::SyncEvent>(ev);
+  ASSERT_CUDA_OK(cuEventQuery(*cuda_ev));
+}
+
+TEST_F(TestCudaDevice, WrapDeviceSyncEvent) {
+  // need a context to call cuEventCreate
+  ContextSaver 
set_temporary(reinterpret_cast<CUcontext>(context_.get()->handle()));
+
+  CUevent event;
+  ASSERT_CUDA_OK(cuEventCreate(&event, CU_EVENT_DEFAULT));
+  ASSERT_CUDA_OK(cuEventQuery(event));
+
+  {
+    // wrap event with no-op destructor
+    ASSERT_OK_AND_ASSIGN(auto ev, mm_->WrapDeviceSyncEvent(&event, [](void*) 
{}));
+    ASSERT_TRUE(ev);
+    // verify it's the same event we passed in
+    ASSERT_EQ(ev->get_raw(), &event);
+    auto cuda_ev = checked_pointer_cast<CudaDevice::SyncEvent>(ev);
+    ASSERT_CUDA_OK(cuEventQuery(*cuda_ev));
+  }
+
+  // verify that the event is still valid on the device when the shared_ptr
+  // goes away since we didn't give it ownership.
+  ASSERT_CUDA_OK(cuEventQuery(event));
+  ASSERT_CUDA_OK(cuEventDestroy(event));
+}
+
+TEST_F(TestCudaDevice, DefaultStream) {
+  ASSERT_OK_AND_ASSIGN(auto stream, device_->MakeStream());
+  ASSERT_OK_AND_ASSIGN(auto ev, mm_->MakeDeviceSyncEvent());
+
+  ASSERT_OK(ev->Record(*stream));
+  ASSERT_OK(stream->WaitEvent(*ev));
+  ASSERT_OK(ev->Wait());
+  ASSERT_OK(stream->Synchronize());
+}
+
+TEST_F(TestCudaDevice, ExplicitStream) {
+  // need a context to call cuEventCreate
+  ContextSaver 
set_temporary(reinterpret_cast<CUcontext>(context_.get()->handle()));
+
+  CUstream cu_stream = CU_STREAM_PER_THREAD;
+  {
+    ASSERT_OK_AND_ASSIGN(auto stream, device_->WrapStream(&cu_stream, 
nullptr));
+    ASSERT_OK_AND_ASSIGN(auto ev, mm_->MakeDeviceSyncEvent());
+
+    ASSERT_OK(ev->Record(*stream));
+    ASSERT_OK(stream->WaitEvent(*ev));
+    ASSERT_OK(ev->Wait());
+    ASSERT_OK(stream->Synchronize());
+  }
+}
+
 // ------------------------------------------------------------------------
 // Test CudaContext
 

Reply via email to