This is an automated email from the ASF dual-hosted git repository.
jorisvandenbossche pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/arrow.git
The following commit(s) were added to refs/heads/main by this push:
new 96f686b81b GH-40061: [C++][Python] Basic conversion of RecordBatch to
Arrow Tensor - add option to cast NULL to NaN (#40803)
96f686b81b is described below
commit 96f686b81ba148f4d434846f0b9e161c538f131d
Author: Alenka Frim <[email protected]>
AuthorDate: Fri Mar 29 08:30:03 2024 +0100
GH-40061: [C++][Python] Basic conversion of RecordBatch to Arrow Tensor -
add option to cast NULL to NaN (#40803)
### Rationale for this change
The conversion from `RecordBatch` to `Tensor` class exists but it doesn't
support record batches with validity bitmaps. This PR adds support for an
option to convert null values to NaN.
### What changes are included in this PR?
This PR adds a `null_to_nan` option in `RecordBatch::ToTensor` so that null
values are converted to NaN in the resulting `Tensor`. This, for example, works:
```python
>>> import pyarrow as pa
>>> batch = pa.record_batch(
... [
... pa.array([1, 2, 3, 4, None], type=pa.int32()),
... pa.array([10, 20, 30, 40, None], type=pa.float32()),
... ], names = ["a", "b"]
... )
>>> batch
pyarrow.RecordBatch
a: int32
b: float
----
a: [1,2,3,4,null]
b: [10,20,30,40,null]
>>> batch.to_tensor(null_to_nan=True)
<pyarrow.Tensor>
type: double
shape: (5, 2)
strides: (8, 40)
>>> batch.to_tensor(null_to_nan=True).to_numpy()
array([[ 1., 10.],
[ 2., 20.],
[ 3., 30.],
[ 4., 40.],
[nan, nan]])
```
but the default would raise:
```python
>>> batch.to_tensor()
Traceback (most recent call last):
File "<stdin>", line 1, in <module>
File "pyarrow/table.pxi", line 3421, in pyarrow.lib.RecordBatch.to_tensor
a: int32
File "pyarrow/error.pxi", line 154, in
pyarrow.lib.pyarrow_internal_check_status
return check_status(status)
File "pyarrow/error.pxi", line 91, in pyarrow.lib.check_status
raise convert_status(status)
pyarrow.lib.ArrowTypeError: Can only convert a RecordBatch with no nulls.
Set null_to_nan to true to convert nulls to NaN
```
### Are these changes tested?
Yes.
### Are there any user-facing changes?
No.
* GitHub Issue: #40061
Lead-authored-by: AlenkaF <[email protected]>
Co-authored-by: Alenka Frim <[email protected]>
Co-authored-by: Joris Van den Bossche <[email protected]>
Signed-off-by: Joris Van den Bossche <[email protected]>
---
cpp/src/arrow/record_batch.cc | 47 ++++++++++++++++------
cpp/src/arrow/record_batch.h | 6 ++-
cpp/src/arrow/record_batch_test.cc | 76 +++++++++++++++++++++++++++++++++++-
python/pyarrow/includes/libarrow.pxd | 2 +-
python/pyarrow/table.pxi | 49 +++++++++++++++++++++--
python/pyarrow/tests/test_table.py | 48 ++++++++++++++++++++++-
6 files changed, 208 insertions(+), 20 deletions(-)
diff --git a/cpp/src/arrow/record_batch.cc b/cpp/src/arrow/record_batch.cc
index 0d8bda9b66..6f3b8e75a2 100644
--- a/cpp/src/arrow/record_batch.cc
+++ b/cpp/src/arrow/record_batch.cc
@@ -18,6 +18,7 @@
#include "arrow/record_batch.h"
#include <algorithm>
+#include <cmath>
#include <cstdlib>
#include <memory>
#include <sstream>
@@ -261,12 +262,19 @@ struct ConvertColumnsToTensorVisitor {
using In = typename T::c_type;
auto in_values = ArraySpan(in_data).GetSpan<In>(1, in_data.length);
- if constexpr (std::is_same_v<In, Out>) {
- memcpy(out_values, in_values.data(), in_values.size_bytes());
- out_values += in_values.size();
+ if (in_data.null_count == 0) {
+ if constexpr (std::is_same_v<In, Out>) {
+ memcpy(out_values, in_values.data(), in_values.size_bytes());
+ out_values += in_values.size();
+ } else {
+ for (In in_value : in_values) {
+ *out_values++ = static_cast<Out>(in_value);
+ }
+ }
} else {
- for (In in_value : in_values) {
- *out_values++ = static_cast<Out>(in_value);
+ for (int64_t i = 0; i < in_data.length; ++i) {
+ *out_values++ =
+ in_data.IsNull(i) ? static_cast<Out>(NAN) :
static_cast<Out>(in_values[i]);
}
}
return Status::OK();
@@ -286,16 +294,20 @@ inline void ConvertColumnsToTensor(const RecordBatch&
batch, uint8_t* out) {
}
}
-Result<std::shared_ptr<Tensor>> RecordBatch::ToTensor(MemoryPool* pool) const {
+Result<std::shared_ptr<Tensor>> RecordBatch::ToTensor(bool null_to_nan,
+ MemoryPool* pool) const {
if (num_columns() == 0) {
return Status::TypeError(
"Conversion to Tensor for RecordBatches without columns/schema is not "
"supported.");
}
// Check for no validity bitmap of each field
+ // if null_to_nan conversion is set to false
for (int i = 0; i < num_columns(); ++i) {
- if (column(i)->null_count() > 0) {
- return Status::TypeError("Can only convert a RecordBatch with no
nulls.");
+ if (column(i)->null_count() > 0 && !null_to_nan) {
+ return Status::TypeError(
+ "Can only convert a RecordBatch with no nulls. Set null_to_nan to
true to "
+ "convert nulls to NaN");
}
}
@@ -308,12 +320,12 @@ Result<std::shared_ptr<Tensor>>
RecordBatch::ToTensor(MemoryPool* pool) const {
std::shared_ptr<Field> result_field = schema_->field(0);
std::shared_ptr<DataType> result_type = result_field->type();
- if (num_columns() > 1) {
- Field::MergeOptions options;
- options.promote_integer_to_float = true;
- options.promote_integer_sign = true;
- options.promote_numeric_width = true;
+ Field::MergeOptions options;
+ options.promote_integer_to_float = true;
+ options.promote_integer_sign = true;
+ options.promote_numeric_width = true;
+ if (num_columns() > 1) {
for (int i = 1; i < num_columns(); ++i) {
if (!is_numeric(column(i)->type()->id())) {
return Status::TypeError("DataType is not supported: ",
@@ -334,6 +346,15 @@ Result<std::shared_ptr<Tensor>>
RecordBatch::ToTensor(MemoryPool* pool) const {
result_type = result_field->type();
}
+ // Check if result_type is signed or unsigned integer and null_to_nan is set
to true
+ // Then all columns should be promoted to float type
+ if (is_integer(result_type->id()) && null_to_nan) {
+ ARROW_ASSIGN_OR_RAISE(
+ result_field,
+ result_field->MergeWith(field(result_field->name(), float32()),
options));
+ result_type = result_field->type();
+ }
+
// Allocate memory
ARROW_ASSIGN_OR_RAISE(
std::shared_ptr<Buffer> result,
diff --git a/cpp/src/arrow/record_batch.h b/cpp/src/arrow/record_batch.h
index 16d721caad..5202ff4abf 100644
--- a/cpp/src/arrow/record_batch.h
+++ b/cpp/src/arrow/record_batch.h
@@ -85,8 +85,12 @@ class ARROW_EXPORT RecordBatch {
/// Create a Tensor object with shape (number of rows, number of columns) and
/// strides (type size in bytes, type size in bytes * number of rows).
/// Generated Tensor will have column-major layout.
+ ///
+ /// \param[in] null_to_nan if true, convert nulls to NaN
+ /// \param[in] pool the memory pool to allocate the tensor buffer
+ /// \return the resulting Tensor
Result<std::shared_ptr<Tensor>> ToTensor(
- MemoryPool* pool = default_memory_pool()) const;
+ bool null_to_nan = false, MemoryPool* pool = default_memory_pool())
const;
/// \brief Construct record batch from struct array
///
diff --git a/cpp/src/arrow/record_batch_test.cc
b/cpp/src/arrow/record_batch_test.cc
index 81154452d7..7e0eb1d460 100644
--- a/cpp/src/arrow/record_batch_test.cc
+++ b/cpp/src/arrow/record_batch_test.cc
@@ -667,7 +667,8 @@ TEST_F(TestRecordBatch, ToTensorUnsupportedMissing) {
auto batch = RecordBatch::Make(schema, length, {a0, a1});
ASSERT_RAISES_WITH_MESSAGE(TypeError,
- "Type error: Can only convert a RecordBatch with
no nulls.",
+ "Type error: Can only convert a RecordBatch with
no nulls. "
+ "Set null_to_nan to true to convert nulls to NaN",
batch->ToTensor());
}
@@ -740,6 +741,79 @@ TEST_F(TestRecordBatch, ToTensorSupportedNaN) {
CheckTensor<FloatType>(tensor, 18, shape, f_strides);
}
+TEST_F(TestRecordBatch, ToTensorSupportedNullToNan) {
+ const int length = 9;
+
+ // int32 + float32 = float64
+ auto f0 = field("f0", int32());
+ auto f1 = field("f1", float32());
+
+ std::vector<std::shared_ptr<Field>> fields = {f0, f1};
+ auto schema = ::arrow::schema(fields);
+
+ auto a0 = ArrayFromJSON(int32(), "[null, 2, 3, 4, 5, 6, 7, 8, 9]");
+ auto a1 = ArrayFromJSON(float32(), "[10, 20, 30, 40, null, 60, 70, 80, 90]");
+
+ auto batch = RecordBatch::Make(schema, length, {a0, a1});
+
+ ASSERT_OK_AND_ASSIGN(auto tensor, batch->ToTensor(/*null_to_nan=*/true));
+ ASSERT_OK(tensor->Validate());
+
+ std::vector<int64_t> shape = {9, 2};
+ const int64_t f64_size = sizeof(double);
+ std::vector<int64_t> f_strides = {f64_size, f64_size * shape[0]};
+ std::shared_ptr<Tensor> tensor_expected = TensorFromJSON(
+ float64(), "[NaN, 2, 3, 4, 5, 6, 7, 8, 9, 10, 20, 30, 40, NaN, 60,
70, 80, 90]",
+ shape, f_strides);
+
+ EXPECT_FALSE(tensor_expected->Equals(*tensor));
+ EXPECT_TRUE(tensor_expected->Equals(*tensor,
EqualOptions().nans_equal(true)));
+
+ CheckTensor<DoubleType>(tensor, 18, shape, f_strides);
+
+ // int32 -> float64
+ auto f2 = field("f2", int32());
+
+ std::vector<std::shared_ptr<Field>> fields1 = {f0, f2};
+ auto schema1 = ::arrow::schema(fields1);
+
+ auto a2 = ArrayFromJSON(int32(), "[10, 20, 30, 40, null, 60, 70, 80, 90]");
+ auto batch1 = RecordBatch::Make(schema1, length, {a0, a2});
+
+ ASSERT_OK_AND_ASSIGN(auto tensor1, batch1->ToTensor(/*null_to_nan=*/true));
+ ASSERT_OK(tensor1->Validate());
+
+ EXPECT_FALSE(tensor_expected->Equals(*tensor1));
+ EXPECT_TRUE(tensor_expected->Equals(*tensor1,
EqualOptions().nans_equal(true)));
+
+ CheckTensor<DoubleType>(tensor1, 18, shape, f_strides);
+
+ // int8 -> float32
+ auto f3 = field("f3", int8());
+ auto f4 = field("f4", int8());
+
+ std::vector<std::shared_ptr<Field>> fields2 = {f3, f4};
+ auto schema2 = ::arrow::schema(fields2);
+
+ auto a3 = ArrayFromJSON(int8(), "[null, 2, 3, 4, 5, 6, 7, 8, 9]");
+ auto a4 = ArrayFromJSON(int8(), "[10, 20, 30, 40, null, 60, 70, 80, 90]");
+ auto batch2 = RecordBatch::Make(schema2, length, {a3, a4});
+
+ ASSERT_OK_AND_ASSIGN(auto tensor2, batch2->ToTensor(/*null_to_nan=*/true));
+ ASSERT_OK(tensor2->Validate());
+
+ const int64_t f32_size = sizeof(float);
+ std::vector<int64_t> f_strides_2 = {f32_size, f32_size * shape[0]};
+ std::shared_ptr<Tensor> tensor_expected_2 = TensorFromJSON(
+ float32(), "[NaN, 2, 3, 4, 5, 6, 7, 8, 9, 10, 20, 30, 40, NaN, 60,
70, 80, 90]",
+ shape, f_strides_2);
+
+ EXPECT_FALSE(tensor_expected_2->Equals(*tensor2));
+ EXPECT_TRUE(tensor_expected_2->Equals(*tensor2,
EqualOptions().nans_equal(true)));
+
+ CheckTensor<FloatType>(tensor2, 18, shape, f_strides_2);
+}
+
TEST_F(TestRecordBatch, ToTensorSupportedTypesMixed) {
const int length = 9;
diff --git a/python/pyarrow/includes/libarrow.pxd
b/python/pyarrow/includes/libarrow.pxd
index 9e5e3d3fa6..aa50dd189a 100644
--- a/python/pyarrow/includes/libarrow.pxd
+++ b/python/pyarrow/includes/libarrow.pxd
@@ -984,7 +984,7 @@ cdef extern from "arrow/api.h" namespace "arrow" nogil:
shared_ptr[CRecordBatch] Slice(int64_t offset)
shared_ptr[CRecordBatch] Slice(int64_t offset, int64_t length)
- CResult[shared_ptr[CTensor]] ToTensor() const
+ CResult[shared_ptr[CTensor]] ToTensor(c_bool null_to_nan, CMemoryPool*
pool) const
cdef cppclass CRecordBatchWithMetadata" arrow::RecordBatchWithMetadata":
shared_ptr[CRecordBatch] batch
diff --git a/python/pyarrow/table.pxi b/python/pyarrow/table.pxi
index 1ab3fd04ed..54fda1da7d 100644
--- a/python/pyarrow/table.pxi
+++ b/python/pyarrow/table.pxi
@@ -3389,21 +3389,64 @@ cdef class RecordBatch(_Tabular):
<CResult[shared_ptr[CArray]]>deref(c_record_batch).ToStructArray())
return pyarrow_wrap_array(c_array)
- def to_tensor(self):
+ def to_tensor(self, c_bool null_to_nan=False, MemoryPool memory_pool=None):
"""
Convert to a :class:`~pyarrow.Tensor`.
RecordBatches that can be converted have fields of type signed or
unsigned
- integer or float, including all bit-widths, with no validity bitmask.
+ integer or float, including all bit-widths. RecordBatches with
validity bitmask
+ for any of the arrays can be converted with ``null_to_nan`` set to
``True``.
+ In this case null values are converted to NaN and signed or unsigned
integer
+ type arrays are promoted to appropriate float type.
+
+ Parameters
+ ----------
+ null_to_nan : bool, default False
+ Whether to write null values in the result as ``NaN``.
+ memory_pool : MemoryPool, default None
+ For memory allocations, if required, otherwise use default pool
+
+ Examples
+ --------
+ >>> import pyarrow as pa
+ >>> batch = pa.record_batch(
+ ... [
+ ... pa.array([1, 2, 3, 4, None], type=pa.int32()),
+ ... pa.array([10, 20, 30, 40, None], type=pa.float32()),
+ ... ], names = ["a", "b"]
+ ... )
+
+ >>> batch
+ pyarrow.RecordBatch
+ a: int32
+ b: float
+ ----
+ a: [1,2,3,4,null]
+ b: [10,20,30,40,null]
+
+ >>> batch.to_tensor(null_to_nan=True)
+ <pyarrow.Tensor>
+ type: double
+ shape: (5, 2)
+ strides: (8, 40)
+
+ >>> batch.to_tensor(null_to_nan=True).to_numpy()
+ array([[ 1., 10.],
+ [ 2., 20.],
+ [ 3., 30.],
+ [ 4., 40.],
+ [nan, nan]])
"""
cdef:
shared_ptr[CRecordBatch] c_record_batch
shared_ptr[CTensor] c_tensor
+ CMemoryPool* pool = maybe_unbox_memory_pool(memory_pool)
c_record_batch = pyarrow_unwrap_batch(self)
with nogil:
c_tensor = GetResultValue(
- <CResult[shared_ptr[CTensor]]>deref(c_record_batch).ToTensor())
+
<CResult[shared_ptr[CTensor]]>deref(c_record_batch).ToTensor(null_to_nan,
+
pool))
return pyarrow_wrap_tensor(c_tensor)
def _export_to_c(self, out_ptr, out_schema_ptr=0):
diff --git a/python/pyarrow/tests/test_table.py
b/python/pyarrow/tests/test_table.py
index a7d917c2ba..8e30574188 100644
--- a/python/pyarrow/tests/test_table.py
+++ b/python/pyarrow/tests/test_table.py
@@ -1061,7 +1061,7 @@ def test_recordbatch_to_tensor_null():
arr2 = [10, 20, 30, 40, 50, 60, 70, None, 90]
batch = pa.RecordBatch.from_arrays(
[
- pa.array(arr1, type=pa.float32()),
+ pa.array(arr1, type=pa.int32()),
pa.array(arr2, type=pa.float32()),
], ["a", "b"]
)
@@ -1071,6 +1071,52 @@ def test_recordbatch_to_tensor_null():
):
batch.to_tensor()
+ result = batch.to_tensor(null_to_nan=True)
+
+ x = np.array([arr1, arr2], np.float64).transpose()
+ expected = pa.Tensor.from_numpy(x)
+
+ np.testing.assert_equal(result.to_numpy(), x)
+ assert result.size == 18
+ assert result.type == pa.float64()
+ assert result.shape == expected.shape
+ assert result.strides == expected.strides
+
+ # int32 -> float64
+ batch = pa.RecordBatch.from_arrays(
+ [
+ pa.array(arr1, type=pa.int32()),
+ pa.array(arr2, type=pa.int32()),
+ ], ["a", "b"]
+ )
+
+ result = batch.to_tensor(null_to_nan=True)
+
+ np.testing.assert_equal(result.to_numpy(), x)
+ assert result.size == 18
+ assert result.type == pa.float64()
+ assert result.shape == expected.shape
+ assert result.strides == expected.strides
+
+ # int8 -> float32
+ batch = pa.RecordBatch.from_arrays(
+ [
+ pa.array(arr1, type=pa.int8()),
+ pa.array(arr2, type=pa.int8()),
+ ], ["a", "b"]
+ )
+
+ result = batch.to_tensor(null_to_nan=True)
+
+ x = np.array([arr1, arr2], np.float32).transpose()
+ expected = pa.Tensor.from_numpy(x)
+
+ np.testing.assert_equal(result.to_numpy(), x)
+ assert result.size == 18
+ assert result.type == pa.float32()
+ assert result.shape == expected.shape
+ assert result.strides == expected.strides
+
def test_recordbatch_to_tensor_empty():
batch = pa.RecordBatch.from_arrays(