This is an automated email from the ASF dual-hosted git repository.

jorisvandenbossche pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/arrow.git


The following commit(s) were added to refs/heads/main by this push:
     new 96f686b81b GH-40061: [C++][Python] Basic conversion of RecordBatch to 
Arrow Tensor - add option to cast NULL to NaN (#40803)
96f686b81b is described below

commit 96f686b81ba148f4d434846f0b9e161c538f131d
Author: Alenka Frim <[email protected]>
AuthorDate: Fri Mar 29 08:30:03 2024 +0100

    GH-40061: [C++][Python] Basic conversion of RecordBatch to Arrow Tensor - 
add option to cast NULL to NaN (#40803)
    
    ### Rationale for this change
    
    The conversion from `RecordBatch` to `Tensor` class exists but it doesn't 
support record batches with validity bitmaps. This PR adds support for an 
option to convert null values to NaN.
    
    ### What changes are included in this PR?
    
    This PR adds a `null_to_nan` option in `RecordBatch::ToTensor` so that null 
values are converted to NaN in the resulting `Tensor`. For example, this works:
    
    ```python
    >>> import pyarrow as pa
    >>> batch = pa.record_batch(
    ...     [
    ...         pa.array([1, 2, 3, 4, None], type=pa.int32()),
    ...         pa.array([10, 20, 30, 40, None], type=pa.float32()),
    ...     ], names = ["a", "b"]
    ... )
    
    >>> batch
    pyarrow.RecordBatch
    a: int32
    b: float
    ----
    a: [1,2,3,4,null]
    b: [10,20,30,40,null]
    
    >>> batch.to_tensor(null_to_nan=True)
    <pyarrow.Tensor>
    type: double
    shape: (5, 2)
    strides: (8, 40)
    
    >>> batch.to_tensor(null_to_nan=True).to_numpy()
    array([[ 1., 10.],
           [ 2., 20.],
           [ 3., 30.],
           [ 4., 40.],
           [nan, nan]])
    ```
    but the default (`null_to_nan=False`) would raise:
    
    ```python
    >>> batch.to_tensor()
    Traceback (most recent call last):
      File "<stdin>", line 1, in <module>
      File "pyarrow/table.pxi", line 3421, in pyarrow.lib.RecordBatch.to_tensor
        a: int32
      File "pyarrow/error.pxi", line 154, in 
pyarrow.lib.pyarrow_internal_check_status
        return check_status(status)
      File "pyarrow/error.pxi", line 91, in pyarrow.lib.check_status
        raise convert_status(status)
    pyarrow.lib.ArrowTypeError: Can only convert a RecordBatch with no nulls. 
Set null_to_nan to true to convert nulls to nan
    ```
    
    ### Are these changes tested?
    
    Yes.
    
    ### Are there any user-facing changes?
    
    No.
    * GitHub Issue: #40061
    
    Lead-authored-by: AlenkaF <[email protected]>
    Co-authored-by: Alenka Frim <[email protected]>
    Co-authored-by: Joris Van den Bossche <[email protected]>
    Signed-off-by: Joris Van den Bossche <[email protected]>
---
 cpp/src/arrow/record_batch.cc        | 47 ++++++++++++++++------
 cpp/src/arrow/record_batch.h         |  6 ++-
 cpp/src/arrow/record_batch_test.cc   | 76 +++++++++++++++++++++++++++++++++++-
 python/pyarrow/includes/libarrow.pxd |  2 +-
 python/pyarrow/table.pxi             | 49 +++++++++++++++++++++--
 python/pyarrow/tests/test_table.py   | 48 ++++++++++++++++++++++-
 6 files changed, 208 insertions(+), 20 deletions(-)

diff --git a/cpp/src/arrow/record_batch.cc b/cpp/src/arrow/record_batch.cc
index 0d8bda9b66..6f3b8e75a2 100644
--- a/cpp/src/arrow/record_batch.cc
+++ b/cpp/src/arrow/record_batch.cc
@@ -18,6 +18,7 @@
 #include "arrow/record_batch.h"
 
 #include <algorithm>
+#include <cmath>
 #include <cstdlib>
 #include <memory>
 #include <sstream>
@@ -261,12 +262,19 @@ struct ConvertColumnsToTensorVisitor {
       using In = typename T::c_type;
       auto in_values = ArraySpan(in_data).GetSpan<In>(1, in_data.length);
 
-      if constexpr (std::is_same_v<In, Out>) {
-        memcpy(out_values, in_values.data(), in_values.size_bytes());
-        out_values += in_values.size();
+      if (in_data.null_count == 0) {
+        if constexpr (std::is_same_v<In, Out>) {
+          memcpy(out_values, in_values.data(), in_values.size_bytes());
+          out_values += in_values.size();
+        } else {
+          for (In in_value : in_values) {
+            *out_values++ = static_cast<Out>(in_value);
+          }
+        }
       } else {
-        for (In in_value : in_values) {
-          *out_values++ = static_cast<Out>(in_value);
+        for (int64_t i = 0; i < in_data.length; ++i) {
+          *out_values++ =
+              in_data.IsNull(i) ? static_cast<Out>(NAN) : 
static_cast<Out>(in_values[i]);
         }
       }
       return Status::OK();
@@ -286,16 +294,20 @@ inline void ConvertColumnsToTensor(const RecordBatch& 
batch, uint8_t* out) {
   }
 }
 
-Result<std::shared_ptr<Tensor>> RecordBatch::ToTensor(MemoryPool* pool) const {
+Result<std::shared_ptr<Tensor>> RecordBatch::ToTensor(bool null_to_nan,
+                                                      MemoryPool* pool) const {
   if (num_columns() == 0) {
     return Status::TypeError(
         "Conversion to Tensor for RecordBatches without columns/schema is not "
         "supported.");
   }
   // Check for no validity bitmap of each field
+  // if null_to_nan conversion is set to false
   for (int i = 0; i < num_columns(); ++i) {
-    if (column(i)->null_count() > 0) {
-      return Status::TypeError("Can only convert a RecordBatch with no 
nulls.");
+    if (column(i)->null_count() > 0 && !null_to_nan) {
+      return Status::TypeError(
+          "Can only convert a RecordBatch with no nulls. Set null_to_nan to 
true to "
+          "convert nulls to NaN");
     }
   }
 
@@ -308,12 +320,12 @@ Result<std::shared_ptr<Tensor>> 
RecordBatch::ToTensor(MemoryPool* pool) const {
   std::shared_ptr<Field> result_field = schema_->field(0);
   std::shared_ptr<DataType> result_type = result_field->type();
 
-  if (num_columns() > 1) {
-    Field::MergeOptions options;
-    options.promote_integer_to_float = true;
-    options.promote_integer_sign = true;
-    options.promote_numeric_width = true;
+  Field::MergeOptions options;
+  options.promote_integer_to_float = true;
+  options.promote_integer_sign = true;
+  options.promote_numeric_width = true;
 
+  if (num_columns() > 1) {
     for (int i = 1; i < num_columns(); ++i) {
       if (!is_numeric(column(i)->type()->id())) {
         return Status::TypeError("DataType is not supported: ",
@@ -334,6 +346,15 @@ Result<std::shared_ptr<Tensor>> 
RecordBatch::ToTensor(MemoryPool* pool) const {
     result_type = result_field->type();
   }
 
+  // Check if result_type is signed or unsigned integer and null_to_nan is set 
to true
+  // Then all columns should be promoted to float type
+  if (is_integer(result_type->id()) && null_to_nan) {
+    ARROW_ASSIGN_OR_RAISE(
+        result_field,
+        result_field->MergeWith(field(result_field->name(), float32()), 
options));
+    result_type = result_field->type();
+  }
+
   // Allocate memory
   ARROW_ASSIGN_OR_RAISE(
       std::shared_ptr<Buffer> result,
diff --git a/cpp/src/arrow/record_batch.h b/cpp/src/arrow/record_batch.h
index 16d721caad..5202ff4abf 100644
--- a/cpp/src/arrow/record_batch.h
+++ b/cpp/src/arrow/record_batch.h
@@ -85,8 +85,12 @@ class ARROW_EXPORT RecordBatch {
   /// Create a Tensor object with shape (number of rows, number of columns) and
   /// strides (type size in bytes, type size in bytes * number of rows).
   /// Generated Tensor will have column-major layout.
+  ///
+  /// \param[in] null_to_nan if true, convert nulls to NaN
+  /// \param[in] pool the memory pool to allocate the tensor buffer
+  /// \return the resulting Tensor
   Result<std::shared_ptr<Tensor>> ToTensor(
-      MemoryPool* pool = default_memory_pool()) const;
+      bool null_to_nan = false, MemoryPool* pool = default_memory_pool()) 
const;
 
   /// \brief Construct record batch from struct array
   ///
diff --git a/cpp/src/arrow/record_batch_test.cc 
b/cpp/src/arrow/record_batch_test.cc
index 81154452d7..7e0eb1d460 100644
--- a/cpp/src/arrow/record_batch_test.cc
+++ b/cpp/src/arrow/record_batch_test.cc
@@ -667,7 +667,8 @@ TEST_F(TestRecordBatch, ToTensorUnsupportedMissing) {
   auto batch = RecordBatch::Make(schema, length, {a0, a1});
 
   ASSERT_RAISES_WITH_MESSAGE(TypeError,
-                             "Type error: Can only convert a RecordBatch with 
no nulls.",
+                             "Type error: Can only convert a RecordBatch with 
no nulls. "
+                             "Set null_to_nan to true to convert nulls to NaN",
                              batch->ToTensor());
 }
 
@@ -740,6 +741,79 @@ TEST_F(TestRecordBatch, ToTensorSupportedNaN) {
   CheckTensor<FloatType>(tensor, 18, shape, f_strides);
 }
 
+TEST_F(TestRecordBatch, ToTensorSupportedNullToNan) {
+  const int length = 9;
+
+  // int32 + float32 = float64
+  auto f0 = field("f0", int32());
+  auto f1 = field("f1", float32());
+
+  std::vector<std::shared_ptr<Field>> fields = {f0, f1};
+  auto schema = ::arrow::schema(fields);
+
+  auto a0 = ArrayFromJSON(int32(), "[null, 2, 3, 4, 5, 6, 7, 8, 9]");
+  auto a1 = ArrayFromJSON(float32(), "[10, 20, 30, 40, null, 60, 70, 80, 90]");
+
+  auto batch = RecordBatch::Make(schema, length, {a0, a1});
+
+  ASSERT_OK_AND_ASSIGN(auto tensor, batch->ToTensor(/*null_to_nan=*/true));
+  ASSERT_OK(tensor->Validate());
+
+  std::vector<int64_t> shape = {9, 2};
+  const int64_t f64_size = sizeof(double);
+  std::vector<int64_t> f_strides = {f64_size, f64_size * shape[0]};
+  std::shared_ptr<Tensor> tensor_expected = TensorFromJSON(
+      float64(), "[NaN, 2,  3,  4,  5, 6, 7, 8, 9, 10, 20, 30, 40, NaN, 60, 
70, 80, 90]",
+      shape, f_strides);
+
+  EXPECT_FALSE(tensor_expected->Equals(*tensor));
+  EXPECT_TRUE(tensor_expected->Equals(*tensor, 
EqualOptions().nans_equal(true)));
+
+  CheckTensor<DoubleType>(tensor, 18, shape, f_strides);
+
+  // int32 -> float64
+  auto f2 = field("f2", int32());
+
+  std::vector<std::shared_ptr<Field>> fields1 = {f0, f2};
+  auto schema1 = ::arrow::schema(fields1);
+
+  auto a2 = ArrayFromJSON(int32(), "[10, 20, 30, 40, null, 60, 70, 80, 90]");
+  auto batch1 = RecordBatch::Make(schema1, length, {a0, a2});
+
+  ASSERT_OK_AND_ASSIGN(auto tensor1, batch1->ToTensor(/*null_to_nan=*/true));
+  ASSERT_OK(tensor1->Validate());
+
+  EXPECT_FALSE(tensor_expected->Equals(*tensor1));
+  EXPECT_TRUE(tensor_expected->Equals(*tensor1, 
EqualOptions().nans_equal(true)));
+
+  CheckTensor<DoubleType>(tensor1, 18, shape, f_strides);
+
+  // int8 -> float32
+  auto f3 = field("f3", int8());
+  auto f4 = field("f4", int8());
+
+  std::vector<std::shared_ptr<Field>> fields2 = {f3, f4};
+  auto schema2 = ::arrow::schema(fields2);
+
+  auto a3 = ArrayFromJSON(int8(), "[null, 2, 3, 4, 5, 6, 7, 8, 9]");
+  auto a4 = ArrayFromJSON(int8(), "[10, 20, 30, 40, null, 60, 70, 80, 90]");
+  auto batch2 = RecordBatch::Make(schema2, length, {a3, a4});
+
+  ASSERT_OK_AND_ASSIGN(auto tensor2, batch2->ToTensor(/*null_to_nan=*/true));
+  ASSERT_OK(tensor2->Validate());
+
+  const int64_t f32_size = sizeof(float);
+  std::vector<int64_t> f_strides_2 = {f32_size, f32_size * shape[0]};
+  std::shared_ptr<Tensor> tensor_expected_2 = TensorFromJSON(
+      float32(), "[NaN, 2,  3,  4,  5, 6, 7, 8, 9, 10, 20, 30, 40, NaN, 60, 
70, 80, 90]",
+      shape, f_strides_2);
+
+  EXPECT_FALSE(tensor_expected_2->Equals(*tensor2));
+  EXPECT_TRUE(tensor_expected_2->Equals(*tensor2, 
EqualOptions().nans_equal(true)));
+
+  CheckTensor<FloatType>(tensor2, 18, shape, f_strides_2);
+}
+
 TEST_F(TestRecordBatch, ToTensorSupportedTypesMixed) {
   const int length = 9;
 
diff --git a/python/pyarrow/includes/libarrow.pxd 
b/python/pyarrow/includes/libarrow.pxd
index 9e5e3d3fa6..aa50dd189a 100644
--- a/python/pyarrow/includes/libarrow.pxd
+++ b/python/pyarrow/includes/libarrow.pxd
@@ -984,7 +984,7 @@ cdef extern from "arrow/api.h" namespace "arrow" nogil:
         shared_ptr[CRecordBatch] Slice(int64_t offset)
         shared_ptr[CRecordBatch] Slice(int64_t offset, int64_t length)
 
-        CResult[shared_ptr[CTensor]] ToTensor() const
+        CResult[shared_ptr[CTensor]] ToTensor(c_bool null_to_nan, CMemoryPool* 
pool) const
 
     cdef cppclass CRecordBatchWithMetadata" arrow::RecordBatchWithMetadata":
         shared_ptr[CRecordBatch] batch
diff --git a/python/pyarrow/table.pxi b/python/pyarrow/table.pxi
index 1ab3fd04ed..54fda1da7d 100644
--- a/python/pyarrow/table.pxi
+++ b/python/pyarrow/table.pxi
@@ -3389,21 +3389,64 @@ cdef class RecordBatch(_Tabular):
                 
<CResult[shared_ptr[CArray]]>deref(c_record_batch).ToStructArray())
         return pyarrow_wrap_array(c_array)
 
-    def to_tensor(self):
+    def to_tensor(self, c_bool null_to_nan=False, MemoryPool memory_pool=None):
         """
         Convert to a :class:`~pyarrow.Tensor`.
 
         RecordBatches that can be converted have fields of type signed or 
unsigned
-        integer or float, including all bit-widths, with no validity bitmask.
+        integer or float, including all bit-widths. RecordBatches with 
validity bitmask
+        for any of the arrays can be converted with ``null_to_nan``turned to 
``True``.
+        In this case null values are converted to NaN and signed or unsigned 
integer
+        type arrays are promoted to appropriate float type.
+
+        Parameters
+        ----------
+        null_to_nan : bool, default False
+            Whether to write null values in the result as ``NaN``.
+        memory_pool : MemoryPool, default None
+            For memory allocations, if required, otherwise use default pool
+
+        Examples
+        --------
+        >>> import pyarrow as pa
+        >>> batch = pa.record_batch(
+        ...    [
+        ...         pa.array([1, 2, 3, 4, None], type=pa.int32()),
+        ...         pa.array([10, 20, 30, 40, None], type=pa.float32()),
+        ...     ], names = ["a", "b"]
+        ... )
+
+        >>> batch
+        pyarrow.RecordBatch
+        a: int32
+        b: float
+        ----
+        a: [1,2,3,4,null]
+        b: [10,20,30,40,null]
+
+        >>> batch.to_tensor(null_to_nan=True)
+        <pyarrow.Tensor>
+        type: double
+        shape: (5, 2)
+        strides: (8, 40)
+
+        >>> batch.to_tensor(null_to_nan=True).to_numpy()
+        array([[ 1., 10.],
+               [ 2., 20.],
+               [ 3., 30.],
+               [ 4., 40.],
+               [nan, nan]])
         """
         cdef:
             shared_ptr[CRecordBatch] c_record_batch
             shared_ptr[CTensor] c_tensor
+            CMemoryPool* pool = maybe_unbox_memory_pool(memory_pool)
 
         c_record_batch = pyarrow_unwrap_batch(self)
         with nogil:
             c_tensor = GetResultValue(
-                <CResult[shared_ptr[CTensor]]>deref(c_record_batch).ToTensor())
+                
<CResult[shared_ptr[CTensor]]>deref(c_record_batch).ToTensor(null_to_nan,
+                                                                             
pool))
         return pyarrow_wrap_tensor(c_tensor)
 
     def _export_to_c(self, out_ptr, out_schema_ptr=0):
diff --git a/python/pyarrow/tests/test_table.py 
b/python/pyarrow/tests/test_table.py
index a7d917c2ba..8e30574188 100644
--- a/python/pyarrow/tests/test_table.py
+++ b/python/pyarrow/tests/test_table.py
@@ -1061,7 +1061,7 @@ def test_recordbatch_to_tensor_null():
     arr2 = [10, 20, 30, 40, 50, 60, 70, None, 90]
     batch = pa.RecordBatch.from_arrays(
         [
-            pa.array(arr1, type=pa.float32()),
+            pa.array(arr1, type=pa.int32()),
             pa.array(arr2, type=pa.float32()),
         ], ["a", "b"]
     )
@@ -1071,6 +1071,52 @@ def test_recordbatch_to_tensor_null():
     ):
         batch.to_tensor()
 
+    result = batch.to_tensor(null_to_nan=True)
+
+    x = np.array([arr1, arr2], np.float64).transpose()
+    expected = pa.Tensor.from_numpy(x)
+
+    np.testing.assert_equal(result.to_numpy(), x)
+    assert result.size == 18
+    assert result.type == pa.float64()
+    assert result.shape == expected.shape
+    assert result.strides == expected.strides
+
+    # int32 -> float64
+    batch = pa.RecordBatch.from_arrays(
+        [
+            pa.array(arr1, type=pa.int32()),
+            pa.array(arr2, type=pa.int32()),
+        ], ["a", "b"]
+    )
+
+    result = batch.to_tensor(null_to_nan=True)
+
+    np.testing.assert_equal(result.to_numpy(), x)
+    assert result.size == 18
+    assert result.type == pa.float64()
+    assert result.shape == expected.shape
+    assert result.strides == expected.strides
+
+    # int8 -> float32
+    batch = pa.RecordBatch.from_arrays(
+        [
+            pa.array(arr1, type=pa.int8()),
+            pa.array(arr2, type=pa.int8()),
+        ], ["a", "b"]
+    )
+
+    result = batch.to_tensor(null_to_nan=True)
+
+    x = np.array([arr1, arr2], np.float32).transpose()
+    expected = pa.Tensor.from_numpy(x)
+
+    np.testing.assert_equal(result.to_numpy(), x)
+    assert result.size == 18
+    assert result.type == pa.float32()
+    assert result.shape == expected.shape
+    assert result.strides == expected.strides
+
 
 def test_recordbatch_to_tensor_empty():
     batch = pa.RecordBatch.from_arrays(

Reply via email to