jorisvandenbossche commented on code in PR #43729:
URL: https://github.com/apache/arrow/pull/43729#discussion_r1732630789
##########
python/pyarrow/table.pxi:
##########
@@ -2793,6 +2804,11 @@ cdef class RecordBatch(_Tabular):
if isinstance(column, Array):
c_arr = column
else:
+ if not self.is_cpu:
+ raise RuntimeError("A pa.Array() object is required when "
Review Comment:
Maybe this can be a TypeError? (the argument is of the wrong type)
##########
python/pyarrow/tests/test_table.py:
##########
@@ -3357,3 +3357,167 @@ def test_invalid_non_join_column():
with pytest.raises(pa.lib.ArrowInvalid) as excinfo:
t2.join(t1, 'id', join_type='inner')
assert exp_error_msg in str(excinfo.value)
+
+
[email protected]
+def cuda_context():
+ cuda = pytest.importorskip("pyarrow.cuda")
+ return cuda.Context(0)
+
+
[email protected]
+def schema():
+ return pa.schema([pa.field('c0', pa.int16()), pa.field('c1', pa.int32())])
+
+
[email protected]
+def cpu_arrays():
+ return [pa.array([1, 2, 3, 4, 5], pa.int16()),
+ pa.array([-10, -5, 0, 1, 10], pa.int32())]
+
+
[email protected]
+def cuda_arrays(cuda_context, cpu_arrays):
+ return [arr.copy_to(cuda_context.memory_manager) for arr in cpu_arrays]
+
+
[email protected]
+def cpu_recordbatch(cpu_arrays, schema):
+ return pa.record_batch(cpu_arrays, schema=schema)
+
+
[email protected]
+def cuda_recordbatch(cuda_context, cpu_recordbatch):
+ return cpu_recordbatch.copy_to(cuda_context.memory_manager)
+
+
+def verify_recordbatch_on_cuda_device(batch, expected_schema):
+ batch.validate()
+ assert batch.device_type == pa.DeviceAllocationType.CUDA
+ assert batch.is_cpu is False
+ assert batch.num_columns == len(batch.column_names)
Review Comment:
```suggestion
assert batch.num_columns == len(expected_schema.names)
```
##########
python/pyarrow/tests/test_table.py:
##########
@@ -3357,3 +3357,167 @@ def test_invalid_non_join_column():
with pytest.raises(pa.lib.ArrowInvalid) as excinfo:
t2.join(t1, 'id', join_type='inner')
assert exp_error_msg in str(excinfo.value)
+
+
[email protected]
+def cuda_context():
+ cuda = pytest.importorskip("pyarrow.cuda")
+ return cuda.Context(0)
+
+
[email protected]
+def schema():
+ return pa.schema([pa.field('c0', pa.int16()), pa.field('c1', pa.int32())])
+
+
[email protected]
+def cpu_arrays():
+ return [pa.array([1, 2, 3, 4, 5], pa.int16()),
+ pa.array([-10, -5, 0, 1, 10], pa.int32())]
+
+
[email protected]
+def cuda_arrays(cuda_context, cpu_arrays):
+ return [arr.copy_to(cuda_context.memory_manager) for arr in cpu_arrays]
+
+
[email protected]
+def cpu_recordbatch(cpu_arrays, schema):
+ return pa.record_batch(cpu_arrays, schema=schema)
+
+
[email protected]
+def cuda_recordbatch(cuda_context, cpu_recordbatch):
+ return cpu_recordbatch.copy_to(cuda_context.memory_manager)
+
+
+def verify_recordbatch_on_cuda_device(batch, expected_schema):
+ batch.validate()
+ assert batch.device_type == pa.DeviceAllocationType.CUDA
+ assert batch.is_cpu is False
+ assert batch.num_columns == len(batch.column_names)
+ assert str(batch) in repr(batch)
+ for c in batch.columns:
+ assert c.device_type == pa.DeviceAllocationType.CUDA
+ assert batch.schema == expected_schema
+
+
+def test_recordbatch_non_cpu(cuda_context, cpu_recordbatch, cuda_recordbatch,
+ cuda_arrays, schema):
+ verify_recordbatch_on_cuda_device(cuda_recordbatch, expected_schema=schema)
+ assert cuda_recordbatch.num_rows == 5
+
+ # add_column() test
+ col = pa.Array.from_buffers(
+ pa.int8(), 5,
+ [None, cuda_context.buffer_from_data(np.array([6, 7, 8, 9, 10],
+ dtype=np.int8))])
Review Comment:
This could also be (I think?):
```suggestion
col = pa.array([6, 7, 8, 9, 10],
pa.int8()).copy_to(cuda_context.memory_manager)
```
##########
python/pyarrow/tests/test_table.py:
##########
@@ -3357,3 +3357,167 @@ def test_invalid_non_join_column():
with pytest.raises(pa.lib.ArrowInvalid) as excinfo:
t2.join(t1, 'id', join_type='inner')
assert exp_error_msg in str(excinfo.value)
+
+
[email protected]
+def cuda_context():
+ cuda = pytest.importorskip("pyarrow.cuda")
+ return cuda.Context(0)
+
+
[email protected]
+def schema():
+ return pa.schema([pa.field('c0', pa.int16()), pa.field('c1', pa.int32())])
+
+
[email protected]
+def cpu_arrays():
+ return [pa.array([1, 2, 3, 4, 5], pa.int16()),
+ pa.array([-10, -5, 0, 1, 10], pa.int32())]
+
+
[email protected]
+def cuda_arrays(cuda_context, cpu_arrays):
+ return [arr.copy_to(cuda_context.memory_manager) for arr in cpu_arrays]
+
+
[email protected]
+def cpu_recordbatch(cpu_arrays, schema):
+ return pa.record_batch(cpu_arrays, schema=schema)
+
+
[email protected]
+def cuda_recordbatch(cuda_context, cpu_recordbatch):
+ return cpu_recordbatch.copy_to(cuda_context.memory_manager)
+
+
+def verify_recordbatch_on_cuda_device(batch, expected_schema):
+ batch.validate()
+ assert batch.device_type == pa.DeviceAllocationType.CUDA
+ assert batch.is_cpu is False
+ assert batch.num_columns == len(batch.column_names)
+ assert str(batch) in repr(batch)
+ for c in batch.columns:
+ assert c.device_type == pa.DeviceAllocationType.CUDA
+ assert batch.schema == expected_schema
+
+
+def test_recordbatch_non_cpu(cuda_context, cpu_recordbatch, cuda_recordbatch,
+ cuda_arrays, schema):
+ verify_recordbatch_on_cuda_device(cuda_recordbatch, expected_schema=schema)
+ assert cuda_recordbatch.num_rows == 5
+
+ # add_column() test
+ col = pa.Array.from_buffers(
+ pa.int8(), 5,
+ [None, cuda_context.buffer_from_data(np.array([6, 7, 8, 9, 10],
+ dtype=np.int8))])
+ new_batch = cuda_recordbatch.add_column(2, 'c2', col)
+ assert len(new_batch.columns) == 3
+ for c in new_batch.columns:
+ assert c.device_type == pa.DeviceAllocationType.CUDA
+ with pytest.raises(RuntimeError):
+ # Default array conversion not allowed for non-cpu record batch
+ cuda_recordbatch.add_column(2, 'c2', [1, 1, 1, 1, 1])
+
+ # remove_column() test
+ new_batch = cuda_recordbatch.remove_column(1)
+ assert len(new_batch.columns) == 1
+ assert new_batch.device_type == pa.DeviceAllocationType.CUDA
+ assert new_batch[0].device_type == pa.DeviceAllocationType.CUDA
+
+ # drop_columns() test
+ new_batch = cuda_recordbatch.drop_columns(['c0', 'c1'])
+ assert len(new_batch.columns) == 0
+ assert new_batch.device_type == pa.DeviceAllocationType.CUDA
+
+ # cast() test
+ new_schema = pa.schema([pa.field('c0', pa.int64()), pa.field('c1', pa.int64())])
+ with pytest.raises(NotImplementedError):
+ cuda_recordbatch.cast(new_schema)
+
+ # drop_null() test
+ validity = cuda_context.buffer_from_data(
+ np.array([True, False, True, False, True], dtype=np.bool_))
+ null_col = pa.Array.from_buffers(
+ pa.int32(), 5,
+ [validity, cuda_context.buffer_from_data(np.array([0] * 5, dtype=np.int32))])
Review Comment:
This can also be simplified with a `null_col = pa.array(...).copy_to()`
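For example, something along these lines (a rough sketch, assuming `Array.copy_to()` preserves the validity bitmap and reusing the test's `cuda_context` fixture):
```
null_col = pa.array([0, None, 0, None, 0],
                    pa.int32()).copy_to(cuda_context.memory_manager)
```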
##########
python/pyarrow/tests/test_table.py:
##########
@@ -3357,3 +3357,167 @@ def test_invalid_non_join_column():
with pytest.raises(pa.lib.ArrowInvalid) as excinfo:
t2.join(t1, 'id', join_type='inner')
assert exp_error_msg in str(excinfo.value)
+
+
[email protected]
+def cuda_context():
+ cuda = pytest.importorskip("pyarrow.cuda")
+ return cuda.Context(0)
+
+
[email protected]
+def schema():
+ return pa.schema([pa.field('c0', pa.int16()), pa.field('c1', pa.int32())])
+
+
[email protected]
+def cpu_arrays():
+ return [pa.array([1, 2, 3, 4, 5], pa.int16()),
+ pa.array([-10, -5, 0, 1, 10], pa.int32())]
+
+
[email protected]
+def cuda_arrays(cuda_context, cpu_arrays):
+ return [arr.copy_to(cuda_context.memory_manager) for arr in cpu_arrays]
+
+
[email protected]
+def cpu_recordbatch(cpu_arrays, schema):
+ return pa.record_batch(cpu_arrays, schema=schema)
+
+
[email protected]
+def cuda_recordbatch(cuda_context, cpu_recordbatch):
+ return cpu_recordbatch.copy_to(cuda_context.memory_manager)
+
+
+def verify_recordbatch_on_cuda_device(batch, expected_schema):
+ batch.validate()
+ assert batch.device_type == pa.DeviceAllocationType.CUDA
+ assert batch.is_cpu is False
+ assert batch.num_columns == len(batch.column_names)
Review Comment:
Explicitly checking that `batch.column_names` works would also be good, e.g. `assert batch.column_names == expected_schema.names`
##########
python/pyarrow/tests/test_table.py:
##########
@@ -3357,3 +3357,167 @@ def test_invalid_non_join_column():
with pytest.raises(pa.lib.ArrowInvalid) as excinfo:
t2.join(t1, 'id', join_type='inner')
assert exp_error_msg in str(excinfo.value)
+
+
[email protected]
+def cuda_context():
+ cuda = pytest.importorskip("pyarrow.cuda")
+ return cuda.Context(0)
+
+
[email protected]
+def schema():
+ return pa.schema([pa.field('c0', pa.int16()), pa.field('c1', pa.int32())])
+
+
[email protected]
+def cpu_arrays():
+ return [pa.array([1, 2, 3, 4, 5], pa.int16()),
+ pa.array([-10, -5, 0, 1, 10], pa.int32())]
+
+
[email protected]
+def cuda_arrays(cuda_context, cpu_arrays):
+ return [arr.copy_to(cuda_context.memory_manager) for arr in cpu_arrays]
+
+
[email protected]
+def cpu_recordbatch(cpu_arrays, schema):
+ return pa.record_batch(cpu_arrays, schema=schema)
+
+
[email protected]
+def cuda_recordbatch(cuda_context, cpu_recordbatch):
+ return cpu_recordbatch.copy_to(cuda_context.memory_manager)
+
+
+def verify_recordbatch_on_cuda_device(batch, expected_schema):
+ batch.validate()
+ assert batch.device_type == pa.DeviceAllocationType.CUDA
+ assert batch.is_cpu is False
+ assert batch.num_columns == len(batch.column_names)
+ assert str(batch) in repr(batch)
+ for c in batch.columns:
+ assert c.device_type == pa.DeviceAllocationType.CUDA
+ assert batch.schema == expected_schema
+
+
+def test_recordbatch_non_cpu(cuda_context, cpu_recordbatch, cuda_recordbatch,
+ cuda_arrays, schema):
+ verify_recordbatch_on_cuda_device(cuda_recordbatch, expected_schema=schema)
+ assert cuda_recordbatch.num_rows == 5
+
+ # add_column() test
+ col = pa.Array.from_buffers(
+ pa.int8(), 5,
+ [None, cuda_context.buffer_from_data(np.array([6, 7, 8, 9, 10],
+ dtype=np.int8))])
+ new_batch = cuda_recordbatch.add_column(2, 'c2', col)
+ assert len(new_batch.columns) == 3
+ for c in new_batch.columns:
+ assert c.device_type == pa.DeviceAllocationType.CUDA
+ with pytest.raises(RuntimeError):
+ # Default array conversion not allowed for non-cpu record batch
+ cuda_recordbatch.add_column(2, 'c2', [1, 1, 1, 1, 1])
+
+ # remove_column() test
+ new_batch = cuda_recordbatch.remove_column(1)
+ assert len(new_batch.columns) == 1
+ assert new_batch.device_type == pa.DeviceAllocationType.CUDA
+ assert new_batch[0].device_type == pa.DeviceAllocationType.CUDA
+
+ # drop_columns() test
+ new_batch = cuda_recordbatch.drop_columns(['c0', 'c1'])
+ assert len(new_batch.columns) == 0
+ assert new_batch.device_type == pa.DeviceAllocationType.CUDA
+
+ # cast() test
+ new_schema = pa.schema([pa.field('c0', pa.int64()), pa.field('c1', pa.int64())])
+ with pytest.raises(NotImplementedError):
+ cuda_recordbatch.cast(new_schema)
+
+ # drop_null() test
+ validity = cuda_context.buffer_from_data(
+ np.array([True, False, True, False, True], dtype=np.bool_))
+ null_col = pa.Array.from_buffers(
+ pa.int32(), 5,
+ [validity, cuda_context.buffer_from_data(np.array([0] * 5, dtype=np.int32))])
+ cuda_recordbatch_with_nulls = cuda_recordbatch.add_column(2, 'c2', null_col)
+ with pytest.raises(NotImplementedError):
+ cuda_recordbatch_with_nulls.drop_null()
+
+ # filter() test
Review Comment:
Similarly also `take()` and `sort_by()` ?
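A rough sketch of what those could look like (assuming they are also guarded by `_assert_cpu()` and raise `NotImplementedError`, like `filter()`):
```
# take() test
with pytest.raises(NotImplementedError):
    cuda_recordbatch.take([0, 2, 4])

# sort_by() test
with pytest.raises(NotImplementedError):
    cuda_recordbatch.sort_by('c0')
```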
##########
python/pyarrow/table.pxi:
##########
@@ -3438,6 +3457,7 @@ cdef class RecordBatch(_Tabular):
result = pyarrow_wrap_batch(CRecordBatch.Make(c_schema, num_rows,
c_arrays))
+
Review Comment:
```suggestion
```
##########
python/pyarrow/tests/test_table.py:
##########
@@ -3357,3 +3357,167 @@ def test_invalid_non_join_column():
with pytest.raises(pa.lib.ArrowInvalid) as excinfo:
t2.join(t1, 'id', join_type='inner')
assert exp_error_msg in str(excinfo.value)
+
+
[email protected]
+def cuda_context():
+ cuda = pytest.importorskip("pyarrow.cuda")
+ return cuda.Context(0)
+
+
[email protected]
+def schema():
+ return pa.schema([pa.field('c0', pa.int16()), pa.field('c1', pa.int32())])
+
+
[email protected]
+def cpu_arrays():
+ return [pa.array([1, 2, 3, 4, 5], pa.int16()),
+ pa.array([-10, -5, 0, 1, 10], pa.int32())]
+
+
[email protected]
+def cuda_arrays(cuda_context, cpu_arrays):
+ return [arr.copy_to(cuda_context.memory_manager) for arr in cpu_arrays]
+
+
[email protected]
+def cpu_recordbatch(cpu_arrays, schema):
+ return pa.record_batch(cpu_arrays, schema=schema)
+
+
[email protected]
+def cuda_recordbatch(cuda_context, cpu_recordbatch):
+ return cpu_recordbatch.copy_to(cuda_context.memory_manager)
+
+
+def verify_recordbatch_on_cuda_device(batch, expected_schema):
+ batch.validate()
+ assert batch.device_type == pa.DeviceAllocationType.CUDA
+ assert batch.is_cpu is False
+ assert batch.num_columns == len(batch.column_names)
+ assert str(batch) in repr(batch)
+ for c in batch.columns:
+ assert c.device_type == pa.DeviceAllocationType.CUDA
+ assert batch.schema == expected_schema
+
+
+def test_recordbatch_non_cpu(cuda_context, cpu_recordbatch, cuda_recordbatch,
+ cuda_arrays, schema):
+ verify_recordbatch_on_cuda_device(cuda_recordbatch, expected_schema=schema)
+ assert cuda_recordbatch.num_rows == 5
+
+ # add_column() test
+ col = pa.Array.from_buffers(
+ pa.int8(), 5,
+ [None, cuda_context.buffer_from_data(np.array([6, 7, 8, 9, 10],
+ dtype=np.int8))])
+ new_batch = cuda_recordbatch.add_column(2, 'c2', col)
+ assert len(new_batch.columns) == 3
+ for c in new_batch.columns:
+ assert c.device_type == pa.DeviceAllocationType.CUDA
+ with pytest.raises(RuntimeError):
+ # Default array conversion not allowed for non-cpu record batch
+ cuda_recordbatch.add_column(2, 'c2', [1, 1, 1, 1, 1])
Review Comment:
This tests the error you raise from cython, but what error do you get if you
do:
```
cuda_recordbatch.add_column(2, 'c2', pa.array([1, 1, 1, 1, 1]))
```
i.e. adding a column with a cpu pyarrow array
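If that case also ends up raising, it would be good to cover it explicitly, e.g. with something like this sketch (the exact exception type is an open question and would need to match whatever the device check actually raises):
```
# hypothetical check; adjust the exception type to what is actually raised
with pytest.raises(Exception):
    cuda_recordbatch.add_column(2, 'c2', pa.array([1, 1, 1, 1, 1]))
```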
##########
python/pyarrow/tests/test_table.py:
##########
@@ -3357,3 +3357,167 @@ def test_invalid_non_join_column():
with pytest.raises(pa.lib.ArrowInvalid) as excinfo:
t2.join(t1, 'id', join_type='inner')
assert exp_error_msg in str(excinfo.value)
+
+
[email protected]
+def cuda_context():
+ cuda = pytest.importorskip("pyarrow.cuda")
+ return cuda.Context(0)
+
+
[email protected]
+def schema():
+ return pa.schema([pa.field('c0', pa.int16()), pa.field('c1', pa.int32())])
+
+
[email protected]
+def cpu_arrays():
+ return [pa.array([1, 2, 3, 4, 5], pa.int16()),
+ pa.array([-10, -5, 0, 1, 10], pa.int32())]
+
+
[email protected]
+def cuda_arrays(cuda_context, cpu_arrays):
+ return [arr.copy_to(cuda_context.memory_manager) for arr in cpu_arrays]
+
+
[email protected]
+def cpu_recordbatch(cpu_arrays, schema):
+ return pa.record_batch(cpu_arrays, schema=schema)
+
+
[email protected]
+def cuda_recordbatch(cuda_context, cpu_recordbatch):
+ return cpu_recordbatch.copy_to(cuda_context.memory_manager)
+
+
+def verify_recordbatch_on_cuda_device(batch, expected_schema):
+ batch.validate()
+ assert batch.device_type == pa.DeviceAllocationType.CUDA
+ assert batch.is_cpu is False
+ assert batch.num_columns == len(batch.column_names)
+ assert str(batch) in repr(batch)
+ for c in batch.columns:
+ assert c.device_type == pa.DeviceAllocationType.CUDA
+ assert batch.schema == expected_schema
+
+
+def test_recordbatch_non_cpu(cuda_context, cpu_recordbatch, cuda_recordbatch,
+ cuda_arrays, schema):
+ verify_recordbatch_on_cuda_device(cuda_recordbatch, expected_schema=schema)
+ assert cuda_recordbatch.num_rows == 5
+
Review Comment:
```suggestion
assert cuda_recordbatch.shape == (5, 2)
```
##########
python/pyarrow/tests/test_table.py:
##########
@@ -3357,3 +3357,167 @@ def test_invalid_non_join_column():
with pytest.raises(pa.lib.ArrowInvalid) as excinfo:
t2.join(t1, 'id', join_type='inner')
assert exp_error_msg in str(excinfo.value)
+
+
[email protected]
+def cuda_context():
+ cuda = pytest.importorskip("pyarrow.cuda")
+ return cuda.Context(0)
+
+
[email protected]
+def schema():
+ return pa.schema([pa.field('c0', pa.int16()), pa.field('c1', pa.int32())])
+
+
[email protected]
+def cpu_arrays():
+ return [pa.array([1, 2, 3, 4, 5], pa.int16()),
+ pa.array([-10, -5, 0, 1, 10], pa.int32())]
+
+
[email protected]
+def cuda_arrays(cuda_context, cpu_arrays):
+ return [arr.copy_to(cuda_context.memory_manager) for arr in cpu_arrays]
+
+
[email protected]
+def cpu_recordbatch(cpu_arrays, schema):
+ return pa.record_batch(cpu_arrays, schema=schema)
+
+
[email protected]
+def cuda_recordbatch(cuda_context, cpu_recordbatch):
+ return cpu_recordbatch.copy_to(cuda_context.memory_manager)
+
+
+def verify_recordbatch_on_cuda_device(batch, expected_schema):
+ batch.validate()
+ assert batch.device_type == pa.DeviceAllocationType.CUDA
+ assert batch.is_cpu is False
+ assert batch.num_columns == len(batch.column_names)
+ assert str(batch) in repr(batch)
+ for c in batch.columns:
+ assert c.device_type == pa.DeviceAllocationType.CUDA
+ assert batch.schema == expected_schema
+
+
+def test_recordbatch_non_cpu(cuda_context, cpu_recordbatch, cuda_recordbatch,
+ cuda_arrays, schema):
+ verify_recordbatch_on_cuda_device(cuda_recordbatch, expected_schema=schema)
+ assert cuda_recordbatch.num_rows == 5
+
+ # add_column() test
+ col = pa.Array.from_buffers(
+ pa.int8(), 5,
+ [None, cuda_context.buffer_from_data(np.array([6, 7, 8, 9, 10],
+ dtype=np.int8))])
+ new_batch = cuda_recordbatch.add_column(2, 'c2', col)
+ assert len(new_batch.columns) == 3
+ for c in new_batch.columns:
+ assert c.device_type == pa.DeviceAllocationType.CUDA
+ with pytest.raises(RuntimeError):
+ # Default array conversion not allowed for non-cpu record batch
+ cuda_recordbatch.add_column(2, 'c2', [1, 1, 1, 1, 1])
+
+ # remove_column() test
+ new_batch = cuda_recordbatch.remove_column(1)
+ assert len(new_batch.columns) == 1
+ assert new_batch.device_type == pa.DeviceAllocationType.CUDA
+ assert new_batch[0].device_type == pa.DeviceAllocationType.CUDA
+
+ # drop_columns() test
+ new_batch = cuda_recordbatch.drop_columns(['c0', 'c1'])
+ assert len(new_batch.columns) == 0
+ assert new_batch.device_type == pa.DeviceAllocationType.CUDA
+
+ # cast() test
+ new_schema = pa.schema([pa.field('c0', pa.int64()), pa.field('c1', pa.int64())])
+ with pytest.raises(NotImplementedError):
+ cuda_recordbatch.cast(new_schema)
+
+ # drop_null() test
+ validity = cuda_context.buffer_from_data(
+ np.array([True, False, True, False, True], dtype=np.bool_))
+ null_col = pa.Array.from_buffers(
+ pa.int32(), 5,
+ [validity, cuda_context.buffer_from_data(np.array([0] * 5, dtype=np.int32))])
+ cuda_recordbatch_with_nulls = cuda_recordbatch.add_column(2, 'c2', null_col)
+ with pytest.raises(NotImplementedError):
+ cuda_recordbatch_with_nulls.drop_null()
+
+ # filter() test
+ with pytest.raises(NotImplementedError):
+ cuda_recordbatch.filter([True] * 5)
+
+ # field() test
+ assert cuda_recordbatch.field(0) == pa.field('c0', pa.int16())
+ assert cuda_recordbatch.field(1) == pa.field('c1', pa.int32())
+
+ # equals() test
+ new_batch = cpu_recordbatch.copy_to(cuda_context.memory_manager)
+ with pytest.raises(NotImplementedError):
+ assert cuda_recordbatch.equals(new_batch) is True
+
+ # from_arrays() test
+ new_batch = pa.RecordBatch.from_arrays(cuda_arrays, ['c0', 'c1'])
+ verify_recordbatch_on_cuda_device(new_batch, expected_schema=schema)
+
+ # from_pydict() test
+ new_batch = pa.RecordBatch.from_pydict({'c0': cuda_arrays[0], 'c1': cuda_arrays[1]})
+ verify_recordbatch_on_cuda_device(new_batch, expected_schema=schema)
+
+ # nbytes test
+ with pytest.raises(NotImplementedError):
+ assert cuda_recordbatch.nbytes == cpu_recordbatch.nbytes
Review Comment:
Also test the variant `get_total_buffer_size`?
I don't remember exactly what we discussed about this for the Array case,
but I see that there we added `_assert_cpu()` for both, while I would think
that `get_total_buffer_size()` could work for CUDA (EDIT: ah, but it uses the
buffer's address to avoid counting one buffer twice, so that would require an
update on the C++ side to not use the CPU-only buffer address attribute)
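If `get_total_buffer_size()` stays guarded by `_assert_cpu()` for now, a minimal check here could be (sketch):
```
with pytest.raises(NotImplementedError):
    cuda_recordbatch.get_total_buffer_size()
```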
##########
python/pyarrow/tests/test_table.py:
##########
@@ -3357,3 +3357,167 @@ def test_invalid_non_join_column():
with pytest.raises(pa.lib.ArrowInvalid) as excinfo:
t2.join(t1, 'id', join_type='inner')
assert exp_error_msg in str(excinfo.value)
+
+
[email protected]
+def cuda_context():
+ cuda = pytest.importorskip("pyarrow.cuda")
+ return cuda.Context(0)
+
+
[email protected]
+def schema():
+ return pa.schema([pa.field('c0', pa.int16()), pa.field('c1', pa.int32())])
+
+
[email protected]
+def cpu_arrays():
+ return [pa.array([1, 2, 3, 4, 5], pa.int16()),
+ pa.array([-10, -5, 0, 1, 10], pa.int32())]
+
+
[email protected]
+def cuda_arrays(cuda_context, cpu_arrays):
+ return [arr.copy_to(cuda_context.memory_manager) for arr in cpu_arrays]
+
+
[email protected]
+def cpu_recordbatch(cpu_arrays, schema):
+ return pa.record_batch(cpu_arrays, schema=schema)
+
+
[email protected]
+def cuda_recordbatch(cuda_context, cpu_recordbatch):
+ return cpu_recordbatch.copy_to(cuda_context.memory_manager)
+
+
+def verify_recordbatch_on_cuda_device(batch, expected_schema):
+ batch.validate()
+ assert batch.device_type == pa.DeviceAllocationType.CUDA
+ assert batch.is_cpu is False
+ assert batch.num_columns == len(batch.column_names)
+ assert str(batch) in repr(batch)
+ for c in batch.columns:
+ assert c.device_type == pa.DeviceAllocationType.CUDA
+ assert batch.schema == expected_schema
+
+
+def test_recordbatch_non_cpu(cuda_context, cpu_recordbatch, cuda_recordbatch,
+ cuda_arrays, schema):
+ verify_recordbatch_on_cuda_device(cuda_recordbatch, expected_schema=schema)
+ assert cuda_recordbatch.num_rows == 5
+
+ # add_column() test
Review Comment:
There is also a `set_column` variant of this functionality, which seems to
have its own (C++) implementation. I assume that will also work, but we
should test it here as well.
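For instance, a sketch reusing the `col` array from the add_column test, assuming `set_column` handles device data the same way as `add_column`:
```
# set_column() test
new_batch = cuda_recordbatch.set_column(1, 'c1_new', col)
assert new_batch.column_names == ['c0', 'c1_new']
assert new_batch[1].device_type == pa.DeviceAllocationType.CUDA
```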
##########
python/pyarrow/tests/test_table.py:
##########
@@ -3357,3 +3357,167 @@ def test_invalid_non_join_column():
with pytest.raises(pa.lib.ArrowInvalid) as excinfo:
t2.join(t1, 'id', join_type='inner')
assert exp_error_msg in str(excinfo.value)
+
+
[email protected]
+def cuda_context():
+ cuda = pytest.importorskip("pyarrow.cuda")
+ return cuda.Context(0)
+
+
[email protected]
+def schema():
+ return pa.schema([pa.field('c0', pa.int16()), pa.field('c1', pa.int32())])
+
+
[email protected]
+def cpu_arrays():
+ return [pa.array([1, 2, 3, 4, 5], pa.int16()),
+ pa.array([-10, -5, 0, 1, 10], pa.int32())]
+
+
[email protected]
+def cuda_arrays(cuda_context, cpu_arrays):
+ return [arr.copy_to(cuda_context.memory_manager) for arr in cpu_arrays]
+
+
[email protected]
+def cpu_recordbatch(cpu_arrays, schema):
+ return pa.record_batch(cpu_arrays, schema=schema)
+
+
[email protected]
+def cuda_recordbatch(cuda_context, cpu_recordbatch):
+ return cpu_recordbatch.copy_to(cuda_context.memory_manager)
+
+
+def verify_recordbatch_on_cuda_device(batch, expected_schema):
+ batch.validate()
+ assert batch.device_type == pa.DeviceAllocationType.CUDA
+ assert batch.is_cpu is False
+ assert batch.num_columns == len(batch.column_names)
+ assert str(batch) in repr(batch)
+ for c in batch.columns:
+ assert c.device_type == pa.DeviceAllocationType.CUDA
+ assert batch.schema == expected_schema
+
+
+def test_recordbatch_non_cpu(cuda_context, cpu_recordbatch, cuda_recordbatch,
+ cuda_arrays, schema):
+ verify_recordbatch_on_cuda_device(cuda_recordbatch, expected_schema=schema)
+ assert cuda_recordbatch.num_rows == 5
+
+ # add_column() test
+ col = pa.Array.from_buffers(
+ pa.int8(), 5,
+ [None, cuda_context.buffer_from_data(np.array([6, 7, 8, 9, 10],
+ dtype=np.int8))])
+ new_batch = cuda_recordbatch.add_column(2, 'c2', col)
+ assert len(new_batch.columns) == 3
+ for c in new_batch.columns:
+ assert c.device_type == pa.DeviceAllocationType.CUDA
+ with pytest.raises(RuntimeError):
+ # Default array conversion not allowed for non-cpu record batch
+ cuda_recordbatch.add_column(2, 'c2', [1, 1, 1, 1, 1])
+
+ # remove_column() test
+ new_batch = cuda_recordbatch.remove_column(1)
+ assert len(new_batch.columns) == 1
+ assert new_batch.device_type == pa.DeviceAllocationType.CUDA
+ assert new_batch[0].device_type == pa.DeviceAllocationType.CUDA
+
+ # drop_columns() test
+ new_batch = cuda_recordbatch.drop_columns(['c0', 'c1'])
+ assert len(new_batch.columns) == 0
+ assert new_batch.device_type == pa.DeviceAllocationType.CUDA
+
+ # cast() test
+ new_schema = pa.schema([pa.field('c0', pa.int64()), pa.field('c1', pa.int64())])
+ with pytest.raises(NotImplementedError):
+ cuda_recordbatch.cast(new_schema)
+
+ # drop_null() test
+ validity = cuda_context.buffer_from_data(
+ np.array([True, False, True, False, True], dtype=np.bool_))
+ null_col = pa.Array.from_buffers(
+ pa.int32(), 5,
+ [validity, cuda_context.buffer_from_data(np.array([0] * 5, dtype=np.int32))])
+ cuda_recordbatch_with_nulls = cuda_recordbatch.add_column(2, 'c2', null_col)
+ with pytest.raises(NotImplementedError):
+ cuda_recordbatch_with_nulls.drop_null()
+
+ # filter() test
+ with pytest.raises(NotImplementedError):
+ cuda_recordbatch.filter([True] * 5)
+
+ # field() test
+ assert cuda_recordbatch.field(0) == pa.field('c0', pa.int16())
+ assert cuda_recordbatch.field(1) == pa.field('c1', pa.int32())
+
+ # equals() test
+ new_batch = cpu_recordbatch.copy_to(cuda_context.memory_manager)
+ with pytest.raises(NotImplementedError):
+ assert cuda_recordbatch.equals(new_batch) is True
+
+ # from_arrays() test
+ new_batch = pa.RecordBatch.from_arrays(cuda_arrays, ['c0', 'c1'])
+ verify_recordbatch_on_cuda_device(new_batch, expected_schema=schema)
+
Review Comment:
```suggestion
assert new_batch.copy_to(pa.default_cpu_memory_manager()).equals(cpu_recordbatch)
```
(and same below for from_pydict)
##########
python/pyarrow/tests/test_table.py:
##########
@@ -3357,3 +3357,167 @@ def test_invalid_non_join_column():
with pytest.raises(pa.lib.ArrowInvalid) as excinfo:
t2.join(t1, 'id', join_type='inner')
assert exp_error_msg in str(excinfo.value)
+
+
[email protected]
+def cuda_context():
+ cuda = pytest.importorskip("pyarrow.cuda")
+ return cuda.Context(0)
+
+
[email protected]
+def schema():
+ return pa.schema([pa.field('c0', pa.int16()), pa.field('c1', pa.int32())])
+
+
[email protected]
+def cpu_arrays():
+ return [pa.array([1, 2, 3, 4, 5], pa.int16()),
+ pa.array([-10, -5, 0, 1, 10], pa.int32())]
+
+
[email protected]
+def cuda_arrays(cuda_context, cpu_arrays):
+ return [arr.copy_to(cuda_context.memory_manager) for arr in cpu_arrays]
+
+
[email protected]
+def cpu_recordbatch(cpu_arrays, schema):
+ return pa.record_batch(cpu_arrays, schema=schema)
+
+
[email protected]
+def cuda_recordbatch(cuda_context, cpu_recordbatch):
+ return cpu_recordbatch.copy_to(cuda_context.memory_manager)
+
+
+def verify_recordbatch_on_cuda_device(batch, expected_schema):
+ batch.validate()
+ assert batch.device_type == pa.DeviceAllocationType.CUDA
+ assert batch.is_cpu is False
+ assert batch.num_columns == len(batch.column_names)
+ assert str(batch) in repr(batch)
+ for c in batch.columns:
+ assert c.device_type == pa.DeviceAllocationType.CUDA
+ assert batch.schema == expected_schema
+
+
+def test_recordbatch_non_cpu(cuda_context, cpu_recordbatch, cuda_recordbatch,
+ cuda_arrays, schema):
+ verify_recordbatch_on_cuda_device(cuda_recordbatch, expected_schema=schema)
+ assert cuda_recordbatch.num_rows == 5
+
+ # add_column() test
+ col = pa.Array.from_buffers(
+ pa.int8(), 5,
+ [None, cuda_context.buffer_from_data(np.array([6, 7, 8, 9, 10],
+ dtype=np.int8))])
+ new_batch = cuda_recordbatch.add_column(2, 'c2', col)
+ assert len(new_batch.columns) == 3
+ for c in new_batch.columns:
+ assert c.device_type == pa.DeviceAllocationType.CUDA
+ with pytest.raises(RuntimeError):
+ # Default array conversion not allowed for non-cpu record batch
+ cuda_recordbatch.add_column(2, 'c2', [1, 1, 1, 1, 1])
+
+ # remove_column() test
+ new_batch = cuda_recordbatch.remove_column(1)
+ assert len(new_batch.columns) == 1
+ assert new_batch.device_type == pa.DeviceAllocationType.CUDA
+ assert new_batch[0].device_type == pa.DeviceAllocationType.CUDA
+
+ # drop_columns() test
+ new_batch = cuda_recordbatch.drop_columns(['c0', 'c1'])
+ assert len(new_batch.columns) == 0
+ assert new_batch.device_type == pa.DeviceAllocationType.CUDA
+
+ # cast() test
+ new_schema = pa.schema([pa.field('c0', pa.int64()), pa.field('c1', pa.int64())])
+ with pytest.raises(NotImplementedError):
+ cuda_recordbatch.cast(new_schema)
+
+ # drop_null() test
+ validity = cuda_context.buffer_from_data(
+ np.array([True, False, True, False, True], dtype=np.bool_))
+ null_col = pa.Array.from_buffers(
+ pa.int32(), 5,
+ [validity, cuda_context.buffer_from_data(np.array([0] * 5, dtype=np.int32))])
+ cuda_recordbatch_with_nulls = cuda_recordbatch.add_column(2, 'c2', null_col)
+ with pytest.raises(NotImplementedError):
+ cuda_recordbatch_with_nulls.drop_null()
+
+ # filter() test
+ with pytest.raises(NotImplementedError):
+ cuda_recordbatch.filter([True] * 5)
+
+ # field() test
+ assert cuda_recordbatch.field(0) == pa.field('c0', pa.int16())
+ assert cuda_recordbatch.field(1) == pa.field('c1', pa.int32())
+
+ # equals() test
+ new_batch = cpu_recordbatch.copy_to(cuda_context.memory_manager)
+ with pytest.raises(NotImplementedError):
+ assert cuda_recordbatch.equals(new_batch) is True
+
+ # from_arrays() test
+ new_batch = pa.RecordBatch.from_arrays(cuda_arrays, ['c0', 'c1'])
+ verify_recordbatch_on_cuda_device(new_batch, expected_schema=schema)
+
+ # from_pydict() test
+ new_batch = pa.RecordBatch.from_pydict({'c0': cuda_arrays[0], 'c1': cuda_arrays[1]})
+ verify_recordbatch_on_cuda_device(new_batch, expected_schema=schema)
+
+ # nbytes test
+ with pytest.raises(NotImplementedError):
+ assert cuda_recordbatch.nbytes == cpu_recordbatch.nbytes
+
+ # to_pydict() test
+ with pytest.raises(NotImplementedError):
+ cuda_recordbatch.to_pydict()
+
+ # to_pylist() test
+ with pytest.raises(NotImplementedError):
+ cuda_recordbatch.to_pylist()
+
+ # to_pandas() test
+ with pytest.raises(NotImplementedError):
+ cuda_recordbatch.to_pandas()
+
+ # to_tensor() test
+ with pytest.raises(NotImplementedError):
+ cuda_recordbatch.to_tensor()
+
+ # to_struct_array() test
Review Comment:
test `from_struct_array` as well? (and that might also require an additional
`_assert_cpu`?)
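A possible sketch (whether this should work or raise depends on whether an `_assert_cpu()` guard is added there as well, so the expected exception below is only a placeholder):
```
# from_struct_array() test (placeholder expectation; to be adjusted once the
# behaviour for device struct arrays is decided)
struct_arr = pa.array(
    [{'c0': 1, 'c1': -10}, {'c0': 2, 'c1': -5}],
    pa.struct([pa.field('c0', pa.int16()), pa.field('c1', pa.int32())])
).copy_to(cuda_context.memory_manager)
with pytest.raises(NotImplementedError):
    pa.RecordBatch.from_struct_array(struct_arr)
```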
##########
python/pyarrow/tests/test_table.py:
##########
@@ -3357,3 +3357,167 @@ def test_invalid_non_join_column():
with pytest.raises(pa.lib.ArrowInvalid) as excinfo:
t2.join(t1, 'id', join_type='inner')
assert exp_error_msg in str(excinfo.value)
+
+
[email protected]
+def cuda_context():
+ cuda = pytest.importorskip("pyarrow.cuda")
+ return cuda.Context(0)
+
+
[email protected]
+def schema():
+ return pa.schema([pa.field('c0', pa.int16()), pa.field('c1', pa.int32())])
+
+
[email protected]
+def cpu_arrays():
+ return [pa.array([1, 2, 3, 4, 5], pa.int16()),
+ pa.array([-10, -5, 0, 1, 10], pa.int32())]
+
+
[email protected]
+def cuda_arrays(cuda_context, cpu_arrays):
+ return [arr.copy_to(cuda_context.memory_manager) for arr in cpu_arrays]
+
+
[email protected]
+def cpu_recordbatch(cpu_arrays, schema):
+ return pa.record_batch(cpu_arrays, schema=schema)
+
+
[email protected]
+def cuda_recordbatch(cuda_context, cpu_recordbatch):
+ return cpu_recordbatch.copy_to(cuda_context.memory_manager)
+
+
+def verify_recordbatch_on_cuda_device(batch, expected_schema):
+ batch.validate()
+ assert batch.device_type == pa.DeviceAllocationType.CUDA
+ assert batch.is_cpu is False
+ assert batch.num_columns == len(batch.column_names)
+ assert str(batch) in repr(batch)
+ for c in batch.columns:
+ assert c.device_type == pa.DeviceAllocationType.CUDA
+ assert batch.schema == expected_schema
+
+
+def test_recordbatch_non_cpu(cuda_context, cpu_recordbatch, cuda_recordbatch,
+ cuda_arrays, schema):
+ verify_recordbatch_on_cuda_device(cuda_recordbatch, expected_schema=schema)
+ assert cuda_recordbatch.num_rows == 5
+
+ # add_column() test
+ col = pa.Array.from_buffers(
+ pa.int8(), 5,
+ [None, cuda_context.buffer_from_data(np.array([6, 7, 8, 9, 10],
+ dtype=np.int8))])
+ new_batch = cuda_recordbatch.add_column(2, 'c2', col)
+ assert len(new_batch.columns) == 3
+ for c in new_batch.columns:
+ assert c.device_type == pa.DeviceAllocationType.CUDA
+ with pytest.raises(RuntimeError):
+ # Default array conversion not allowed for non-cpu record batch
+ cuda_recordbatch.add_column(2, 'c2', [1, 1, 1, 1, 1])
+
+ # remove_column() test
+ new_batch = cuda_recordbatch.remove_column(1)
+ assert len(new_batch.columns) == 1
+ assert new_batch.device_type == pa.DeviceAllocationType.CUDA
+ assert new_batch[0].device_type == pa.DeviceAllocationType.CUDA
Review Comment:
Could also replace this by using `verify_recordbatch_on_cuda_device` with
`expected_schema=schema.remove(1)`
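i.e. something like:
```
new_batch = cuda_recordbatch.remove_column(1)
verify_recordbatch_on_cuda_device(new_batch, expected_schema=schema.remove(1))
```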
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]