jorisvandenbossche commented on code in PR #43729:
URL: https://github.com/apache/arrow/pull/43729#discussion_r1732630789


##########
python/pyarrow/table.pxi:
##########
@@ -2793,6 +2804,11 @@ cdef class RecordBatch(_Tabular):
         if isinstance(column, Array):
             c_arr = column
         else:
+            if not self.is_cpu:
+                raise RuntimeError("A pa.Array() object is required when "

Review Comment:
   Maybe this can be a TypeError? (the argument is of the wrong type)



##########
python/pyarrow/tests/test_table.py:
##########
@@ -3357,3 +3357,167 @@ def test_invalid_non_join_column():
     with pytest.raises(pa.lib.ArrowInvalid) as excinfo:
         t2.join(t1, 'id', join_type='inner')
     assert exp_error_msg in str(excinfo.value)
+
+
[email protected]
+def cuda_context():
+    cuda = pytest.importorskip("pyarrow.cuda")
+    return cuda.Context(0)
+
+
[email protected]
+def schema():
+    return pa.schema([pa.field('c0', pa.int16()), pa.field('c1', pa.int32())])
+
+
[email protected]
+def cpu_arrays():
+    return [pa.array([1, 2, 3, 4, 5], pa.int16()),
+            pa.array([-10, -5, 0, 1, 10], pa.int32())]
+
+
[email protected]
+def cuda_arrays(cuda_context, cpu_arrays):
+    return [arr.copy_to(cuda_context.memory_manager) for arr in cpu_arrays]
+
+
[email protected]
+def cpu_recordbatch(cpu_arrays, schema):
+    return pa.record_batch(cpu_arrays, schema=schema)
+
+
[email protected]
+def cuda_recordbatch(cuda_context, cpu_recordbatch):
+    return cpu_recordbatch.copy_to(cuda_context.memory_manager)
+
+
+def verify_recordbatch_on_cuda_device(batch, expected_schema):
+    batch.validate()
+    assert batch.device_type == pa.DeviceAllocationType.CUDA
+    assert batch.is_cpu is False
+    assert batch.num_columns == len(batch.column_names)

Review Comment:
   ```suggestion
       assert batch.num_columns == len(expected_schema.names)
   ```



##########
python/pyarrow/tests/test_table.py:
##########
@@ -3357,3 +3357,167 @@ def test_invalid_non_join_column():
     with pytest.raises(pa.lib.ArrowInvalid) as excinfo:
         t2.join(t1, 'id', join_type='inner')
     assert exp_error_msg in str(excinfo.value)
+
+
[email protected]
+def cuda_context():
+    cuda = pytest.importorskip("pyarrow.cuda")
+    return cuda.Context(0)
+
+
[email protected]
+def schema():
+    return pa.schema([pa.field('c0', pa.int16()), pa.field('c1', pa.int32())])
+
+
[email protected]
+def cpu_arrays():
+    return [pa.array([1, 2, 3, 4, 5], pa.int16()),
+            pa.array([-10, -5, 0, 1, 10], pa.int32())]
+
+
[email protected]
+def cuda_arrays(cuda_context, cpu_arrays):
+    return [arr.copy_to(cuda_context.memory_manager) for arr in cpu_arrays]
+
+
[email protected]
+def cpu_recordbatch(cpu_arrays, schema):
+    return pa.record_batch(cpu_arrays, schema=schema)
+
+
[email protected]
+def cuda_recordbatch(cuda_context, cpu_recordbatch):
+    return cpu_recordbatch.copy_to(cuda_context.memory_manager)
+
+
+def verify_recordbatch_on_cuda_device(batch, expected_schema):
+    batch.validate()
+    assert batch.device_type == pa.DeviceAllocationType.CUDA
+    assert batch.is_cpu is False
+    assert batch.num_columns == len(batch.column_names)
+    assert str(batch) in repr(batch)
+    for c in batch.columns:
+        assert c.device_type == pa.DeviceAllocationType.CUDA
+    assert batch.schema == expected_schema
+
+
+def test_recordbatch_non_cpu(cuda_context, cpu_recordbatch, cuda_recordbatch,
+                             cuda_arrays, schema):
+    verify_recordbatch_on_cuda_device(cuda_recordbatch, expected_schema=schema)
+    assert cuda_recordbatch.num_rows == 5
+
+    # add_column() test
+    col = pa.Array.from_buffers(
+        pa.int8(), 5,
+        [None, cuda_context.buffer_from_data(np.array([6, 7, 8, 9, 10],
+                                             dtype=np.int8))])

Review Comment:
   This could also be (I think?):
   
   ```suggestion
       col = pa.array([6, 7, 8, 9, 10], 
pa.int8()).copy_to(cuda_context.memory_manager)
   ```



##########
python/pyarrow/tests/test_table.py:
##########
@@ -3357,3 +3357,167 @@ def test_invalid_non_join_column():
     with pytest.raises(pa.lib.ArrowInvalid) as excinfo:
         t2.join(t1, 'id', join_type='inner')
     assert exp_error_msg in str(excinfo.value)
+
+
[email protected]
+def cuda_context():
+    cuda = pytest.importorskip("pyarrow.cuda")
+    return cuda.Context(0)
+
+
[email protected]
+def schema():
+    return pa.schema([pa.field('c0', pa.int16()), pa.field('c1', pa.int32())])
+
+
[email protected]
+def cpu_arrays():
+    return [pa.array([1, 2, 3, 4, 5], pa.int16()),
+            pa.array([-10, -5, 0, 1, 10], pa.int32())]
+
+
[email protected]
+def cuda_arrays(cuda_context, cpu_arrays):
+    return [arr.copy_to(cuda_context.memory_manager) for arr in cpu_arrays]
+
+
[email protected]
+def cpu_recordbatch(cpu_arrays, schema):
+    return pa.record_batch(cpu_arrays, schema=schema)
+
+
[email protected]
+def cuda_recordbatch(cuda_context, cpu_recordbatch):
+    return cpu_recordbatch.copy_to(cuda_context.memory_manager)
+
+
+def verify_recordbatch_on_cuda_device(batch, expected_schema):
+    batch.validate()
+    assert batch.device_type == pa.DeviceAllocationType.CUDA
+    assert batch.is_cpu is False
+    assert batch.num_columns == len(batch.column_names)
+    assert str(batch) in repr(batch)
+    for c in batch.columns:
+        assert c.device_type == pa.DeviceAllocationType.CUDA
+    assert batch.schema == expected_schema
+
+
+def test_recordbatch_non_cpu(cuda_context, cpu_recordbatch, cuda_recordbatch,
+                             cuda_arrays, schema):
+    verify_recordbatch_on_cuda_device(cuda_recordbatch, expected_schema=schema)
+    assert cuda_recordbatch.num_rows == 5
+
+    # add_column() test
+    col = pa.Array.from_buffers(
+        pa.int8(), 5,
+        [None, cuda_context.buffer_from_data(np.array([6, 7, 8, 9, 10],
+                                             dtype=np.int8))])
+    new_batch = cuda_recordbatch.add_column(2, 'c2', col)
+    assert len(new_batch.columns) == 3
+    for c in new_batch.columns:
+        assert c.device_type == pa.DeviceAllocationType.CUDA
+    with pytest.raises(RuntimeError):
+        # Default array conversion not allowed for non-cpu record batch
+        cuda_recordbatch.add_column(2, 'c2', [1, 1, 1, 1, 1])
+
+    # remove_column() test
+    new_batch = cuda_recordbatch.remove_column(1)
+    assert len(new_batch.columns) == 1
+    assert new_batch.device_type == pa.DeviceAllocationType.CUDA
+    assert new_batch[0].device_type == pa.DeviceAllocationType.CUDA
+
+    # drop_columns() test
+    new_batch = cuda_recordbatch.drop_columns(['c0', 'c1'])
+    assert len(new_batch.columns) == 0
+    assert new_batch.device_type == pa.DeviceAllocationType.CUDA
+
+    # cast() test
+    new_schema = pa.schema([pa.field('c0', pa.int64()), pa.field('c1', 
pa.int64())])
+    with pytest.raises(NotImplementedError):
+        cuda_recordbatch.cast(new_schema)
+
+    # drop_null() test
+    validity = cuda_context.buffer_from_data(
+        np.array([True, False, True, False, True], dtype=np.bool_))
+    null_col = pa.Array.from_buffers(
+        pa.int32(), 5,
+        [validity, cuda_context.buffer_from_data(np.array([0] * 5, 
dtype=np.int32))])

Review Comment:
   This can also be simplified with a `null_col = pa.array(...).copy_to()`



##########
python/pyarrow/tests/test_table.py:
##########
@@ -3357,3 +3357,167 @@ def test_invalid_non_join_column():
     with pytest.raises(pa.lib.ArrowInvalid) as excinfo:
         t2.join(t1, 'id', join_type='inner')
     assert exp_error_msg in str(excinfo.value)
+
+
[email protected]
+def cuda_context():
+    cuda = pytest.importorskip("pyarrow.cuda")
+    return cuda.Context(0)
+
+
[email protected]
+def schema():
+    return pa.schema([pa.field('c0', pa.int16()), pa.field('c1', pa.int32())])
+
+
[email protected]
+def cpu_arrays():
+    return [pa.array([1, 2, 3, 4, 5], pa.int16()),
+            pa.array([-10, -5, 0, 1, 10], pa.int32())]
+
+
[email protected]
+def cuda_arrays(cuda_context, cpu_arrays):
+    return [arr.copy_to(cuda_context.memory_manager) for arr in cpu_arrays]
+
+
[email protected]
+def cpu_recordbatch(cpu_arrays, schema):
+    return pa.record_batch(cpu_arrays, schema=schema)
+
+
[email protected]
+def cuda_recordbatch(cuda_context, cpu_recordbatch):
+    return cpu_recordbatch.copy_to(cuda_context.memory_manager)
+
+
+def verify_recordbatch_on_cuda_device(batch, expected_schema):
+    batch.validate()
+    assert batch.device_type == pa.DeviceAllocationType.CUDA
+    assert batch.is_cpu is False
+    assert batch.num_columns == len(batch.column_names)

Review Comment:
   But also explicitly checking that `batch.column_names` works is maybe also 
good, e.g. `assert batch.column_names == expected.schema.names`



##########
python/pyarrow/tests/test_table.py:
##########
@@ -3357,3 +3357,167 @@ def test_invalid_non_join_column():
     with pytest.raises(pa.lib.ArrowInvalid) as excinfo:
         t2.join(t1, 'id', join_type='inner')
     assert exp_error_msg in str(excinfo.value)
+
+
[email protected]
+def cuda_context():
+    cuda = pytest.importorskip("pyarrow.cuda")
+    return cuda.Context(0)
+
+
[email protected]
+def schema():
+    return pa.schema([pa.field('c0', pa.int16()), pa.field('c1', pa.int32())])
+
+
[email protected]
+def cpu_arrays():
+    return [pa.array([1, 2, 3, 4, 5], pa.int16()),
+            pa.array([-10, -5, 0, 1, 10], pa.int32())]
+
+
[email protected]
+def cuda_arrays(cuda_context, cpu_arrays):
+    return [arr.copy_to(cuda_context.memory_manager) for arr in cpu_arrays]
+
+
[email protected]
+def cpu_recordbatch(cpu_arrays, schema):
+    return pa.record_batch(cpu_arrays, schema=schema)
+
+
[email protected]
+def cuda_recordbatch(cuda_context, cpu_recordbatch):
+    return cpu_recordbatch.copy_to(cuda_context.memory_manager)
+
+
+def verify_recordbatch_on_cuda_device(batch, expected_schema):
+    batch.validate()
+    assert batch.device_type == pa.DeviceAllocationType.CUDA
+    assert batch.is_cpu is False
+    assert batch.num_columns == len(batch.column_names)
+    assert str(batch) in repr(batch)
+    for c in batch.columns:
+        assert c.device_type == pa.DeviceAllocationType.CUDA
+    assert batch.schema == expected_schema
+
+
+def test_recordbatch_non_cpu(cuda_context, cpu_recordbatch, cuda_recordbatch,
+                             cuda_arrays, schema):
+    verify_recordbatch_on_cuda_device(cuda_recordbatch, expected_schema=schema)
+    assert cuda_recordbatch.num_rows == 5
+
+    # add_column() test
+    col = pa.Array.from_buffers(
+        pa.int8(), 5,
+        [None, cuda_context.buffer_from_data(np.array([6, 7, 8, 9, 10],
+                                             dtype=np.int8))])
+    new_batch = cuda_recordbatch.add_column(2, 'c2', col)
+    assert len(new_batch.columns) == 3
+    for c in new_batch.columns:
+        assert c.device_type == pa.DeviceAllocationType.CUDA
+    with pytest.raises(RuntimeError):
+        # Default array conversion not allowed for non-cpu record batch
+        cuda_recordbatch.add_column(2, 'c2', [1, 1, 1, 1, 1])
+
+    # remove_column() test
+    new_batch = cuda_recordbatch.remove_column(1)
+    assert len(new_batch.columns) == 1
+    assert new_batch.device_type == pa.DeviceAllocationType.CUDA
+    assert new_batch[0].device_type == pa.DeviceAllocationType.CUDA
+
+    # drop_columns() test
+    new_batch = cuda_recordbatch.drop_columns(['c0', 'c1'])
+    assert len(new_batch.columns) == 0
+    assert new_batch.device_type == pa.DeviceAllocationType.CUDA
+
+    # cast() test
+    new_schema = pa.schema([pa.field('c0', pa.int64()), pa.field('c1', 
pa.int64())])
+    with pytest.raises(NotImplementedError):
+        cuda_recordbatch.cast(new_schema)
+
+    # drop_null() test
+    validity = cuda_context.buffer_from_data(
+        np.array([True, False, True, False, True], dtype=np.bool_))
+    null_col = pa.Array.from_buffers(
+        pa.int32(), 5,
+        [validity, cuda_context.buffer_from_data(np.array([0] * 5, 
dtype=np.int32))])
+    cuda_recordbatch_with_nulls = cuda_recordbatch.add_column(2, 'c2', 
null_col)
+    with pytest.raises(NotImplementedError):
+        cuda_recordbatch_with_nulls.drop_null()
+
+    # filter() test

Review Comment:
   Similarly also `take()` and `sort_by()` ?



##########
python/pyarrow/table.pxi:
##########
@@ -3438,6 +3457,7 @@ cdef class RecordBatch(_Tabular):
 
         result = pyarrow_wrap_batch(CRecordBatch.Make(c_schema, num_rows,
                                                       c_arrays))
+

Review Comment:
   ```suggestion
   ```



##########
python/pyarrow/tests/test_table.py:
##########
@@ -3357,3 +3357,167 @@ def test_invalid_non_join_column():
     with pytest.raises(pa.lib.ArrowInvalid) as excinfo:
         t2.join(t1, 'id', join_type='inner')
     assert exp_error_msg in str(excinfo.value)
+
+
[email protected]
+def cuda_context():
+    cuda = pytest.importorskip("pyarrow.cuda")
+    return cuda.Context(0)
+
+
[email protected]
+def schema():
+    return pa.schema([pa.field('c0', pa.int16()), pa.field('c1', pa.int32())])
+
+
[email protected]
+def cpu_arrays():
+    return [pa.array([1, 2, 3, 4, 5], pa.int16()),
+            pa.array([-10, -5, 0, 1, 10], pa.int32())]
+
+
[email protected]
+def cuda_arrays(cuda_context, cpu_arrays):
+    return [arr.copy_to(cuda_context.memory_manager) for arr in cpu_arrays]
+
+
[email protected]
+def cpu_recordbatch(cpu_arrays, schema):
+    return pa.record_batch(cpu_arrays, schema=schema)
+
+
[email protected]
+def cuda_recordbatch(cuda_context, cpu_recordbatch):
+    return cpu_recordbatch.copy_to(cuda_context.memory_manager)
+
+
+def verify_recordbatch_on_cuda_device(batch, expected_schema):
+    batch.validate()
+    assert batch.device_type == pa.DeviceAllocationType.CUDA
+    assert batch.is_cpu is False
+    assert batch.num_columns == len(batch.column_names)
+    assert str(batch) in repr(batch)
+    for c in batch.columns:
+        assert c.device_type == pa.DeviceAllocationType.CUDA
+    assert batch.schema == expected_schema
+
+
+def test_recordbatch_non_cpu(cuda_context, cpu_recordbatch, cuda_recordbatch,
+                             cuda_arrays, schema):
+    verify_recordbatch_on_cuda_device(cuda_recordbatch, expected_schema=schema)
+    assert cuda_recordbatch.num_rows == 5
+
+    # add_column() test
+    col = pa.Array.from_buffers(
+        pa.int8(), 5,
+        [None, cuda_context.buffer_from_data(np.array([6, 7, 8, 9, 10],
+                                             dtype=np.int8))])
+    new_batch = cuda_recordbatch.add_column(2, 'c2', col)
+    assert len(new_batch.columns) == 3
+    for c in new_batch.columns:
+        assert c.device_type == pa.DeviceAllocationType.CUDA
+    with pytest.raises(RuntimeError):
+        # Default array conversion not allowed for non-cpu record batch
+        cuda_recordbatch.add_column(2, 'c2', [1, 1, 1, 1, 1])

Review Comment:
   This tests the error you raise from cython, but what error do you get if you 
do:
   
   ```
   cuda_recordbatch.add_column(2, 'c2', pa.array([1, 1, 1, 1, 1]))
   ```
   
   i.e. adding a column with a cpu pyarrow array



##########
python/pyarrow/tests/test_table.py:
##########
@@ -3357,3 +3357,167 @@ def test_invalid_non_join_column():
     with pytest.raises(pa.lib.ArrowInvalid) as excinfo:
         t2.join(t1, 'id', join_type='inner')
     assert exp_error_msg in str(excinfo.value)
+
+
[email protected]
+def cuda_context():
+    cuda = pytest.importorskip("pyarrow.cuda")
+    return cuda.Context(0)
+
+
[email protected]
+def schema():
+    return pa.schema([pa.field('c0', pa.int16()), pa.field('c1', pa.int32())])
+
+
[email protected]
+def cpu_arrays():
+    return [pa.array([1, 2, 3, 4, 5], pa.int16()),
+            pa.array([-10, -5, 0, 1, 10], pa.int32())]
+
+
[email protected]
+def cuda_arrays(cuda_context, cpu_arrays):
+    return [arr.copy_to(cuda_context.memory_manager) for arr in cpu_arrays]
+
+
[email protected]
+def cpu_recordbatch(cpu_arrays, schema):
+    return pa.record_batch(cpu_arrays, schema=schema)
+
+
[email protected]
+def cuda_recordbatch(cuda_context, cpu_recordbatch):
+    return cpu_recordbatch.copy_to(cuda_context.memory_manager)
+
+
+def verify_recordbatch_on_cuda_device(batch, expected_schema):
+    batch.validate()
+    assert batch.device_type == pa.DeviceAllocationType.CUDA
+    assert batch.is_cpu is False
+    assert batch.num_columns == len(batch.column_names)
+    assert str(batch) in repr(batch)
+    for c in batch.columns:
+        assert c.device_type == pa.DeviceAllocationType.CUDA
+    assert batch.schema == expected_schema
+
+
+def test_recordbatch_non_cpu(cuda_context, cpu_recordbatch, cuda_recordbatch,
+                             cuda_arrays, schema):
+    verify_recordbatch_on_cuda_device(cuda_recordbatch, expected_schema=schema)
+    assert cuda_recordbatch.num_rows == 5
+

Review Comment:
   ```suggestion
       assert cuda_recordbatch.shape == (5, 2)
   
   ```



##########
python/pyarrow/tests/test_table.py:
##########
@@ -3357,3 +3357,167 @@ def test_invalid_non_join_column():
     with pytest.raises(pa.lib.ArrowInvalid) as excinfo:
         t2.join(t1, 'id', join_type='inner')
     assert exp_error_msg in str(excinfo.value)
+
+
[email protected]
+def cuda_context():
+    cuda = pytest.importorskip("pyarrow.cuda")
+    return cuda.Context(0)
+
+
[email protected]
+def schema():
+    return pa.schema([pa.field('c0', pa.int16()), pa.field('c1', pa.int32())])
+
+
[email protected]
+def cpu_arrays():
+    return [pa.array([1, 2, 3, 4, 5], pa.int16()),
+            pa.array([-10, -5, 0, 1, 10], pa.int32())]
+
+
[email protected]
+def cuda_arrays(cuda_context, cpu_arrays):
+    return [arr.copy_to(cuda_context.memory_manager) for arr in cpu_arrays]
+
+
[email protected]
+def cpu_recordbatch(cpu_arrays, schema):
+    return pa.record_batch(cpu_arrays, schema=schema)
+
+
[email protected]
+def cuda_recordbatch(cuda_context, cpu_recordbatch):
+    return cpu_recordbatch.copy_to(cuda_context.memory_manager)
+
+
+def verify_recordbatch_on_cuda_device(batch, expected_schema):
+    batch.validate()
+    assert batch.device_type == pa.DeviceAllocationType.CUDA
+    assert batch.is_cpu is False
+    assert batch.num_columns == len(batch.column_names)
+    assert str(batch) in repr(batch)
+    for c in batch.columns:
+        assert c.device_type == pa.DeviceAllocationType.CUDA
+    assert batch.schema == expected_schema
+
+
+def test_recordbatch_non_cpu(cuda_context, cpu_recordbatch, cuda_recordbatch,
+                             cuda_arrays, schema):
+    verify_recordbatch_on_cuda_device(cuda_recordbatch, expected_schema=schema)
+    assert cuda_recordbatch.num_rows == 5
+
+    # add_column() test
+    col = pa.Array.from_buffers(
+        pa.int8(), 5,
+        [None, cuda_context.buffer_from_data(np.array([6, 7, 8, 9, 10],
+                                             dtype=np.int8))])
+    new_batch = cuda_recordbatch.add_column(2, 'c2', col)
+    assert len(new_batch.columns) == 3
+    for c in new_batch.columns:
+        assert c.device_type == pa.DeviceAllocationType.CUDA
+    with pytest.raises(RuntimeError):
+        # Default array conversion not allowed for non-cpu record batch
+        cuda_recordbatch.add_column(2, 'c2', [1, 1, 1, 1, 1])
+
+    # remove_column() test
+    new_batch = cuda_recordbatch.remove_column(1)
+    assert len(new_batch.columns) == 1
+    assert new_batch.device_type == pa.DeviceAllocationType.CUDA
+    assert new_batch[0].device_type == pa.DeviceAllocationType.CUDA
+
+    # drop_columns() test
+    new_batch = cuda_recordbatch.drop_columns(['c0', 'c1'])
+    assert len(new_batch.columns) == 0
+    assert new_batch.device_type == pa.DeviceAllocationType.CUDA
+
+    # cast() test
+    new_schema = pa.schema([pa.field('c0', pa.int64()), pa.field('c1', 
pa.int64())])
+    with pytest.raises(NotImplementedError):
+        cuda_recordbatch.cast(new_schema)
+
+    # drop_null() test
+    validity = cuda_context.buffer_from_data(
+        np.array([True, False, True, False, True], dtype=np.bool_))
+    null_col = pa.Array.from_buffers(
+        pa.int32(), 5,
+        [validity, cuda_context.buffer_from_data(np.array([0] * 5, 
dtype=np.int32))])
+    cuda_recordbatch_with_nulls = cuda_recordbatch.add_column(2, 'c2', 
null_col)
+    with pytest.raises(NotImplementedError):
+        cuda_recordbatch_with_nulls.drop_null()
+
+    # filter() test
+    with pytest.raises(NotImplementedError):
+        cuda_recordbatch.filter([True] * 5)
+
+    # field() test
+    assert cuda_recordbatch.field(0) == pa.field('c0', pa.int16())
+    assert cuda_recordbatch.field(1) == pa.field('c1', pa.int32())
+
+    # equals() test
+    new_batch = cpu_recordbatch.copy_to(cuda_context.memory_manager)
+    with pytest.raises(NotImplementedError):
+        assert cuda_recordbatch.equals(new_batch) is True
+
+    # from_arrays() test
+    new_batch = pa.RecordBatch.from_arrays(cuda_arrays, ['c0', 'c1'])
+    verify_recordbatch_on_cuda_device(new_batch, expected_schema=schema)
+
+    # from_pydict() test
+    new_batch = pa.RecordBatch.from_pydict({'c0': cuda_arrays[0], 'c1': 
cuda_arrays[1]})
+    verify_recordbatch_on_cuda_device(new_batch, expected_schema=schema)
+
+    # nybtes test
+    with pytest.raises(NotImplementedError):
+        assert cuda_recordbatch.nbytes == cpu_recordbatch.nbytes

Review Comment:
   Also test the variant `get_total_buffer_size`? 
   I don't remember exactly what we discussed about this for the Array case, 
but I see that there we added `_assert_cpu()` for both, while I would think 
that `get_total_buffer_size()` could work for CUDA (EDIT: ah, but it uses the 
buffer's address to avoid counting one buffer twice, so that would require an 
update on the C++ side to not use the CPU-only buffer address attribute)



##########
python/pyarrow/tests/test_table.py:
##########
@@ -3357,3 +3357,167 @@ def test_invalid_non_join_column():
     with pytest.raises(pa.lib.ArrowInvalid) as excinfo:
         t2.join(t1, 'id', join_type='inner')
     assert exp_error_msg in str(excinfo.value)
+
+
[email protected]
+def cuda_context():
+    cuda = pytest.importorskip("pyarrow.cuda")
+    return cuda.Context(0)
+
+
[email protected]
+def schema():
+    return pa.schema([pa.field('c0', pa.int16()), pa.field('c1', pa.int32())])
+
+
[email protected]
+def cpu_arrays():
+    return [pa.array([1, 2, 3, 4, 5], pa.int16()),
+            pa.array([-10, -5, 0, 1, 10], pa.int32())]
+
+
[email protected]
+def cuda_arrays(cuda_context, cpu_arrays):
+    return [arr.copy_to(cuda_context.memory_manager) for arr in cpu_arrays]
+
+
[email protected]
+def cpu_recordbatch(cpu_arrays, schema):
+    return pa.record_batch(cpu_arrays, schema=schema)
+
+
[email protected]
+def cuda_recordbatch(cuda_context, cpu_recordbatch):
+    return cpu_recordbatch.copy_to(cuda_context.memory_manager)
+
+
+def verify_recordbatch_on_cuda_device(batch, expected_schema):
+    batch.validate()
+    assert batch.device_type == pa.DeviceAllocationType.CUDA
+    assert batch.is_cpu is False
+    assert batch.num_columns == len(batch.column_names)
+    assert str(batch) in repr(batch)
+    for c in batch.columns:
+        assert c.device_type == pa.DeviceAllocationType.CUDA
+    assert batch.schema == expected_schema
+
+
+def test_recordbatch_non_cpu(cuda_context, cpu_recordbatch, cuda_recordbatch,
+                             cuda_arrays, schema):
+    verify_recordbatch_on_cuda_device(cuda_recordbatch, expected_schema=schema)
+    assert cuda_recordbatch.num_rows == 5
+
+    # add_column() test

Review Comment:
   There is also a `set_column` variant of this functionality, that seems to 
have its own (C++) implementation. I assume that will also work, but so we 
should also test that here.



##########
python/pyarrow/tests/test_table.py:
##########
@@ -3357,3 +3357,167 @@ def test_invalid_non_join_column():
     with pytest.raises(pa.lib.ArrowInvalid) as excinfo:
         t2.join(t1, 'id', join_type='inner')
     assert exp_error_msg in str(excinfo.value)
+
+
[email protected]
+def cuda_context():
+    cuda = pytest.importorskip("pyarrow.cuda")
+    return cuda.Context(0)
+
+
[email protected]
+def schema():
+    return pa.schema([pa.field('c0', pa.int16()), pa.field('c1', pa.int32())])
+
+
[email protected]
+def cpu_arrays():
+    return [pa.array([1, 2, 3, 4, 5], pa.int16()),
+            pa.array([-10, -5, 0, 1, 10], pa.int32())]
+
+
[email protected]
+def cuda_arrays(cuda_context, cpu_arrays):
+    return [arr.copy_to(cuda_context.memory_manager) for arr in cpu_arrays]
+
+
[email protected]
+def cpu_recordbatch(cpu_arrays, schema):
+    return pa.record_batch(cpu_arrays, schema=schema)
+
+
[email protected]
+def cuda_recordbatch(cuda_context, cpu_recordbatch):
+    return cpu_recordbatch.copy_to(cuda_context.memory_manager)
+
+
+def verify_recordbatch_on_cuda_device(batch, expected_schema):
+    batch.validate()
+    assert batch.device_type == pa.DeviceAllocationType.CUDA
+    assert batch.is_cpu is False
+    assert batch.num_columns == len(batch.column_names)
+    assert str(batch) in repr(batch)
+    for c in batch.columns:
+        assert c.device_type == pa.DeviceAllocationType.CUDA
+    assert batch.schema == expected_schema
+
+
+def test_recordbatch_non_cpu(cuda_context, cpu_recordbatch, cuda_recordbatch,
+                             cuda_arrays, schema):
+    verify_recordbatch_on_cuda_device(cuda_recordbatch, expected_schema=schema)
+    assert cuda_recordbatch.num_rows == 5
+
+    # add_column() test
+    col = pa.Array.from_buffers(
+        pa.int8(), 5,
+        [None, cuda_context.buffer_from_data(np.array([6, 7, 8, 9, 10],
+                                             dtype=np.int8))])
+    new_batch = cuda_recordbatch.add_column(2, 'c2', col)
+    assert len(new_batch.columns) == 3
+    for c in new_batch.columns:
+        assert c.device_type == pa.DeviceAllocationType.CUDA
+    with pytest.raises(RuntimeError):
+        # Default array conversion not allowed for non-cpu record batch
+        cuda_recordbatch.add_column(2, 'c2', [1, 1, 1, 1, 1])
+
+    # remove_column() test
+    new_batch = cuda_recordbatch.remove_column(1)
+    assert len(new_batch.columns) == 1
+    assert new_batch.device_type == pa.DeviceAllocationType.CUDA
+    assert new_batch[0].device_type == pa.DeviceAllocationType.CUDA
+
+    # drop_columns() test
+    new_batch = cuda_recordbatch.drop_columns(['c0', 'c1'])
+    assert len(new_batch.columns) == 0
+    assert new_batch.device_type == pa.DeviceAllocationType.CUDA
+
+    # cast() test
+    new_schema = pa.schema([pa.field('c0', pa.int64()), pa.field('c1', 
pa.int64())])
+    with pytest.raises(NotImplementedError):
+        cuda_recordbatch.cast(new_schema)
+
+    # drop_null() test
+    validity = cuda_context.buffer_from_data(
+        np.array([True, False, True, False, True], dtype=np.bool_))
+    null_col = pa.Array.from_buffers(
+        pa.int32(), 5,
+        [validity, cuda_context.buffer_from_data(np.array([0] * 5, 
dtype=np.int32))])
+    cuda_recordbatch_with_nulls = cuda_recordbatch.add_column(2, 'c2', 
null_col)
+    with pytest.raises(NotImplementedError):
+        cuda_recordbatch_with_nulls.drop_null()
+
+    # filter() test
+    with pytest.raises(NotImplementedError):
+        cuda_recordbatch.filter([True] * 5)
+
+    # field() test
+    assert cuda_recordbatch.field(0) == pa.field('c0', pa.int16())
+    assert cuda_recordbatch.field(1) == pa.field('c1', pa.int32())
+
+    # equals() test
+    new_batch = cpu_recordbatch.copy_to(cuda_context.memory_manager)
+    with pytest.raises(NotImplementedError):
+        assert cuda_recordbatch.equals(new_batch) is True
+
+    # from_arrays() test
+    new_batch = pa.RecordBatch.from_arrays(cuda_arrays, ['c0', 'c1'])
+    verify_recordbatch_on_cuda_device(new_batch, expected_schema=schema)
+

Review Comment:
   ```suggestion
       assert 
new_batch.copy_to(pa.default_cpu_memory_manager()).equals(cpu_recordbatch)
   
   ```
   
   (and same below for from_pydict)



##########
python/pyarrow/tests/test_table.py:
##########
@@ -3357,3 +3357,167 @@ def test_invalid_non_join_column():
     with pytest.raises(pa.lib.ArrowInvalid) as excinfo:
         t2.join(t1, 'id', join_type='inner')
     assert exp_error_msg in str(excinfo.value)
+
+
[email protected]
+def cuda_context():
+    cuda = pytest.importorskip("pyarrow.cuda")
+    return cuda.Context(0)
+
+
[email protected]
+def schema():
+    return pa.schema([pa.field('c0', pa.int16()), pa.field('c1', pa.int32())])
+
+
[email protected]
+def cpu_arrays():
+    return [pa.array([1, 2, 3, 4, 5], pa.int16()),
+            pa.array([-10, -5, 0, 1, 10], pa.int32())]
+
+
[email protected]
+def cuda_arrays(cuda_context, cpu_arrays):
+    return [arr.copy_to(cuda_context.memory_manager) for arr in cpu_arrays]
+
+
[email protected]
+def cpu_recordbatch(cpu_arrays, schema):
+    return pa.record_batch(cpu_arrays, schema=schema)
+
+
[email protected]
+def cuda_recordbatch(cuda_context, cpu_recordbatch):
+    return cpu_recordbatch.copy_to(cuda_context.memory_manager)
+
+
+def verify_recordbatch_on_cuda_device(batch, expected_schema):
+    batch.validate()
+    assert batch.device_type == pa.DeviceAllocationType.CUDA
+    assert batch.is_cpu is False
+    assert batch.num_columns == len(batch.column_names)
+    assert str(batch) in repr(batch)
+    for c in batch.columns:
+        assert c.device_type == pa.DeviceAllocationType.CUDA
+    assert batch.schema == expected_schema
+
+
+def test_recordbatch_non_cpu(cuda_context, cpu_recordbatch, cuda_recordbatch,
+                             cuda_arrays, schema):
+    verify_recordbatch_on_cuda_device(cuda_recordbatch, expected_schema=schema)
+    assert cuda_recordbatch.num_rows == 5
+
+    # add_column() test
+    col = pa.Array.from_buffers(
+        pa.int8(), 5,
+        [None, cuda_context.buffer_from_data(np.array([6, 7, 8, 9, 10],
+                                             dtype=np.int8))])
+    new_batch = cuda_recordbatch.add_column(2, 'c2', col)
+    assert len(new_batch.columns) == 3
+    for c in new_batch.columns:
+        assert c.device_type == pa.DeviceAllocationType.CUDA
+    with pytest.raises(RuntimeError):
+        # Default array conversion not allowed for non-cpu record batch
+        cuda_recordbatch.add_column(2, 'c2', [1, 1, 1, 1, 1])
+
+    # remove_column() test
+    new_batch = cuda_recordbatch.remove_column(1)
+    assert len(new_batch.columns) == 1
+    assert new_batch.device_type == pa.DeviceAllocationType.CUDA
+    assert new_batch[0].device_type == pa.DeviceAllocationType.CUDA
+
+    # drop_columns() test
+    new_batch = cuda_recordbatch.drop_columns(['c0', 'c1'])
+    assert len(new_batch.columns) == 0
+    assert new_batch.device_type == pa.DeviceAllocationType.CUDA
+
+    # cast() test
+    new_schema = pa.schema([pa.field('c0', pa.int64()), pa.field('c1', 
pa.int64())])
+    with pytest.raises(NotImplementedError):
+        cuda_recordbatch.cast(new_schema)
+
+    # drop_null() test
+    validity = cuda_context.buffer_from_data(
+        np.array([True, False, True, False, True], dtype=np.bool_))
+    null_col = pa.Array.from_buffers(
+        pa.int32(), 5,
+        [validity, cuda_context.buffer_from_data(np.array([0] * 5, 
dtype=np.int32))])
+    cuda_recordbatch_with_nulls = cuda_recordbatch.add_column(2, 'c2', 
null_col)
+    with pytest.raises(NotImplementedError):
+        cuda_recordbatch_with_nulls.drop_null()
+
+    # filter() test
+    with pytest.raises(NotImplementedError):
+        cuda_recordbatch.filter([True] * 5)
+
+    # field() test
+    assert cuda_recordbatch.field(0) == pa.field('c0', pa.int16())
+    assert cuda_recordbatch.field(1) == pa.field('c1', pa.int32())
+
+    # equals() test
+    new_batch = cpu_recordbatch.copy_to(cuda_context.memory_manager)
+    with pytest.raises(NotImplementedError):
+        assert cuda_recordbatch.equals(new_batch) is True
+
+    # from_arrays() test
+    new_batch = pa.RecordBatch.from_arrays(cuda_arrays, ['c0', 'c1'])
+    verify_recordbatch_on_cuda_device(new_batch, expected_schema=schema)
+
+    # from_pydict() test
+    new_batch = pa.RecordBatch.from_pydict({'c0': cuda_arrays[0], 'c1': 
cuda_arrays[1]})
+    verify_recordbatch_on_cuda_device(new_batch, expected_schema=schema)
+
+    # nybtes test
+    with pytest.raises(NotImplementedError):
+        assert cuda_recordbatch.nbytes == cpu_recordbatch.nbytes
+
+    # to_pydict() test
+    with pytest.raises(NotImplementedError):
+        cuda_recordbatch.to_pydict()
+
+    # to_pylist() test
+    with pytest.raises(NotImplementedError):
+        cuda_recordbatch.to_pylist()
+
+    # to_pandas() test
+    with pytest.raises(NotImplementedError):
+        cuda_recordbatch.to_pandas()
+
+    # to_tensor() test
+    with pytest.raises(NotImplementedError):
+        cuda_recordbatch.to_tensor()
+
+    # to_struct_array() test

Review Comment:
   test `from_struct_array` as well? (and that might also require an additional 
`_assert_cpu`?)



##########
python/pyarrow/tests/test_table.py:
##########
@@ -3357,3 +3357,167 @@ def test_invalid_non_join_column():
     with pytest.raises(pa.lib.ArrowInvalid) as excinfo:
         t2.join(t1, 'id', join_type='inner')
     assert exp_error_msg in str(excinfo.value)
+
+
[email protected]
+def cuda_context():
+    cuda = pytest.importorskip("pyarrow.cuda")
+    return cuda.Context(0)
+
+
[email protected]
+def schema():
+    return pa.schema([pa.field('c0', pa.int16()), pa.field('c1', pa.int32())])
+
+
[email protected]
+def cpu_arrays():
+    return [pa.array([1, 2, 3, 4, 5], pa.int16()),
+            pa.array([-10, -5, 0, 1, 10], pa.int32())]
+
+
[email protected]
+def cuda_arrays(cuda_context, cpu_arrays):
+    return [arr.copy_to(cuda_context.memory_manager) for arr in cpu_arrays]
+
+
[email protected]
+def cpu_recordbatch(cpu_arrays, schema):
+    return pa.record_batch(cpu_arrays, schema=schema)
+
+
[email protected]
+def cuda_recordbatch(cuda_context, cpu_recordbatch):
+    return cpu_recordbatch.copy_to(cuda_context.memory_manager)
+
+
+def verify_recordbatch_on_cuda_device(batch, expected_schema):
+    batch.validate()
+    assert batch.device_type == pa.DeviceAllocationType.CUDA
+    assert batch.is_cpu is False
+    assert batch.num_columns == len(batch.column_names)
+    assert str(batch) in repr(batch)
+    for c in batch.columns:
+        assert c.device_type == pa.DeviceAllocationType.CUDA
+    assert batch.schema == expected_schema
+
+
+def test_recordbatch_non_cpu(cuda_context, cpu_recordbatch, cuda_recordbatch,
+                             cuda_arrays, schema):
+    verify_recordbatch_on_cuda_device(cuda_recordbatch, expected_schema=schema)
+    assert cuda_recordbatch.num_rows == 5
+
+    # add_column() test
+    col = pa.Array.from_buffers(
+        pa.int8(), 5,
+        [None, cuda_context.buffer_from_data(np.array([6, 7, 8, 9, 10],
+                                             dtype=np.int8))])
+    new_batch = cuda_recordbatch.add_column(2, 'c2', col)
+    assert len(new_batch.columns) == 3
+    for c in new_batch.columns:
+        assert c.device_type == pa.DeviceAllocationType.CUDA
+    with pytest.raises(RuntimeError):
+        # Default array conversion not allowed for non-cpu record batch
+        cuda_recordbatch.add_column(2, 'c2', [1, 1, 1, 1, 1])
+
+    # remove_column() test
+    new_batch = cuda_recordbatch.remove_column(1)
+    assert len(new_batch.columns) == 1
+    assert new_batch.device_type == pa.DeviceAllocationType.CUDA
+    assert new_batch[0].device_type == pa.DeviceAllocationType.CUDA

Review Comment:
   Could also replace this by using `verify_recordbatch_on_cuda_device` with 
`expected_schema=schema.remove(1)` 



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]


Reply via email to