paleolimbot commented on code in PR #205:
URL: https://github.com/apache/arrow-nanoarrow/pull/205#discussion_r1226839319
##########
extensions/nanoarrow_device/src/nanoarrow/nanoarrow_device_cuda.c:
##########
@@ -0,0 +1,376 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements. See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership. The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License. You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied. See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+#include <cuda_runtime_api.h>
+
+#include "nanoarrow_device.h"
+
+static void ArrowDeviceCudaAllocatorFree(struct ArrowBufferAllocator* allocator,
+                                         uint8_t* ptr, int64_t old_size) {
+  if (ptr != NULL) {
+    cudaFree(ptr);
+  }
+}
+
+static uint8_t* ArrowDeviceCudaAllocatorReallocate(struct ArrowBufferAllocator* allocator,
+                                                   uint8_t* ptr, int64_t old_size,
+                                                   int64_t new_size) {
+  ArrowDeviceCudaAllocatorFree(allocator, ptr, old_size);
+  return NULL;
+}
+
+static ArrowErrorCode ArrowDeviceCudaAllocateBuffer(struct ArrowBuffer* buffer,
+                                                    int64_t size_bytes) {
+  void* ptr = NULL;
+  cudaError_t result = cudaMalloc(&ptr, (int64_t)size_bytes);
+  if (result != cudaSuccess) {
+    return EINVAL;
+  }
+
+  buffer->data = (uint8_t*)ptr;
+  buffer->size_bytes = size_bytes;
+  buffer->capacity_bytes = size_bytes;
+  buffer->allocator.reallocate = &ArrowDeviceCudaAllocatorReallocate;
+  buffer->allocator.free = &ArrowDeviceCudaAllocatorFree;
+  // TODO: We almost certainly need device_id here
+  buffer->allocator.private_data = NULL;
+  return NANOARROW_OK;
+}
+
+static void ArrowDeviceCudaHostAllocatorFree(struct ArrowBufferAllocator* allocator,
+                                             uint8_t* ptr, int64_t old_size) {
+  if (ptr != NULL) {
+    cudaFreeHost(ptr);
+  }
+}
+
+static uint8_t* ArrowDeviceCudaHostAllocatorReallocate(
+    struct ArrowBufferAllocator* allocator, uint8_t* ptr, int64_t old_size,
+    int64_t new_size) {
+  ArrowDeviceCudaHostAllocatorFree(allocator, ptr, old_size);
+  return NULL;
+}
+
+static ArrowErrorCode ArrowDeviceCudaHostAllocateBuffer(struct ArrowBuffer* buffer,
+                                                        int64_t size_bytes) {
+  void* ptr = NULL;
+  cudaError_t result = cudaMallocHost(&ptr, (int64_t)size_bytes);
+  if (result != cudaSuccess) {
+    return EINVAL;
+  }
+
+  buffer->data = (uint8_t*)ptr;
+  buffer->size_bytes = size_bytes;
+  buffer->capacity_bytes = size_bytes;
+  buffer->allocator.reallocate = &ArrowDeviceCudaHostAllocatorReallocate;
+  buffer->allocator.free = &ArrowDeviceCudaHostAllocatorFree;
+  // TODO: We almost certainly need device_id here
+  buffer->allocator.private_data = NULL;
+  return NANOARROW_OK;
+}
+
+// TODO: All these buffer copiers would benefit from cudaMemcpyAsync but there is
+// no good way to incorporate that just yet
+
+static ArrowErrorCode ArrowDeviceCudaBufferInit(struct ArrowDevice* device_src,
+                                                struct ArrowDeviceBufferView src,
+                                                struct ArrowDevice* device_dst,
+                                                struct ArrowBuffer* dst) {
+  if (device_src->device_type == ARROW_DEVICE_CPU &&
+      device_dst->device_type == ARROW_DEVICE_CUDA) {
+    struct ArrowBuffer tmp;
+    NANOARROW_RETURN_NOT_OK(ArrowDeviceCudaAllocateBuffer(&tmp, src.size_bytes));
+    cudaError_t result =
+        cudaMemcpy(tmp.data, ((uint8_t*)src.private_data) + src.offset_bytes,
+                   (size_t)src.size_bytes, cudaMemcpyHostToDevice);
+    if (result != cudaSuccess) {
+      ArrowBufferReset(&tmp);
+      return EINVAL;
+    }
+
+    ArrowBufferMove(&tmp, dst);
+    return NANOARROW_OK;
+
+  } else if (device_src->device_type == ARROW_DEVICE_CUDA &&
+             device_dst->device_type == ARROW_DEVICE_CUDA) {
+    struct ArrowBuffer tmp;
+    NANOARROW_RETURN_NOT_OK(ArrowDeviceCudaAllocateBuffer(&tmp, src.size_bytes));
+    cudaError_t result =
+        cudaMemcpy(tmp.data, ((uint8_t*)src.private_data) + src.offset_bytes,
+                   (size_t)src.size_bytes, cudaMemcpyDeviceToDevice);
+    if (result != cudaSuccess) {
+      ArrowBufferReset(&tmp);
+      return EINVAL;
+    }
+
+    ArrowBufferMove(&tmp, dst);
+    return NANOARROW_OK;
+
+  } else if (device_src->device_type == ARROW_DEVICE_CUDA &&
+             device_dst->device_type == ARROW_DEVICE_CPU) {
+    struct ArrowBuffer tmp;
+    ArrowBufferInit(&tmp);
+    NANOARROW_RETURN_NOT_OK(ArrowBufferReserve(&tmp, src.size_bytes));
+    tmp.size_bytes = src.size_bytes;
+    cudaError_t result =
+        cudaMemcpy(tmp.data, ((uint8_t*)src.private_data) + src.offset_bytes,
+                   (size_t)src.size_bytes, cudaMemcpyDeviceToHost);
+    if (result != cudaSuccess) {
+      ArrowBufferReset(&tmp);
+      return EINVAL;
+    }
+
+    ArrowBufferMove(&tmp, dst);
+    return NANOARROW_OK;
+
+  } else if (device_src->device_type == ARROW_DEVICE_CPU &&
+             device_dst->device_type == ARROW_DEVICE_CUDA_HOST) {
+    NANOARROW_RETURN_NOT_OK(ArrowDeviceCudaHostAllocateBuffer(dst, src.size_bytes));
+    memcpy(dst->data, ((uint8_t*)src.private_data) + src.offset_bytes,
+           (size_t)src.size_bytes);
+    return NANOARROW_OK;
+
+  } else if (device_src->device_type == ARROW_DEVICE_CUDA_HOST &&
+             device_dst->device_type == ARROW_DEVICE_CUDA_HOST) {
+    NANOARROW_RETURN_NOT_OK(ArrowDeviceCudaHostAllocateBuffer(dst, src.size_bytes));
+    memcpy(dst->data, ((uint8_t*)src.private_data) + src.offset_bytes,
+           (size_t)src.size_bytes);
+    return NANOARROW_OK;
+
+  } else if (device_src->device_type == ARROW_DEVICE_CUDA_HOST &&
+             device_dst->device_type == ARROW_DEVICE_CPU) {
+    struct ArrowBuffer tmp;
+    ArrowBufferInit(&tmp);
+    NANOARROW_RETURN_NOT_OK(ArrowBufferReserve(&tmp, src.size_bytes));
+    tmp.size_bytes = src.size_bytes;
+    memcpy(tmp.data, ((uint8_t*)src.private_data) + src.offset_bytes,
+           (size_t)src.size_bytes);
+    ArrowBufferMove(&tmp, dst);
+    return NANOARROW_OK;
+
+  } else {
+    return ENOTSUP;
+  }
+}
+
+static ArrowErrorCode ArrowDeviceCudaBufferCopy(struct ArrowDevice* device_src,
+                                                struct ArrowDeviceBufferView src,
+                                                struct ArrowDevice* device_dst,
+                                                struct ArrowDeviceBufferView dst) {
+  // This is all just cudaMemcpy or memcpy
+  if (device_src->device_type == ARROW_DEVICE_CPU &&
+      device_dst->device_type == ARROW_DEVICE_CUDA) {
+    cudaError_t result = cudaMemcpy(((uint8_t*)dst.private_data) + dst.offset_bytes,
+                                    ((uint8_t*)src.private_data) + src.offset_bytes,
+                                    dst.size_bytes, cudaMemcpyHostToDevice);
+    if (result != cudaSuccess) {
+      return EINVAL;
+    }
+    return NANOARROW_OK;
+
+  } else if (device_src->device_type == ARROW_DEVICE_CUDA &&
+             device_dst->device_type == ARROW_DEVICE_CUDA) {
+    cudaError_t result = cudaMemcpy(((uint8_t*)dst.private_data) + dst.offset_bytes,
+                                    ((uint8_t*)src.private_data) + src.offset_bytes,
+                                    dst.size_bytes, cudaMemcpyDeviceToDevice);
+    if (result != cudaSuccess) {
+      return EINVAL;
+    }
+    return NANOARROW_OK;
+
+  } else if (device_src->device_type == ARROW_DEVICE_CUDA &&
+             device_dst->device_type == ARROW_DEVICE_CPU) {
+    cudaError_t result = cudaMemcpy(((uint8_t*)dst.private_data) + dst.offset_bytes,
+                                    ((uint8_t*)src.private_data) + src.offset_bytes,
+                                    dst.size_bytes, cudaMemcpyDeviceToHost);
+    if (result != cudaSuccess) {
+      return EINVAL;
+    }
+    return NANOARROW_OK;
+
+  } else if (device_src->device_type == ARROW_DEVICE_CPU &&
+             device_dst->device_type == ARROW_DEVICE_CUDA_HOST) {
+    memcpy(((uint8_t*)dst.private_data) + dst.offset_bytes,
+           ((uint8_t*)src.private_data) + src.offset_bytes, dst.size_bytes);
+    return NANOARROW_OK;
+  } else if (device_src->device_type == ARROW_DEVICE_CUDA_HOST &&
+             device_dst->device_type == ARROW_DEVICE_CUDA_HOST) {
+    memcpy(((uint8_t*)dst.private_data) + dst.offset_bytes,
+           ((uint8_t*)src.private_data) + src.offset_bytes, dst.size_bytes);
+    return NANOARROW_OK;
+  } else if (device_src->device_type == ARROW_DEVICE_CUDA_HOST &&
+             device_dst->device_type == ARROW_DEVICE_CPU) {
+    memcpy(((uint8_t*)dst.private_data) + dst.offset_bytes,
+           ((uint8_t*)src.private_data) + src.offset_bytes, dst.size_bytes);
+    return NANOARROW_OK;
+  } else {
+    return ENOTSUP;
+  }
+}
+
+static int ArrowDeviceCudaCopyRequired(struct ArrowDevice* device_src,
+                                       struct ArrowArrayView* src,
+                                       struct ArrowDevice* device_dst) {
+  if (device_src->device_type == ARROW_DEVICE_CPU &&
+      device_dst->device_type == ARROW_DEVICE_CUDA) {
+    // Copy
+    return 1;
+
+  } else if (device_src->device_type == ARROW_DEVICE_CUDA &&
+             device_dst->device_type == ARROW_DEVICE_CUDA &&
+             device_src->device_id == device_dst->device_id) {
+    // Move
+    return 0;
+
+  } else if (device_src->device_type == ARROW_DEVICE_CUDA &&
+             device_dst->device_type == ARROW_DEVICE_CPU) {
+    // Copy
+    return 1;
+
+  } else if (device_src->device_type == ARROW_DEVICE_CPU &&
+             device_dst->device_type == ARROW_DEVICE_CUDA_HOST) {
+    // Copy: we can't assume the memory has been registered. A user can force
+    // this by registering the memory and setting device->device_type manually.
+    // A copy will ensure all buffers are allocated with cudaMallocHost().
+    return 1;
+
+  } else if (device_src->device_type == ARROW_DEVICE_CUDA_HOST &&
+             device_dst->device_type == ARROW_DEVICE_CUDA_HOST &&
+             device_src->device_id == device_dst->device_id) {
+    // Move
+    return 0;
+
+  } else if (device_src->device_type == ARROW_DEVICE_CUDA_HOST &&
+             device_dst->device_type == ARROW_DEVICE_CPU) {
+    // Move: the array's release callback is responsible for cudaFreeHost or
+    // deregistration (or perhaps this has been handled at a higher level)
+    return 0;
+
+  } else {
+    // Fall back to the other device's implementation
+    return -1;
+  }
+}
+
+static ArrowErrorCode ArrowDeviceCudaSynchronize(struct ArrowDevice* device,
+                                                 struct ArrowDevice* device_event,
+                                                 void* sync_event,
+                                                 struct ArrowError* error) {
+  if (sync_event == NULL) {
+    return NANOARROW_OK;
+  }
+
+  if (device_event->device_type != ARROW_DEVICE_CUDA ||
+      device_event->device_type != ARROW_DEVICE_CUDA_HOST) {
+    return ENOTSUP;
+  }
+
+  // Pointer vs. not pointer...is there memory ownership to consider here?
+  cudaEvent_t* cuda_event = (cudaEvent_t*)sync_event;
+  cudaError_t result = cudaEventSynchronize(*cuda_event);
+
+  if (result != cudaSuccess) {
+    ArrowErrorSet(error, "cudaEventSynchronize() failed: %s", cudaGetErrorString(result));
+    return EINVAL;
+  }
+
+  cudaEventDestroy(*cuda_event);

Review Comment:
   I'll have to rework a few things to accommodate that but it does make sense.
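   For concreteness, roughly the shape I have in mind for that rework (a sketch only, not necessarily what will land): the event stays owned by whoever created it, so synchronize just waits and never calls `cudaEventDestroy()`. While in there, the device-type check also needs `&&` rather than `||`, otherwise CUDA and CUDA_HOST events are always rejected.

```c
// Sketch of a possible rework (assumes the producer/caller keeps ownership of
// the cudaEvent_t and that sync_event is passed as a cudaEvent_t*, as in the
// draft above)
static ArrowErrorCode ArrowDeviceCudaSynchronize(struct ArrowDevice* device,
                                                 struct ArrowDevice* device_event,
                                                 void* sync_event,
                                                 struct ArrowError* error) {
  if (sync_event == NULL) {
    return NANOARROW_OK;
  }

  // Only reject when the event's device is neither CUDA nor CUDA_HOST
  if (device_event->device_type != ARROW_DEVICE_CUDA &&
      device_event->device_type != ARROW_DEVICE_CUDA_HOST) {
    return ENOTSUP;
  }

  // Wait for the event to complete; destroying it is left to whoever created
  // it (e.g., the producer's release callback)
  cudaEvent_t* cuda_event = (cudaEvent_t*)sync_event;
  cudaError_t result = cudaEventSynchronize(*cuda_event);
  if (result != cudaSuccess) {
    ArrowErrorSet(error, "cudaEventSynchronize() failed: %s",
                  cudaGetErrorString(result));
    return EINVAL;
  }

  return NANOARROW_OK;
}
```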