jorisvandenbossche commented on code in PR #43729:
URL: https://github.com/apache/arrow/pull/43729#discussion_r1724947534
##########
python/pyarrow/lib.pxd:
##########
@@ -519,6 +519,7 @@ cdef class Table(_Tabular):
CTable* table
cdef void init(self, const shared_ptr[CTable]& table)
+ cdef void _assert_cpu(self) except *
Review Comment:
Now that you added this to `_Tabular`, it might no longer be necessary to
repeat it here?
##########
python/pyarrow/tests/test_table.py:
##########
@@ -3357,3 +3357,138 @@ def test_invalid_non_join_column():
with pytest.raises(pa.lib.ArrowInvalid) as excinfo:
t2.join(t1, 'id', join_type='inner')
assert exp_error_msg in str(excinfo.value)
+
+
+def test_recordbatch_non_cpu():
+ cuda = pytest.importorskip("pyarrow.cuda")
+ ctx = cuda.Context(0)
+
+ def create_data_on_cpu_device():
+ return [
+ np.array(range(5), dtype=np.int16),
+ np.array([-10, -5, 0, 1, 10], dtype=np.int32)
+ ]
+
+ def create_arrays_on_cuda_device():
+ cpu_data = create_data_on_cpu_device()
+ cuda_data = [ctx.buffer_from_data(x) for x in cpu_data]
+ cuda_arrays = [
+ pa.Array.from_buffers(pa.int16(), 5, [None, cuda_data[0]]),
+ pa.Array.from_buffers(pa.int32(), 5, [None, cuda_data[1]]),
+ ]
+ return cuda_arrays
+
+ def create_recordbatch_on_cuda_device():
+ cuda_arrays = create_arrays_on_cuda_device()
+ return pa.record_batch(cuda_arrays, ['c0', 'c1'])
+
+ def verify_recordbatch_on_cuda_device(rb, column_names=['c0', 'c1']):
+ rb.validate()
+ assert rb.device_type == pa.DeviceAllocationType.CUDA
+ assert rb.is_cpu is False
+ assert rb.num_columns == 2
Review Comment:
You could do
```suggestion
assert rb.num_columns == len(column_names)
```
and then it is easier to reuse this helper function for the cases of
dropping or adding a column below
##########
python/pyarrow/table.pxi:
##########
@@ -3436,8 +3449,18 @@ cdef class RecordBatch(_Tabular):
'{0} vs {1}'.format(len(arr), num_rows))
c_arrays.push_back(arr.sp_array)
- result = pyarrow_wrap_batch(CRecordBatch.Make(c_schema, num_rows,
- c_arrays))
+ if c_arrays.size() > 0:
+ c_device_type = deref(c_arrays[0]).device_type()
+ result = pyarrow_wrap_batch(CRecordBatch.MakeWithDevice(c_schema,
+ num_rows,
+ c_arrays,
+
c_device_type,
Review Comment:
We are assuming here that all arrays have the same device. I think that
is the same assumption as in C++? (where this is only verified in debug mode?)
In which case that is probably fine for now?
##########
cpp/src/arrow/record_batch.cc:
##########
@@ -59,10 +59,11 @@ class SimpleRecordBatch : public RecordBatch {
public:
SimpleRecordBatch(std::shared_ptr<Schema> schema, int64_t num_rows,
std::vector<std::shared_ptr<Array>> columns,
+ DeviceAllocationType device_type =
DeviceAllocationType::kCPU,
std::shared_ptr<Device::SyncEvent> sync_event = nullptr)
: RecordBatch(std::move(schema), num_rows),
boxed_columns_(std::move(columns)),
- device_type_(DeviceAllocationType::kCPU),
+ device_type_(device_type),
sync_event_(std::move(sync_event)) {
if (boxed_columns_.size() > 0) {
device_type_ = boxed_columns_[0]->device_type();
Review Comment:
It's a bit strange to have a default CPU device in the signature, while we
then actually get it from the array. Because then it seems that a user can set
this, while it will actually be ignored (except for the case of 0 columns)?
What was the reason you added this? Because it needed to be set from Python?
(But there, too, the arrays should already have a device type, so was this
only for the case of 0 columns, e.g. after dropping the other columns?)
##########
python/pyarrow/tests/test_table.py:
##########
@@ -3357,3 +3357,138 @@ def test_invalid_non_join_column():
with pytest.raises(pa.lib.ArrowInvalid) as excinfo:
t2.join(t1, 'id', join_type='inner')
assert exp_error_msg in str(excinfo.value)
+
+
+def test_recordbatch_non_cpu():
+ cuda = pytest.importorskip("pyarrow.cuda")
+ ctx = cuda.Context(0)
+
+ def create_data_on_cpu_device():
+ return [
+ np.array(range(5), dtype=np.int16),
+ np.array([-10, -5, 0, 1, 10], dtype=np.int32)
+ ]
+
+ def create_arrays_on_cuda_device():
+ cpu_data = create_data_on_cpu_device()
+ cuda_data = [ctx.buffer_from_data(x) for x in cpu_data]
+ cuda_arrays = [
+ pa.Array.from_buffers(pa.int16(), 5, [None, cuda_data[0]]),
+ pa.Array.from_buffers(pa.int32(), 5, [None, cuda_data[1]]),
+ ]
+ return cuda_arrays
+
+ def create_recordbatch_on_cuda_device():
+ cuda_arrays = create_arrays_on_cuda_device()
+ return pa.record_batch(cuda_arrays, ['c0', 'c1'])
+
+ def verify_recordbatch_on_cuda_device(rb, column_names=['c0', 'c1']):
+ rb.validate()
Review Comment:
`validate` works, but does it also work with `full=True`? (For Array, you only
added `self._assert_cpu()` after `if full: ..`, so it might be similar here?)
##########
python/pyarrow/tests/test_table.py:
##########
@@ -3357,3 +3357,138 @@ def test_invalid_non_join_column():
with pytest.raises(pa.lib.ArrowInvalid) as excinfo:
t2.join(t1, 'id', join_type='inner')
assert exp_error_msg in str(excinfo.value)
+
+
+def test_recordbatch_non_cpu():
+ cuda = pytest.importorskip("pyarrow.cuda")
+ ctx = cuda.Context(0)
+
+ def create_data_on_cpu_device():
+ return [
+ np.array(range(5), dtype=np.int16),
+ np.array([-10, -5, 0, 1, 10], dtype=np.int32)
+ ]
+
+ def create_arrays_on_cuda_device():
+ cpu_data = create_data_on_cpu_device()
+ cuda_data = [ctx.buffer_from_data(x) for x in cpu_data]
+ cuda_arrays = [
+ pa.Array.from_buffers(pa.int16(), 5, [None, cuda_data[0]]),
+ pa.Array.from_buffers(pa.int32(), 5, [None, cuda_data[1]]),
+ ]
+ return cuda_arrays
+
+ def create_recordbatch_on_cuda_device():
+ cuda_arrays = create_arrays_on_cuda_device()
+ return pa.record_batch(cuda_arrays, ['c0', 'c1'])
Review Comment:
I merged my `copy_to` PR, so this part could be simplified to just create a
CPU record batch and copy it to CUDA in one go.
##########
python/pyarrow/tests/test_table.py:
##########
@@ -3357,3 +3357,138 @@ def test_invalid_non_join_column():
with pytest.raises(pa.lib.ArrowInvalid) as excinfo:
t2.join(t1, 'id', join_type='inner')
assert exp_error_msg in str(excinfo.value)
+
+
+def test_recordbatch_non_cpu():
+ cuda = pytest.importorskip("pyarrow.cuda")
+ ctx = cuda.Context(0)
+
+ def create_data_on_cpu_device():
+ return [
+ np.array(range(5), dtype=np.int16),
+ np.array([-10, -5, 0, 1, 10], dtype=np.int32)
+ ]
+
+ def create_arrays_on_cuda_device():
+ cpu_data = create_data_on_cpu_device()
+ cuda_data = [ctx.buffer_from_data(x) for x in cpu_data]
+ cuda_arrays = [
+ pa.Array.from_buffers(pa.int16(), 5, [None, cuda_data[0]]),
+ pa.Array.from_buffers(pa.int32(), 5, [None, cuda_data[1]]),
+ ]
+ return cuda_arrays
+
+ def create_recordbatch_on_cuda_device():
+ cuda_arrays = create_arrays_on_cuda_device()
+ return pa.record_batch(cuda_arrays, ['c0', 'c1'])
+
+ def verify_recordbatch_on_cuda_device(rb, column_names=['c0', 'c1']):
+ rb.validate()
+ assert rb.device_type == pa.DeviceAllocationType.CUDA
+ assert rb.is_cpu is False
+ assert rb.num_columns == 2
+ assert rb.num_rows == 5
+ assert str(rb) in repr(rb)
+ for c in rb.columns:
+ assert c.device_type == pa.DeviceAllocationType.CUDA
+ assert rb.column_names == column_names
+
+ batch = create_recordbatch_on_cuda_device()
+ verify_recordbatch_on_cuda_device(batch)
+
+ # add_column() test
+ col = pa.Array.from_buffers(
+ pa.int8(), 5, [None, ctx.buffer_from_data(np.array([6, 7, 8, 9, 10],
dtype=np.int8))])
+ new_batch = batch.add_column(2, 'c2', col)
+ assert len(new_batch.columns) == 3
+ for c in new_batch.columns:
+ assert c.device_type == pa.DeviceAllocationType.CUDA
+ with pytest.raises(NotImplementedError):
+ # Default array conversion builds on CPU only
+ batch.add_column(2, 'c2', [1, 1, 1, 1, 1])
Review Comment:
So the reason this raises an error is that we currently store the device
type at the record batch level, and so when adding a column with a different
device type, that is checked and fails?
(although then it should not be a "not implemented" error?)
##########
python/pyarrow/tests/test_table.py:
##########
@@ -3357,3 +3357,138 @@ def test_invalid_non_join_column():
with pytest.raises(pa.lib.ArrowInvalid) as excinfo:
t2.join(t1, 'id', join_type='inner')
assert exp_error_msg in str(excinfo.value)
+
+
+def test_recordbatch_non_cpu():
+ cuda = pytest.importorskip("pyarrow.cuda")
+ ctx = cuda.Context(0)
+
+ def create_data_on_cpu_device():
+ return [
+ np.array(range(5), dtype=np.int16),
+ np.array([-10, -5, 0, 1, 10], dtype=np.int32)
+ ]
+
+ def create_arrays_on_cuda_device():
+ cpu_data = create_data_on_cpu_device()
+ cuda_data = [ctx.buffer_from_data(x) for x in cpu_data]
+ cuda_arrays = [
+ pa.Array.from_buffers(pa.int16(), 5, [None, cuda_data[0]]),
+ pa.Array.from_buffers(pa.int32(), 5, [None, cuda_data[1]]),
+ ]
+ return cuda_arrays
+
+ def create_recordbatch_on_cuda_device():
+ cuda_arrays = create_arrays_on_cuda_device()
+ return pa.record_batch(cuda_arrays, ['c0', 'c1'])
+
+ def verify_recordbatch_on_cuda_device(rb, column_names=['c0', 'c1']):
+ rb.validate()
+ assert rb.device_type == pa.DeviceAllocationType.CUDA
+ assert rb.is_cpu is False
+ assert rb.num_columns == 2
+ assert rb.num_rows == 5
+ assert str(rb) in repr(rb)
+ for c in rb.columns:
+ assert c.device_type == pa.DeviceAllocationType.CUDA
+ assert rb.column_names == column_names
+
+ batch = create_recordbatch_on_cuda_device()
+ verify_recordbatch_on_cuda_device(batch)
Review Comment:
For an initial verification, you could maybe also check that `batch.schema`
works and is equal to the expected schema.
##########
python/pyarrow/tests/test_table.py:
##########
@@ -3357,3 +3357,138 @@ def test_invalid_non_join_column():
with pytest.raises(pa.lib.ArrowInvalid) as excinfo:
t2.join(t1, 'id', join_type='inner')
assert exp_error_msg in str(excinfo.value)
+
+
+def test_recordbatch_non_cpu():
+ cuda = pytest.importorskip("pyarrow.cuda")
+ ctx = cuda.Context(0)
+
+ def create_data_on_cpu_device():
+ return [
+ np.array(range(5), dtype=np.int16),
+ np.array([-10, -5, 0, 1, 10], dtype=np.int32)
+ ]
+
+ def create_arrays_on_cuda_device():
+ cpu_data = create_data_on_cpu_device()
+ cuda_data = [ctx.buffer_from_data(x) for x in cpu_data]
+ cuda_arrays = [
+ pa.Array.from_buffers(pa.int16(), 5, [None, cuda_data[0]]),
+ pa.Array.from_buffers(pa.int32(), 5, [None, cuda_data[1]]),
+ ]
+ return cuda_arrays
+
+ def create_recordbatch_on_cuda_device():
+ cuda_arrays = create_arrays_on_cuda_device()
+ return pa.record_batch(cuda_arrays, ['c0', 'c1'])
Review Comment:
And similarly for `create_arrays_on_cuda_device` just above. We still need
that function to test the methods that create a RecordBatch from arrays etc.,
but the function itself can use `copy_to`.
##########
python/pyarrow/tests/test_table.py:
##########
@@ -3357,3 +3357,138 @@ def test_invalid_non_join_column():
with pytest.raises(pa.lib.ArrowInvalid) as excinfo:
t2.join(t1, 'id', join_type='inner')
assert exp_error_msg in str(excinfo.value)
+
+
+def test_recordbatch_non_cpu():
+ cuda = pytest.importorskip("pyarrow.cuda")
+ ctx = cuda.Context(0)
+
+ def create_data_on_cpu_device():
+ return [
+ np.array(range(5), dtype=np.int16),
+ np.array([-10, -5, 0, 1, 10], dtype=np.int32)
+ ]
+
+ def create_arrays_on_cuda_device():
+ cpu_data = create_data_on_cpu_device()
+ cuda_data = [ctx.buffer_from_data(x) for x in cpu_data]
+ cuda_arrays = [
+ pa.Array.from_buffers(pa.int16(), 5, [None, cuda_data[0]]),
+ pa.Array.from_buffers(pa.int32(), 5, [None, cuda_data[1]]),
+ ]
+ return cuda_arrays
+
+ def create_recordbatch_on_cuda_device():
+ cuda_arrays = create_arrays_on_cuda_device()
+ return pa.record_batch(cuda_arrays, ['c0', 'c1'])
+
+ def verify_recordbatch_on_cuda_device(rb, column_names=['c0', 'c1']):
+ rb.validate()
+ assert rb.device_type == pa.DeviceAllocationType.CUDA
+ assert rb.is_cpu is False
+ assert rb.num_columns == 2
+ assert rb.num_rows == 5
+ assert str(rb) in repr(rb)
+ for c in rb.columns:
+ assert c.device_type == pa.DeviceAllocationType.CUDA
+ assert rb.column_names == column_names
+
+ batch = create_recordbatch_on_cuda_device()
+ verify_recordbatch_on_cuda_device(batch)
+
+ # add_column() test
+ col = pa.Array.from_buffers(
+ pa.int8(), 5, [None, ctx.buffer_from_data(np.array([6, 7, 8, 9, 10],
dtype=np.int8))])
+ new_batch = batch.add_column(2, 'c2', col)
+ assert len(new_batch.columns) == 3
+ for c in new_batch.columns:
+ assert c.device_type == pa.DeviceAllocationType.CUDA
+ with pytest.raises(NotImplementedError):
+ # Default array conversion builds on CPU only
+ batch.add_column(2, 'c2', [1, 1, 1, 1, 1])
+
+ # remove_column() test
+ new_batch = batch.remove_column(1)
+ assert len(new_batch.columns) == 1
+ assert new_batch.device_type == pa.DeviceAllocationType.CUDA
+ assert new_batch[0].device_type == pa.DeviceAllocationType.CUDA
+
+ # drop_columns() test
+ new_batch = batch.drop_columns(['c0', 'c1'])
+ assert len(new_batch.columns) == 0
+ assert new_batch.device_type == pa.DeviceAllocationType.CUDA
+
+ # cast() test
+ new_schema = pa.schema([pa.field('c0', pa.int64()), pa.field('c1',
pa.int64())])
+ with pytest.raises(NotImplementedError):
+ batch.cast(new_schema)
+
+ # drop_null() test
+ validity = ctx.buffer_from_data(
+ np.array([True, False, True, False, True], dtype=np.bool_))
+ null_col = pa.Array.from_buffers(
+ pa.int32(), 5, [validity, ctx.buffer_from_data(np.array([0] * 5,
dtype=np.int32))])
+ batch_with_nulls = batch.add_column(2, 'c2', null_col)
+ with pytest.raises(NotImplementedError):
+ batch_with_nulls.drop_null()
+
+ # filter() test
+ with pytest.raises(NotImplementedError):
+ batch.filter([True] * 5)
+
+ # field() test
+ assert batch.field(0) == pa.field('c0', pa.int16())
+ assert batch.field(1) == pa.field('c1', pa.int32())
+
+ # equals() test
+ new_batch = create_recordbatch_on_cuda_device()
+ with pytest.raises(NotImplementedError):
+ assert batch.equals(new_batch) is True
+
+ new_arrays = create_arrays_on_cuda_device()
+
+ # from_arrays() test
+ new_batch = pa.RecordBatch.from_arrays(new_arrays, ['c0', 'c1'])
+ verify_recordbatch_on_cuda_device(new_batch)
+
Review Comment:
You could now also add something like
`new_batch.copy_to(CPU).equals(batch_cpu)` if you also create the batch
directly on CPU once at the start.
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]