kkraus14 commented on code in PR #37365:
URL: https://github.com/apache/arrow/pull/37365#discussion_r1304958220
##########
cpp/src/arrow/gpu/cuda_context.cc:
##########
@@ -281,6 +279,49 @@ Result<std::shared_ptr<CudaDevice>> AsCudaDevice(const
std::shared_ptr<Device>&
}
}
+Status CudaDevice::Stream::WaitEvent(const Device::SyncEvent& event) {
+ auto cuda_event =
+ checked_cast<const CudaDevice::SyncEvent*, const
Device::SyncEvent*>(&event);
+ if (!cuda_event) {
+ return Status::Invalid("CudaDevice::Stream cannot Wait on non-cuda event");
+ }
+
+ auto cu_event = cuda_event->value();
+ if (!cu_event) {
+ return Status::Invalid("Cuda Stream cannot wait on null event");
+ }
+
+ ContextSaver set_temporary((CUcontext)(context_.get()->handle()));
+ // TODO: do we need to account for CUevent_capture_flags??
Review Comment:
My understanding is the only usage of flags right now is under the context
of CUDA Graphs
(https://docs.nvidia.com/cuda/cuda-driver-api/group__CUDA__GRAPH.html#group__CUDA__GRAPH).
If they need to handle this case they can always get the raw `Stream` /
`Event` handles from Arrow for now and do it themselves. If we need to address
it later we can.
##########
cpp/src/arrow/gpu/cuda_test.cc:
##########
@@ -213,6 +215,67 @@ TEST_F(TestCudaDevice, Copy) {
}
}
+TEST_F(TestCudaDevice, CreateSyncEvent) {
Review Comment:
Doesn't have to be this test but it would be good to destroy the sync event
and then check the cuda error code to have an explicit test that the destructor
works as expected as well
##########
cpp/src/arrow/gpu/cuda_test.cc:
##########
@@ -213,6 +215,67 @@ TEST_F(TestCudaDevice, Copy) {
}
}
+TEST_F(TestCudaDevice, CreateSyncEvent) {
+ ASSERT_OK_AND_ASSIGN(auto ev, mm_->MakeDeviceSyncEvent());
+ ASSERT_TRUE(ev);
+ auto cuda_ev = checked_pointer_cast<CudaDevice::SyncEvent>(ev);
+ ASSERT_EQ(CUDA_SUCCESS, cuEventQuery(*cuda_ev));
+}
+
+TEST_F(TestCudaDevice, WrapDeviceSyncEvent) {
+ // need a context to call cuEventCreate
+ ContextSaver set_temporary((CUcontext)(context_.get()->handle()));
+
+ CUevent event;
+ ASSERT_CUDA_OK(cuEventCreate(&event, CU_EVENT_DEFAULT));
+ ASSERT_CUDA_OK(cuEventQuery(event));
+
+ {
+ // wrap event with no-op destructor
+ ASSERT_OK_AND_ASSIGN(auto ev, mm_->WrapDeviceSyncEvent(&event, [](void*)
{}));
+ ASSERT_TRUE(ev);
+ // verify it's the same event we passed in
+ ASSERT_EQ(ev->get_raw(), &event);
+ auto cuda_ev = checked_pointer_cast<CudaDevice::SyncEvent>(ev);
+ ASSERT_CUDA_OK(cuEventQuery(*cuda_ev));
+ }
+
+ // verify that the event is still valid on the device when the shared_ptr
+ // goes away since we didn't give it ownership.
+ ASSERT_CUDA_OK(cuEventQuery(event));
+ ASSERT_CUDA_OK(cuEventDestroy(event));
+}
+
+TEST_F(TestCudaDevice, DefaultStream) {
Review Comment:
+1 in destroying the stream here too
##########
cpp/src/arrow/gpu/cuda_context.cc:
##########
@@ -281,6 +279,49 @@ Result<std::shared_ptr<CudaDevice>> AsCudaDevice(const
std::shared_ptr<Device>&
}
}
+Status CudaDevice::Stream::WaitEvent(const Device::SyncEvent& event) {
+ auto cuda_event =
+ checked_cast<const CudaDevice::SyncEvent*, const
Device::SyncEvent*>(&event);
+ if (!cuda_event) {
+ return Status::Invalid("CudaDevice::Stream cannot Wait on non-cuda event");
+ }
+
+ auto cu_event = cuda_event->value();
+ if (!cu_event) {
+ return Status::Invalid("Cuda Stream cannot wait on null event");
+ }
+
+ ContextSaver set_temporary((CUcontext)(context_.get()->handle()));
+ // TODO: do we need to account for CUevent_capture_flags??
+ CU_RETURN_NOT_OK("cuStreamWaitEvent",
+ cuStreamWaitEvent(stream_, cu_event,
CU_EVENT_WAIT_DEFAULT));
+ return Status::OK();
+}
+
+Status CudaDevice::Stream::Synchronize() const {
+ ContextSaver set_temporary((CUcontext)(context_.get()->handle()));
+ CU_RETURN_NOT_OK("cuStreamSynchronize", cuStreamSynchronize(stream_));
+ return Status::OK();
+}
+
+Status CudaDevice::SyncEvent::Wait() {
+ ContextSaver set_temporary((CUcontext)(context_.get()->handle()));
+ CU_RETURN_NOT_OK("cuEventSynchronize", cuEventSynchronize(value()));
+ return Status::OK();
+}
+
+Status CudaDevice::SyncEvent::Record(const Device::Stream& st) {
+ auto cuda_stream = checked_cast<const CudaDevice::Stream*, const
Device::Stream*>(&st);
+ if (!cuda_stream) {
+ return Status::Invalid("CudaDevice::Event cannot record on non-cuda
stream");
+ }
+
+ ContextSaver set_temporary((CUcontext)(context_.get()->handle()));
+ CU_RETURN_NOT_OK("cuEventRecord", cuEventRecord(value(),
cuda_stream->value()));
+ // TODO: there is also cuEventRecordWithFlags, do we want to allow flags?
Review Comment:
+1 here
##########
cpp/src/arrow/gpu/cuda_context.cc:
##########
@@ -293,11 +334,32 @@ std::shared_ptr<CudaDevice>
CudaMemoryManager::cuda_device() const {
return checked_pointer_cast<CudaDevice>(device_);
}
+Result<std::shared_ptr<Device::SyncEvent>>
CudaMemoryManager::MakeDeviceSyncEvent() {
+ ARROW_ASSIGN_OR_RAISE(auto context, cuda_device()->GetContext());
+ ContextSaver set_temporary((CUcontext)(context.get()->handle()));
+
+ // TODO: event creation flags??
Review Comment:
This one matters a bit more since the CUDA event creation flags can be used
to control behavior as well as can have non-negligible performance differences
(disabling timing can yield perf).
That being said, I'd say we should again punt until there's proof of its
need as someone can always use the underlying types directly as needed.
##########
cpp/src/arrow/gpu/cuda_test.cc:
##########
@@ -213,6 +215,67 @@ TEST_F(TestCudaDevice, Copy) {
}
}
+TEST_F(TestCudaDevice, CreateSyncEvent) {
+ ASSERT_OK_AND_ASSIGN(auto ev, mm_->MakeDeviceSyncEvent());
+ ASSERT_TRUE(ev);
+ auto cuda_ev = checked_pointer_cast<CudaDevice::SyncEvent>(ev);
+ ASSERT_EQ(CUDA_SUCCESS, cuEventQuery(*cuda_ev));
+}
+
+TEST_F(TestCudaDevice, WrapDeviceSyncEvent) {
+ // need a context to call cuEventCreate
+ ContextSaver set_temporary((CUcontext)(context_.get()->handle()));
+
+ CUevent event;
+ ASSERT_CUDA_OK(cuEventCreate(&event, CU_EVENT_DEFAULT));
+ ASSERT_CUDA_OK(cuEventQuery(event));
+
+ {
+ // wrap event with no-op destructor
+ ASSERT_OK_AND_ASSIGN(auto ev, mm_->WrapDeviceSyncEvent(&event, [](void*)
{}));
+ ASSERT_TRUE(ev);
+ // verify it's the same event we passed in
+ ASSERT_EQ(ev->get_raw(), &event);
+ auto cuda_ev = checked_pointer_cast<CudaDevice::SyncEvent>(ev);
+ ASSERT_CUDA_OK(cuEventQuery(*cuda_ev));
+ }
+
+ // verify that the event is still valid on the device when the shared_ptr
+ // goes away since we didn't give it ownership.
+ ASSERT_CUDA_OK(cuEventQuery(event));
+ ASSERT_CUDA_OK(cuEventDestroy(event));
+}
+
+TEST_F(TestCudaDevice, DefaultStream) {
+ CudaDevice::Stream stream{context_};
+ ASSERT_OK_AND_ASSIGN(auto ev, mm_->MakeDeviceSyncEvent());
+
+ ASSERT_OK(ev->Record(stream));
+ ASSERT_OK(stream.WaitEvent(*ev));
+ ASSERT_OK(ev->Wait());
+ ASSERT_OK(stream.Synchronize());
+}
+
+TEST_F(TestCudaDevice, ExplicitStream) {
+ // need a context to call cuEventCreate
+ ContextSaver set_temporary((CUcontext)(context_.get()->handle()));
+
+ CUstream cu_stream;
+ ASSERT_CUDA_OK(cuStreamCreate(&cu_stream, CU_STREAM_NON_BLOCKING));
Review Comment:
maybe a bit of a nitpick, but can we use `CU_STREAM_PER_THREAD` here instead
of creating the stream? It would be a slight bit faster in saving us a stream
creation and would double to make sure we don't somehow break being able to
wrap those special defined streams.
##########
cpp/src/arrow/gpu/cuda_context.cc:
##########
@@ -293,11 +334,32 @@ std::shared_ptr<CudaDevice>
CudaMemoryManager::cuda_device() const {
return checked_pointer_cast<CudaDevice>(device_);
}
+Result<std::shared_ptr<Device::SyncEvent>>
CudaMemoryManager::MakeDeviceSyncEvent() {
+ ARROW_ASSIGN_OR_RAISE(auto context, cuda_device()->GetContext());
+ ContextSaver set_temporary((CUcontext)(context.get()->handle()));
+
+ // TODO: event creation flags??
+ CUevent ev;
+ CU_RETURN_NOT_OK("cuEventCreate", cuEventCreate(&ev, CU_EVENT_DEFAULT));
+
+ return std::shared_ptr<Device::SyncEvent>(
+ new CudaDevice::SyncEvent(context, new CUevent(ev), [](void* ev) {
+ auto typed_event = reinterpret_cast<CUevent*>(ev);
+ auto result = cuEventDestroy(*typed_event);
+ if (result != CUDA_SUCCESS) {
+ // should we throw? I think that would automatically terminate
+ // if you throw in a destructor. What should we do with this error?
Review Comment:
Some prior art here from RMM for CUDA Streams:
https://github.com/rapidsai/rmm/blob/228330543cba4921e6b71a52dd1fa81c63921420/include/rmm/cuda_stream.hpp#L67
They use an assert macro
(https://github.com/rapidsai/rmm/blob/228330543cba4921e6b71a52dd1fa81c63921420/include/rmm/detail/error.hpp#L204-L245)
which is effectively a no-op at runtime.
This error would stick around until the next time someone tries to check the
result of a CUDA call and would pop up then, so I think we should be okay to
either add a debug assertion or just do nothing here.
##########
cpp/src/arrow/gpu/cuda_context.h:
##########
@@ -140,6 +142,71 @@ class ARROW_EXPORT CudaDevice : public Device {
/// \param[in] size The buffer size in bytes
Result<std::shared_ptr<CudaHostBuffer>> AllocateHostBuffer(int64_t size);
+ /// \brief EXPERIMENTAL: Wrapper for CUstreams
+ ///
+ /// Does not *own* the CUstream object which must be separately constructed
+ /// and freed using cuStreamCreate and cuStreamDestroy (or equivalent).
+ /// Default construction will use the cuda default stream, and does not allow
+ /// construction from literal 0 or nullptr.
+ class ARROW_EXPORT Stream : public Device::Stream {
+ public:
+ explicit Stream(std::shared_ptr<CudaContext> ctx) noexcept
+ : context_{std::move(ctx)}, stream_{} {}
+
+ ~Stream() = default;
+ explicit Stream(std::shared_ptr<CudaContext> ctx, CUstream stream) noexcept
+ : context_{std::move(ctx)}, stream_{stream} {}
Review Comment:
It would be nice to have a constructor that just works on the default CUDA
context similar to how `MakeDeviceSyncEvent` and `WrapDeviceSyncEvent` work.
Should we follow the same pattern for Streams here?
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]