jorisvandenbossche commented on code in PR #14804: URL: https://github.com/apache/arrow/pull/14804#discussion_r1065438506
########## python/pyarrow/interchange/column.py: ########## @@ -0,0 +1,525 @@ +# Licensed to the Apache Software Foundation (ASF) under one +# or more contributor license agreements. See the NOTICE file +# distributed with this work for additional information +# regarding copyright ownership. The ASF licenses this file +# to you under the Apache License, Version 2.0 (the +# "License"); you may not use this file except in compliance +# with the License. You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, +# software distributed under the License is distributed on an +# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +# KIND, either express or implied. See the License for the +# specific language governing permissions and limitations +# under the License. + +from __future__ import annotations + +import enum +from typing import ( + Any, + Dict, + Iterable, + Optional, + Tuple, +) + +import sys +if sys.version_info >= (3, 8): + from typing import TypedDict +else: + from typing_extensions import TypedDict + +import pyarrow as pa +import pyarrow.compute as pc +from pyarrow.interchange.buffer import _PyArrowBuffer + + +class DtypeKind(enum.IntEnum): + """ + Integer enum for data types. + + Attributes + ---------- + INT : int + Matches to signed integer data type. + UINT : int + Matches to unsigned integer data type. + FLOAT : int + Matches to floating point data type. + BOOL : int + Matches to boolean data type. + STRING : int + Matches to string data type (UTF-8 encoded). + DATETIME : int + Matches to datetime data type. + CATEGORICAL : int + Matches to categorical data type. 
+ """ + + INT = 0 + UINT = 1 + FLOAT = 2 + BOOL = 20 + STRING = 21 # UTF-8 + DATETIME = 22 + CATEGORICAL = 23 + + +Dtype = Tuple[DtypeKind, int, str, str] # see Column.dtype + + +_PYARROW_KINDS = { + pa.int8(): (DtypeKind.INT, "c"), + pa.int16(): (DtypeKind.INT, "s"), + pa.int32(): (DtypeKind.INT, "i"), + pa.int64(): (DtypeKind.INT, "l"), + pa.uint8(): (DtypeKind.UINT, "C"), + pa.uint16(): (DtypeKind.UINT, "S"), + pa.uint32(): (DtypeKind.UINT, "I"), + pa.uint64(): (DtypeKind.UINT, "L"), + pa.float16(): (DtypeKind.FLOAT, "e"), + pa.float32(): (DtypeKind.FLOAT, "f"), + pa.float64(): (DtypeKind.FLOAT, "g"), + pa.bool_(): (DtypeKind.BOOL, "b"), + pa.string(): (DtypeKind.STRING, "u"), + pa.large_string(): (DtypeKind.STRING, "U"), +} + + +class ColumnNullType(enum.IntEnum): + """ + Integer enum for null type representation. + + Attributes + ---------- + NON_NULLABLE : int + Non-nullable column. + USE_NAN : int + Use explicit float NaN value. + USE_SENTINEL : int + Sentinel value besides NaN. + USE_BITMASK : int + The bit is set/unset representing a null on a certain position. + USE_BYTEMASK : int + The byte is set/unset representing a null on a certain position. + """ + + NON_NULLABLE = 0 + USE_NAN = 1 + USE_SENTINEL = 2 + USE_BITMASK = 3 + USE_BYTEMASK = 4 + + +class ColumnBuffers(TypedDict): + # first element is a buffer containing the column data; + # second element is the data buffer's associated dtype + data: Tuple[_PyArrowBuffer, Dtype] + + # first element is a buffer containing mask values indicating missing data; + # second element is the mask value buffer's associated dtype. + # None if the null representation is not a bit or byte mask + validity: Optional[Tuple[_PyArrowBuffer, Dtype]] + + # first element is a buffer containing the offset values for + # variable-size binary data (e.g., variable-length strings); + # second element is the offsets buffer's associated dtype. 
+ # None if the data buffer does not have an associated offsets buffer + offsets: Optional[Tuple[_PyArrowBuffer, Dtype]] + + +class CategoricalDescription(TypedDict): + # whether the ordering of dictionary indices is semantically meaningful + is_ordered: bool + # whether a dictionary-style mapping of categorical values to other objects + # exists + is_dictionary: bool + # Python-level only (e.g. ``{int: str}``). + # None if not a dictionary-style categorical. + categories: Optional[_PyArrowColumn] + + +class Endianness: + """Enum indicating the byte-order of a data-type.""" + + LITTLE = "<" + BIG = ">" + NATIVE = "=" + NA = "|" + + +class NoBufferPresent(Exception): + """Exception to signal that there is no requested buffer.""" + + +class _PyArrowColumn: + """ + A column object, with only the methods and properties required by the + interchange protocol defined. + + A column can contain one or more chunks. Each chunk can contain up to three + buffers - a data buffer, a mask buffer (depending on null representation), + and an offsets buffer (if variable-size binary; e.g., variable-length + strings). + + TBD: Arrow has a separate "null" dtype, and has no separate mask concept. + Instead, it seems to use "children" for both columns with a bit mask, + and for nested dtypes. Unclear whether this is elegant or confusing. + This design requires checking the null representation explicitly. + + The Arrow design requires checking: + 1. the ARROW_FLAG_NULLABLE (for sentinel values) + 2. if a column has two children, combined with one of those children + having a null dtype. + + Making the mask concept explicit seems useful. One null dtype would + not be enough to cover both bit and byte masks, so that would mean + even more checking if we did it the Arrow way. + + TBD: there's also the "chunk" concept here, which is implicit in Arrow as + multiple buffers per array (= column here). 
Semantically it may make + sense to have both: chunks were meant for example for lazy evaluation + of data which doesn't fit in memory, while multiple buffers per column + could also come from doing a selection operation on a single + contiguous buffer. + + Given these concepts, one would expect chunks to be all of the same + size (say a 10,000 row dataframe could have 10 chunks of 1,000 rows), + while multiple buffers could have data-dependent lengths. Not an issue + in pandas if one column is backed by a single NumPy array, but in + Arrow it seems possible. + Are multiple chunks *and* multiple buffers per column necessary for + the purposes of this interchange protocol, or must producers either + reuse the chunk concept for this or copy the data? + + Note: this Column object can only be produced by ``__dataframe__``, so + doesn't need its own version or ``__column__`` protocol. + """ + + def __init__( + self, column: pa.Array | pa.ChunkedArray, allow_copy: bool = True + ) -> None: + """ + Handles PyArrow Arrays and ChunkedArrays. 
+ """ + # Store the column as a private attribute + if isinstance(column, pa.ChunkedArray): + if column.num_chunks == 1: + column = column.chunk(0) + else: + if not allow_copy: + raise RuntimeError( + "Chunks will be combined and a copy is required which " + "is forbidden by allow_copy=False" + ) + column = column.combine_chunks() + + self._allow_copy = allow_copy + + if pa.types.is_boolean(column.type): + if not allow_copy: + raise RuntimeError( + "Boolean column will be casted to uint8 and a copy " + "is required which is forbidden by allow_copy=False" + ) + self._dtype = self._dtype_from_arrowdtype(column.type, 8) + self._col = pc.cast(column, pa.uint8()) + else: + self._col = column + dtype = self._col.type + try: + bit_width = dtype.bit_width + except ValueError: # in case of a variable-length strings Review Comment: ```suggestion except ValueError: # in case of a variable-length strings, considered as array of bytes (8 bits) ``` (I always forget what the "8" means in this case, so a comment might help) ########## python/pyarrow/interchange/from_dataframe.py: ########## @@ -0,0 +1,540 @@ +# Licensed to the Apache Software Foundation (ASF) under one +# or more contributor license agreements. See the NOTICE file +# distributed with this work for additional information +# regarding copyright ownership. The ASF licenses this file +# to you under the Apache License, Version 2.0 (the +# "License"); you may not use this file except in compliance +# with the License. You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, +# software distributed under the License is distributed on an +# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +# KIND, either express or implied. See the License for the +# specific language governing permissions and limitations +# under the License. 
+ +from __future__ import annotations + +from typing import ( + Any, +) + +from pyarrow.interchange.column import ( + DtypeKind, + ColumnBuffers, + ColumnNullType, +) + +import pyarrow as pa +import re + +import pyarrow.compute as pc +from pyarrow.interchange.column import Dtype + + +# A typing protocol could be added later to let Mypy validate code using +# `from_dataframe` better. +DataFrameObject = Any +ColumnObject = Any +BufferObject = Any + + +_PYARROW_DTYPES: dict[DtypeKind, dict[int, Any]] = { + DtypeKind.INT: {8: pa.int8(), + 16: pa.int16(), + 32: pa.int32(), + 64: pa.int64()}, + DtypeKind.UINT: {8: pa.uint8(), + 16: pa.uint16(), + 32: pa.uint32(), + 64: pa.uint64()}, + DtypeKind.FLOAT: {16: pa.float16(), + 32: pa.float32(), + 64: pa.float64()}, + DtypeKind.BOOL: {8: pa.uint8()}, + DtypeKind.STRING: {8: pa.string()}, +} + + +def from_dataframe(df: DataFrameObject, allow_copy=True) -> pa.Table: + """ + Build a ``pa.Table`` from any DataFrame supporting the interchange + protocol. + + Parameters + ---------- + df : DataFrameObject + Object supporting the interchange protocol, i.e. `__dataframe__` + method. + allow_copy : bool, default: True + Whether to allow copying the memory to perform the conversion + (if false then zero-copy approach is requested). + + Returns + ------- + pa.Table + """ + if isinstance(df, pa.Table): + return df + + if not hasattr(df, "__dataframe__"): + raise ValueError("`df` does not support __dataframe__") + + return _from_dataframe(df.__dataframe__(allow_copy=allow_copy)) + + +def _from_dataframe(df: DataFrameObject, allow_copy=True): + """ + Build a ``pa.Table`` from the DataFrame interchange object. + + Parameters + ---------- + df : DataFrameObject + Object supporting the interchange protocol, i.e. `__dataframe__` + method. + allow_copy : bool, default: True + Whether to allow copying the memory to perform the conversion + (if false then zero-copy approach is requested). 
+ + Returns + ------- + pa.Table + """ + batches = [] + for chunk in df.get_chunks(): + batch = protocol_df_chunk_to_pyarrow(chunk, allow_copy) + batches.append(batch) + + table = pa.Table.from_batches(batches) + return table + + +def protocol_df_chunk_to_pyarrow( + df: DataFrameObject, + allow_copy: bool = True +) -> pa.Table: + """ + Convert interchange protocol chunk to ``pa.RecordBatch``. + + Parameters + ---------- + df : DataFrameObject + Object supporting the interchange protocol, i.e. `__dataframe__` + method. + allow_copy : bool, default: True + Whether to allow copying the memory to perform the conversion + (if false then zero-copy approach is requested). + + Returns + ------- + pa.RecordBatch + """ + # We need a dict of columns here, with each column being a pa.Array + columns: dict[str, pa.Array] = {} + for name in df.column_names(): + if not isinstance(name, str): + raise ValueError(f"Column {name} is not a string") + if name in columns: + raise ValueError(f"Column {name} is not unique") + col = df.get_column_by_name(name) + dtype = col.dtype[0] + if dtype in ( + DtypeKind.INT, + DtypeKind.UINT, + DtypeKind.FLOAT, + DtypeKind.STRING, + DtypeKind.DATETIME, + ): + columns[name] = column_to_array(col) Review Comment: Should `allow_copy` be passed here as well? Also primitive arrays can result in a copy, I think, in case that the null type doesn't match pyarrow? ########## python/pyarrow/interchange/from_dataframe.py: ########## @@ -0,0 +1,540 @@ +# Licensed to the Apache Software Foundation (ASF) under one +# or more contributor license agreements. See the NOTICE file +# distributed with this work for additional information +# regarding copyright ownership. The ASF licenses this file +# to you under the Apache License, Version 2.0 (the +# "License"); you may not use this file except in compliance +# with the License. 
You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, +# software distributed under the License is distributed on an +# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +# KIND, either express or implied. See the License for the +# specific language governing permissions and limitations +# under the License. + +from __future__ import annotations + +from typing import ( + Any, +) + +from pyarrow.interchange.column import ( + DtypeKind, + ColumnBuffers, + ColumnNullType, +) + +import pyarrow as pa +import re + +import pyarrow.compute as pc +from pyarrow.interchange.column import Dtype + + +# A typing protocol could be added later to let Mypy validate code using +# `from_dataframe` better. +DataFrameObject = Any +ColumnObject = Any +BufferObject = Any + + +_PYARROW_DTYPES: dict[DtypeKind, dict[int, Any]] = { + DtypeKind.INT: {8: pa.int8(), + 16: pa.int16(), + 32: pa.int32(), + 64: pa.int64()}, + DtypeKind.UINT: {8: pa.uint8(), + 16: pa.uint16(), + 32: pa.uint32(), + 64: pa.uint64()}, + DtypeKind.FLOAT: {16: pa.float16(), + 32: pa.float32(), + 64: pa.float64()}, + DtypeKind.BOOL: {8: pa.uint8()}, + DtypeKind.STRING: {8: pa.string()}, +} + + +def from_dataframe(df: DataFrameObject, allow_copy=True) -> pa.Table: + """ + Build a ``pa.Table`` from any DataFrame supporting the interchange + protocol. + + Parameters + ---------- + df : DataFrameObject + Object supporting the interchange protocol, i.e. `__dataframe__` + method. + allow_copy : bool, default: True + Whether to allow copying the memory to perform the conversion + (if false then zero-copy approach is requested). 
+ + Returns + ------- + pa.Table + """ + if isinstance(df, pa.Table): + return df + + if not hasattr(df, "__dataframe__"): + raise ValueError("`df` does not support __dataframe__") + + return _from_dataframe(df.__dataframe__(allow_copy=allow_copy)) + + +def _from_dataframe(df: DataFrameObject, allow_copy=True): + """ + Build a ``pa.Table`` from the DataFrame interchange object. + + Parameters + ---------- + df : DataFrameObject + Object supporting the interchange protocol, i.e. `__dataframe__` + method. + allow_copy : bool, default: True + Whether to allow copying the memory to perform the conversion + (if false then zero-copy approach is requested). + + Returns + ------- + pa.Table + """ + batches = [] + for chunk in df.get_chunks(): + batch = protocol_df_chunk_to_pyarrow(chunk, allow_copy) + batches.append(batch) + + table = pa.Table.from_batches(batches) + return table + + +def protocol_df_chunk_to_pyarrow( + df: DataFrameObject, + allow_copy: bool = True +) -> pa.Table: Review Comment: ```suggestion ) -> pa.RecordBatch: ``` ########## python/pyarrow/interchange/from_dataframe.py: ########## @@ -0,0 +1,540 @@ +# Licensed to the Apache Software Foundation (ASF) under one +# or more contributor license agreements. See the NOTICE file +# distributed with this work for additional information +# regarding copyright ownership. The ASF licenses this file +# to you under the Apache License, Version 2.0 (the +# "License"); you may not use this file except in compliance +# with the License. You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, +# software distributed under the License is distributed on an +# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +# KIND, either express or implied. See the License for the +# specific language governing permissions and limitations +# under the License. 
+ +from __future__ import annotations + +from typing import ( + Any, +) + +from pyarrow.interchange.column import ( + DtypeKind, + ColumnBuffers, + ColumnNullType, +) + +import pyarrow as pa +import re + +import pyarrow.compute as pc +from pyarrow.interchange.column import Dtype + + +# A typing protocol could be added later to let Mypy validate code using +# `from_dataframe` better. +DataFrameObject = Any +ColumnObject = Any +BufferObject = Any + + +_PYARROW_DTYPES: dict[DtypeKind, dict[int, Any]] = { + DtypeKind.INT: {8: pa.int8(), + 16: pa.int16(), + 32: pa.int32(), + 64: pa.int64()}, + DtypeKind.UINT: {8: pa.uint8(), + 16: pa.uint16(), + 32: pa.uint32(), + 64: pa.uint64()}, + DtypeKind.FLOAT: {16: pa.float16(), + 32: pa.float32(), + 64: pa.float64()}, + DtypeKind.BOOL: {8: pa.uint8()}, + DtypeKind.STRING: {8: pa.string()}, +} + + +def from_dataframe(df: DataFrameObject, allow_copy=True) -> pa.Table: + """ + Build a ``pa.Table`` from any DataFrame supporting the interchange + protocol. + + Parameters + ---------- + df : DataFrameObject + Object supporting the interchange protocol, i.e. `__dataframe__` + method. + allow_copy : bool, default: True + Whether to allow copying the memory to perform the conversion + (if false then zero-copy approach is requested). + + Returns + ------- + pa.Table + """ + if isinstance(df, pa.Table): + return df + + if not hasattr(df, "__dataframe__"): + raise ValueError("`df` does not support __dataframe__") + + return _from_dataframe(df.__dataframe__(allow_copy=allow_copy)) + + +def _from_dataframe(df: DataFrameObject, allow_copy=True): + """ + Build a ``pa.Table`` from the DataFrame interchange object. + + Parameters + ---------- + df : DataFrameObject + Object supporting the interchange protocol, i.e. `__dataframe__` + method. + allow_copy : bool, default: True + Whether to allow copying the memory to perform the conversion + (if false then zero-copy approach is requested). 
+ + Returns + ------- + pa.Table + """ + batches = [] + for chunk in df.get_chunks(): + batch = protocol_df_chunk_to_pyarrow(chunk, allow_copy) + batches.append(batch) + + table = pa.Table.from_batches(batches) + return table + + +def protocol_df_chunk_to_pyarrow( + df: DataFrameObject, + allow_copy: bool = True +) -> pa.Table: + """ + Convert interchange protocol chunk to ``pa.RecordBatch``. + + Parameters + ---------- + df : DataFrameObject + Object supporting the interchange protocol, i.e. `__dataframe__` + method. + allow_copy : bool, default: True + Whether to allow copying the memory to perform the conversion + (if false then zero-copy approach is requested). + + Returns + ------- + pa.RecordBatch + """ + # We need a dict of columns here, with each column being a pa.Array + columns: dict[str, pa.Array] = {} + for name in df.column_names(): + if not isinstance(name, str): + raise ValueError(f"Column {name} is not a string") + if name in columns: + raise ValueError(f"Column {name} is not unique") + col = df.get_column_by_name(name) + dtype = col.dtype[0] + if dtype in ( + DtypeKind.INT, + DtypeKind.UINT, + DtypeKind.FLOAT, + DtypeKind.STRING, + DtypeKind.DATETIME, + ): + columns[name] = column_to_array(col) + elif dtype == DtypeKind.BOOL: + columns[name] = bool_column_to_array(col, allow_copy) + elif dtype == DtypeKind.CATEGORICAL: + columns[name] = categorical_column_to_dictionary(col, allow_copy) + else: + raise NotImplementedError(f"Data type {dtype} not handled yet") + + return pa.RecordBatch.from_pydict(columns) + + +def column_to_array( + col: ColumnObject, +) -> pa.Array: + """ + Convert a column holding one of the primitive dtypes to a PyArrow array. + A primitive type is one of: int, uint, float, bool (1 bit). 
+ + Parameters + ---------- + col : ColumnObject + + Returns + ------- + pa.Array + """ + buffers = col.get_buffers() + null_count = col.null_count + data = buffers_to_array(buffers, col.size(), + col.describe_null, + col.offset, null_count) + return data + + +def bool_column_to_array( + col: ColumnObject, + allow_copy: bool = True, +) -> pa.Array: + """ + Convert a column holding boolean dtype to a PyArrow array. + + Parameters + ---------- + col : ColumnObject + allow_copy : bool, default: True + Whether to allow copying the memory to perform the conversion + (if false then zero-copy approach is requested). + + Returns + ------- + pa.Array + """ + if not allow_copy: + raise RuntimeError( + "Boolean column will be casted from uint8 and a copy " + "is required which is forbidden by allow_copy=False" + ) + + buffers = col.get_buffers() + data = buffers_to_array(buffers, col.size(), + col.describe_null, + col.offset) + data = pc.cast(data, pa.bool_()) + + return data + + +def categorical_column_to_dictionary( + col: ColumnObject, + allow_copy: bool = True, +) -> pa.DictionaryArray: + """ + Convert a column holding categorical data to a pa.DictionaryArray. + + Parameters + ---------- + col : ColumnObject + allow_copy : bool, default: True + Whether to allow copying the memory to perform the conversion + (if false then zero-copy approach is requested). + + Returns + ------- + pa.DictionaryArray + """ + if not allow_copy: + raise RuntimeError( + "Boolean column will be casted from uint8 and a copy " + "is required which is forbidden by allow_copy=False" Review Comment: This error message is about boolean column, but should be about dictionary/categorical ########## python/pyarrow/interchange/dataframe.py: ########## @@ -0,0 +1,190 @@ +# Licensed to the Apache Software Foundation (ASF) under one +# or more contributor license agreements. See the NOTICE file +# distributed with this work for additional information +# regarding copyright ownership. 
The ASF licenses this file +# to you under the Apache License, Version 2.0 (the +# "License"); you may not use this file except in compliance +# with the License. You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, +# software distributed under the License is distributed on an +# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +# KIND, either express or implied. See the License for the +# specific language governing permissions and limitations +# under the License. + +from __future__ import annotations +from typing import ( + Any, + Iterable, + Optional, + Sequence, +) + +import pyarrow as pa + +from pyarrow.interchange.column import _PyArrowColumn + + +class _PyArrowDataFrame: + """ + A data frame class, with only the methods required by the interchange + protocol defined. + A "data frame" represents an ordered collection of named columns. + A column's "name" must be a unique string. + Columns may be accessed by name or by position. + This could be a public data frame class, or an object with the methods and + attributes defined on this DataFrame class could be returned from the + ``__dataframe__`` method of a public data frame class in a library adhering + to the dataframe interchange protocol specification. + """ + + def __init__( + self, df: pa.Table, nan_as_null: bool = False, allow_copy: bool = True + ) -> None: + """ + Constructor - an instance of this (private) class is returned from + `pa.Table.__dataframe__`. + """ + self._df = df + # ``nan_as_null`` is a keyword intended for the consumer to tell the + # producer to overwrite null values in the data with ``NaN`` (or + # ``NaT``). This currently has no effect; once support for nullable + # extension dtypes is added, this value should be propagated to + # columns. Review Comment: Yes, but the "once support for nullable extension dtypes is added, this value should be propagated to columns." 
part of the comment is pandas specific, and can be removed I think ########## python/pyarrow/interchange/from_dataframe.py: ########## @@ -0,0 +1,540 @@ +# Licensed to the Apache Software Foundation (ASF) under one +# or more contributor license agreements. See the NOTICE file +# distributed with this work for additional information +# regarding copyright ownership. The ASF licenses this file +# to you under the Apache License, Version 2.0 (the +# "License"); you may not use this file except in compliance +# with the License. You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, +# software distributed under the License is distributed on an +# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +# KIND, either express or implied. See the License for the +# specific language governing permissions and limitations +# under the License. + +from __future__ import annotations + +from typing import ( + Any, +) + +from pyarrow.interchange.column import ( + DtypeKind, + ColumnBuffers, + ColumnNullType, +) + +import pyarrow as pa +import re + +import pyarrow.compute as pc +from pyarrow.interchange.column import Dtype + + +# A typing protocol could be added later to let Mypy validate code using +# `from_dataframe` better. +DataFrameObject = Any +ColumnObject = Any +BufferObject = Any + + +_PYARROW_DTYPES: dict[DtypeKind, dict[int, Any]] = { + DtypeKind.INT: {8: pa.int8(), + 16: pa.int16(), + 32: pa.int32(), + 64: pa.int64()}, + DtypeKind.UINT: {8: pa.uint8(), + 16: pa.uint16(), + 32: pa.uint32(), + 64: pa.uint64()}, + DtypeKind.FLOAT: {16: pa.float16(), + 32: pa.float32(), + 64: pa.float64()}, + DtypeKind.BOOL: {8: pa.uint8()}, + DtypeKind.STRING: {8: pa.string()}, +} + + +def from_dataframe(df: DataFrameObject, allow_copy=True) -> pa.Table: + """ + Build a ``pa.Table`` from any DataFrame supporting the interchange + protocol. 
+ + Parameters + ---------- + df : DataFrameObject + Object supporting the interchange protocol, i.e. `__dataframe__` + method. + allow_copy : bool, default: True + Whether to allow copying the memory to perform the conversion + (if false then zero-copy approach is requested). + + Returns + ------- + pa.Table + """ + if isinstance(df, pa.Table): + return df + + if not hasattr(df, "__dataframe__"): + raise ValueError("`df` does not support __dataframe__") + + return _from_dataframe(df.__dataframe__(allow_copy=allow_copy)) + + +def _from_dataframe(df: DataFrameObject, allow_copy=True): + """ + Build a ``pa.Table`` from the DataFrame interchange object. + + Parameters + ---------- + df : DataFrameObject + Object supporting the interchange protocol, i.e. `__dataframe__` + method. + allow_copy : bool, default: True + Whether to allow copying the memory to perform the conversion + (if false then zero-copy approach is requested). + + Returns + ------- + pa.Table + """ + batches = [] + for chunk in df.get_chunks(): + batch = protocol_df_chunk_to_pyarrow(chunk, allow_copy) + batches.append(batch) + + table = pa.Table.from_batches(batches) + return table + + +def protocol_df_chunk_to_pyarrow( + df: DataFrameObject, + allow_copy: bool = True +) -> pa.Table: + """ + Convert interchange protocol chunk to ``pa.RecordBatch``. + + Parameters + ---------- + df : DataFrameObject + Object supporting the interchange protocol, i.e. `__dataframe__` + method. + allow_copy : bool, default: True + Whether to allow copying the memory to perform the conversion + (if false then zero-copy approach is requested). 
+ + Returns + ------- + pa.RecordBatch + """ + # We need a dict of columns here, with each column being a pa.Array + columns: dict[str, pa.Array] = {} + for name in df.column_names(): + if not isinstance(name, str): + raise ValueError(f"Column {name} is not a string") + if name in columns: + raise ValueError(f"Column {name} is not unique") + col = df.get_column_by_name(name) + dtype = col.dtype[0] + if dtype in ( + DtypeKind.INT, + DtypeKind.UINT, + DtypeKind.FLOAT, + DtypeKind.STRING, + DtypeKind.DATETIME, + ): + columns[name] = column_to_array(col) + elif dtype == DtypeKind.BOOL: + columns[name] = bool_column_to_array(col, allow_copy) + elif dtype == DtypeKind.CATEGORICAL: + columns[name] = categorical_column_to_dictionary(col, allow_copy) + else: + raise NotImplementedError(f"Data type {dtype} not handled yet") + + return pa.RecordBatch.from_pydict(columns) + + +def column_to_array( + col: ColumnObject, +) -> pa.Array: + """ + Convert a column holding one of the primitive dtypes to a PyArrow array. + A primitive type is one of: int, uint, float, bool (1 bit). + + Parameters + ---------- + col : ColumnObject + + Returns + ------- + pa.Array + """ + buffers = col.get_buffers() + null_count = col.null_count + data = buffers_to_array(buffers, col.size(), + col.describe_null, + col.offset, null_count) + return data + + +def bool_column_to_array( + col: ColumnObject, + allow_copy: bool = True, +) -> pa.Array: + """ + Convert a column holding boolean dtype to a PyArrow array. + + Parameters + ---------- + col : ColumnObject + allow_copy : bool, default: True + Whether to allow copying the memory to perform the conversion + (if false then zero-copy approach is requested). 
+ + Returns + ------- + pa.Array + """ + if not allow_copy: + raise RuntimeError( + "Boolean column will be casted from uint8 and a copy " + "is required which is forbidden by allow_copy=False" + ) + + buffers = col.get_buffers() + data = buffers_to_array(buffers, col.size(), + col.describe_null, + col.offset) + data = pc.cast(data, pa.bool_()) + + return data + + +def categorical_column_to_dictionary( + col: ColumnObject, + allow_copy: bool = True, +) -> pa.DictionaryArray: + """ + Convert a column holding categorical data to a pa.DictionaryArray. + + Parameters + ---------- + col : ColumnObject + allow_copy : bool, default: True + Whether to allow copying the memory to perform the conversion + (if false then zero-copy approach is requested). + + Returns + ------- + pa.DictionaryArray + """ + if not allow_copy: + raise RuntimeError( + "Boolean column will be casted from uint8 and a copy " + "is required which is forbidden by allow_copy=False" + ) + + categorical = col.describe_categorical + + if not categorical["is_dictionary"]: + raise NotImplementedError( + "Non-dictionary categoricals not supported yet") + + cat_column = categorical["categories"] + dictionary = column_to_array(cat_column) + + buffers = col.get_buffers() + indices = buffers_to_array(buffers, col.size(), + col.describe_null, + col.offset) + + # Constructing a pa.DictionaryArray + dict_array = pa.DictionaryArray.from_arrays(indices, dictionary) + + return dict_array + + +def parse_datetime_format_str(format_str): + """Parse datetime `format_str` to interpret the `data`.""" + + # timestamp 'ts{unit}:tz' + timestamp_meta = re.match(r"ts([smun]):(.*)", format_str) + if timestamp_meta: + unit, tz = timestamp_meta.group(1), timestamp_meta.group(2) + if unit != "s": + # the format string describes only a first letter of the unit, so + # add one extra letter to convert the unit to numpy-style: + # 'm' -> 'ms', 'u' -> 'us', 'n' -> 'ns' + unit += "s" + + return unit, tz + + raise 
NotImplementedError(f"DateTime kind is not supported: {format_str}") + + +def map_date_type(data_type): + """Map column date type to pyarrow date type. """ + kind, bit_width, f_string, _ = data_type + + if kind == DtypeKind.DATETIME: + unit, tz = parse_datetime_format_str(f_string) + return pa.timestamp(unit, tz=tz) + else: + pa_dtype = _PYARROW_DTYPES.get(kind, {}).get(bit_width, None) + + # Error if dtype is not supported + if pa_dtype: + return pa_dtype + else: + raise NotImplementedError( + f"Conversion for {data_type} is not yet supported.") + + +def buffers_to_array( + buffers: ColumnBuffers, + length: int, + describe_null: ColumnNullType, + offset: int = 0, + null_count: int = -1 +) -> pa.Array: + """ + Build a PyArrow array from the passed buffer. + + Parameters + ---------- + buffer : ColumnBuffers + Dictionary containing tuples of underlying buffers and + their associated dtype. + length : int + The number of values in the array. + describe_null: ColumnNullType + Null representation the column dtype uses, + as a tuple ``(kind, value)`` + offset : int, default: 0 + Number of elements to offset from the start of the buffer. + null_count : int, default: -1 + + Returns + ------- + pa.Array + + Notes + ----- + The returned array doesn't own the memory. The caller of this function + is responsible for keeping the memory owner object alive as long as + the returned PyArrow array is being used. 
+ """ + data_buff, data_type = buffers["data"] + try: + validity_buff, validity_dtype = buffers["validity"] + except TypeError: + validity_buff = None + try: + offset_buff, offset_dtype = buffers["offsets"] + except TypeError: + offset_buff = None + + # Construct a pyarrow Buffer + data_pa_buffer = pa.foreign_buffer(data_buff.ptr, data_buff.bufsize, + base=data_buff) + + # Construct a validity pyarrow Buffer, if applicable + if validity_buff: + validity_pa_buff = validity_buffer_from_mask(validity_buff, + validity_dtype, + describe_null, + length, + offset) + else: + validity_pa_buff = validity_buffer_nan_sentinel(data_pa_buffer, + data_type, + describe_null, + length, + offset) + + # Construct a pyarrow Array from buffers + data_dtype = map_date_type(data_type) + + if offset_buff: + _, offset_bit_width, _, _ = offset_dtype + # If an offset buffer exists, construct an offset pyarrow Buffer + # and add it to the construction of an array + offset_pa_buffer = pa.foreign_buffer(offset_buff.ptr, + offset_buff.bufsize, + base=offset_buff) + + if data_type[2] == 'U': + if not null_count: + null_count = -1 + array = pa.LargeStringArray.from_buffers( + length, + offset_pa_buffer, + data_pa_buffer, + validity_pa_buff, + null_count=null_count, + offset=offset, + ) + else: + if offset_bit_width == 64: + string_type = pa.large_string() + else: + string_type = pa.string() + array = pa.Array.from_buffers( + string_type, + length, + [validity_pa_buff, offset_pa_buffer, data_pa_buffer], + offset=offset, + ) + else: + array = pa.Array.from_buffers( + data_dtype, + length, + [validity_pa_buff, data_pa_buffer], + offset=offset, + ) + + return array + + +def validity_buffer_from_mask( + validity_buff: BufferObject, + validity_dtype: Dtype, + describe_null: ColumnNullType, + length: int, + offset: int = 0, +) -> pa.Buffer: + """ + Build a PyArrow buffer from the passed mask buffer. 
+ + Parameters + ---------- + validity_buff : BufferObject + Tuple of underlying validity buffer and associated dtype. + validity_dtype : Dtype + Dtype description as a tuple ``(kind, bit-width, format string, + endianness)``. + describe_null : ColumnNullType + Null representation the column dtype uses, + as a tuple ``(kind, value)`` + length : int + The number of values in the array. + offset : int, default: 0 + Number of elements to offset from the start of the buffer. + + Returns + ------- + pa.Buffer + """ + null_kind, sentinel_val = describe_null + validity_kind, _, _, _ = validity_dtype + assert validity_kind == DtypeKind.BOOL + + if null_kind == ColumnNullType.NON_NULLABLE: + # Sliced array can have a NON_NULLABLE ColumnNullType due + # to no missing values in that slice of an array though the bitmask + # exists and validity_buff must be set to None in this case + return None + + elif null_kind == ColumnNullType.USE_BYTEMASK or ( + null_kind == ColumnNullType.USE_BITMASK and sentinel_val == 1 + ): + buff = pa.foreign_buffer(validity_buff.ptr, + validity_buff.bufsize, + base=validity_buff) + + if null_kind == ColumnNullType.USE_BYTEMASK: + mask = pa.Array.from_buffers(pa.int8(), length, + [None, buff], + offset=offset) + mask_bool = pc.cast(mask, pa.bool_()) + else: + mask_bool = pa.Array.from_buffers(pa.bool_(), length, + [None, buff], + offset=offset) + + if sentinel_val == 1: + mask_bool = pc.invert(mask_bool) + + return mask_bool.buffers()[1] + + elif null_kind == ColumnNullType.USE_BITMASK and sentinel_val == 0: + return pa.foreign_buffer(validity_buff.ptr, + validity_buff.bufsize, + base=validity_buff) + else: + raise NotImplementedError( + f"{describe_null} null representation is not yet supported.") + + +def validity_buffer_nan_sentinel( + data_pa_buffer: BufferObject, + data_type: Dtype, + describe_null: ColumnNullType, + length: int, + offset: int = 0, +) -> pa.Buffer: + """ + Build a PyArrow buffer from NaN or sentinel values. 
+ + Parameters + ---------- + data_pa_buffer : pa.Buffer + PyArrow buffer for the column data. + data_type : Dtype + Dtype description as a tuple ``(kind, bit-width, format string, + endianness)``. + describe_null : ColumnNullType + Null representation the column dtype uses, + as a tuple ``(kind, value)`` + length : int + The number of values in the array. + offset : int, default: 0 + Number of elements to offset from the start of the buffer. + + Returns + ------- + pa.Buffer + """ + kind, bit_width, _, _ = data_type + data_dtype = map_date_type(data_type) + null_kind, sentinel_val = describe_null + + # Check for float NaN values + if null_kind == ColumnNullType.USE_NAN: + if kind == DtypeKind.FLOAT and bit_width == 16: + raise NotImplementedError( + f"{data_type} with {null_kind} is not yet supported.") Review Comment: What was the reason again that this doesn't work? (maybe add a comment about it) ########## python/pyarrow/interchange/from_dataframe.py: ########## @@ -0,0 +1,540 @@ +# Licensed to the Apache Software Foundation (ASF) under one +# or more contributor license agreements. See the NOTICE file +# distributed with this work for additional information +# regarding copyright ownership. The ASF licenses this file +# to you under the Apache License, Version 2.0 (the +# "License"); you may not use this file except in compliance +# with the License. You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, +# software distributed under the License is distributed on an +# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +# KIND, either express or implied. See the License for the +# specific language governing permissions and limitations +# under the License. 
+ +from __future__ import annotations + +from typing import ( + Any, +) + +from pyarrow.interchange.column import ( + DtypeKind, + ColumnBuffers, + ColumnNullType, +) + +import pyarrow as pa +import re + +import pyarrow.compute as pc +from pyarrow.interchange.column import Dtype + + +# A typing protocol could be added later to let Mypy validate code using +# `from_dataframe` better. +DataFrameObject = Any +ColumnObject = Any +BufferObject = Any + + +_PYARROW_DTYPES: dict[DtypeKind, dict[int, Any]] = { + DtypeKind.INT: {8: pa.int8(), + 16: pa.int16(), + 32: pa.int32(), + 64: pa.int64()}, + DtypeKind.UINT: {8: pa.uint8(), + 16: pa.uint16(), + 32: pa.uint32(), + 64: pa.uint64()}, + DtypeKind.FLOAT: {16: pa.float16(), + 32: pa.float32(), + 64: pa.float64()}, + DtypeKind.BOOL: {8: pa.uint8()}, + DtypeKind.STRING: {8: pa.string()}, +} + + +def from_dataframe(df: DataFrameObject, allow_copy=True) -> pa.Table: + """ + Build a ``pa.Table`` from any DataFrame supporting the interchange + protocol. + + Parameters + ---------- + df : DataFrameObject + Object supporting the interchange protocol, i.e. `__dataframe__` + method. + allow_copy : bool, default: True + Whether to allow copying the memory to perform the conversion + (if false then zero-copy approach is requested). + + Returns + ------- + pa.Table + """ + if isinstance(df, pa.Table): + return df + + if not hasattr(df, "__dataframe__"): + raise ValueError("`df` does not support __dataframe__") + + return _from_dataframe(df.__dataframe__(allow_copy=allow_copy)) + + +def _from_dataframe(df: DataFrameObject, allow_copy=True): + """ + Build a ``pa.Table`` from the DataFrame interchange object. + + Parameters + ---------- + df : DataFrameObject + Object supporting the interchange protocol, i.e. `__dataframe__` + method. + allow_copy : bool, default: True + Whether to allow copying the memory to perform the conversion + (if false then zero-copy approach is requested). 
+ + Returns + ------- + pa.Table + """ + batches = [] + for chunk in df.get_chunks(): + batch = protocol_df_chunk_to_pyarrow(chunk, allow_copy) + batches.append(batch) + + table = pa.Table.from_batches(batches) + return table + + +def protocol_df_chunk_to_pyarrow( + df: DataFrameObject, + allow_copy: bool = True +) -> pa.Table: + """ + Convert interchange protocol chunk to ``pa.RecordBatch``. + + Parameters + ---------- + df : DataFrameObject + Object supporting the interchange protocol, i.e. `__dataframe__` + method. + allow_copy : bool, default: True + Whether to allow copying the memory to perform the conversion + (if false then zero-copy approach is requested). + + Returns + ------- + pa.RecordBatch + """ + # We need a dict of columns here, with each column being a pa.Array + columns: dict[str, pa.Array] = {} + for name in df.column_names(): + if not isinstance(name, str): + raise ValueError(f"Column {name} is not a string") + if name in columns: + raise ValueError(f"Column {name} is not unique") + col = df.get_column_by_name(name) + dtype = col.dtype[0] + if dtype in ( + DtypeKind.INT, + DtypeKind.UINT, + DtypeKind.FLOAT, + DtypeKind.STRING, + DtypeKind.DATETIME, + ): + columns[name] = column_to_array(col) + elif dtype == DtypeKind.BOOL: + columns[name] = bool_column_to_array(col, allow_copy) + elif dtype == DtypeKind.CATEGORICAL: + columns[name] = categorical_column_to_dictionary(col, allow_copy) + else: + raise NotImplementedError(f"Data type {dtype} not handled yet") + + return pa.RecordBatch.from_pydict(columns) + + +def column_to_array( + col: ColumnObject, +) -> pa.Array: + """ + Convert a column holding one of the primitive dtypes to a PyArrow array. + A primitive type is one of: int, uint, float, bool (1 bit). 
+ + Parameters + ---------- + col : ColumnObject + + Returns + ------- + pa.Array + """ + buffers = col.get_buffers() + null_count = col.null_count + data = buffers_to_array(buffers, col.size(), + col.describe_null, + col.offset, null_count) + return data + + +def bool_column_to_array( + col: ColumnObject, + allow_copy: bool = True, +) -> pa.Array: + """ + Convert a column holding boolean dtype to a PyArrow array. + + Parameters + ---------- + col : ColumnObject + allow_copy : bool, default: True + Whether to allow copying the memory to perform the conversion + (if false then zero-copy approach is requested). + + Returns + ------- + pa.Array + """ + if not allow_copy: + raise RuntimeError( + "Boolean column will be casted from uint8 and a copy " + "is required which is forbidden by allow_copy=False" + ) + + buffers = col.get_buffers() + data = buffers_to_array(buffers, col.size(), + col.describe_null, + col.offset) + data = pc.cast(data, pa.bool_()) + + return data + + +def categorical_column_to_dictionary( + col: ColumnObject, + allow_copy: bool = True, +) -> pa.DictionaryArray: + """ + Convert a column holding categorical data to a pa.DictionaryArray. + + Parameters + ---------- + col : ColumnObject + allow_copy : bool, default: True + Whether to allow copying the memory to perform the conversion + (if false then zero-copy approach is requested). 
+ + Returns + ------- + pa.DictionaryArray + """ + if not allow_copy: + raise RuntimeError( + "Boolean column will be casted from uint8 and a copy " + "is required which is forbidden by allow_copy=False" + ) + + categorical = col.describe_categorical + + if not categorical["is_dictionary"]: + raise NotImplementedError( + "Non-dictionary categoricals not supported yet") + + cat_column = categorical["categories"] + dictionary = column_to_array(cat_column) + + buffers = col.get_buffers() + indices = buffers_to_array(buffers, col.size(), + col.describe_null, + col.offset) + + # Constructing a pa.DictionaryArray + dict_array = pa.DictionaryArray.from_arrays(indices, dictionary) + + return dict_array + + +def parse_datetime_format_str(format_str): + """Parse datetime `format_str` to interpret the `data`.""" + + # timestamp 'ts{unit}:tz' + timestamp_meta = re.match(r"ts([smun]):(.*)", format_str) + if timestamp_meta: + unit, tz = timestamp_meta.group(1), timestamp_meta.group(2) + if unit != "s": + # the format string describes only a first letter of the unit, so + # add one extra letter to convert the unit to numpy-style: + # 'm' -> 'ms', 'u' -> 'us', 'n' -> 'ns' + unit += "s" + + return unit, tz + + raise NotImplementedError(f"DateTime kind is not supported: {format_str}") + + +def map_date_type(data_type): + """Map column date type to pyarrow date type. """ + kind, bit_width, f_string, _ = data_type + + if kind == DtypeKind.DATETIME: + unit, tz = parse_datetime_format_str(f_string) + return pa.timestamp(unit, tz=tz) + else: + pa_dtype = _PYARROW_DTYPES.get(kind, {}).get(bit_width, None) + + # Error if dtype is not supported + if pa_dtype: + return pa_dtype + else: + raise NotImplementedError( + f"Conversion for {data_type} is not yet supported.") + + +def buffers_to_array( + buffers: ColumnBuffers, + length: int, + describe_null: ColumnNullType, + offset: int = 0, + null_count: int = -1 +) -> pa.Array: + """ + Build a PyArrow array from the passed buffer. 
+ + Parameters + ---------- + buffer : ColumnBuffers + Dictionary containing tuples of underlying buffers and + their associated dtype. + length : int + The number of values in the array. + describe_null: ColumnNullType + Null representation the column dtype uses, + as a tuple ``(kind, value)`` + offset : int, default: 0 + Number of elements to offset from the start of the buffer. + null_count : int, default: -1 + + Returns + ------- + pa.Array + + Notes + ----- + The returned array doesn't own the memory. The caller of this function + is responsible for keeping the memory owner object alive as long as + the returned PyArrow array is being used. + """ + data_buff, data_type = buffers["data"] + try: + validity_buff, validity_dtype = buffers["validity"] + except TypeError: + validity_buff = None + try: + offset_buff, offset_dtype = buffers["offsets"] + except TypeError: + offset_buff = None + + # Construct a pyarrow Buffer + data_pa_buffer = pa.foreign_buffer(data_buff.ptr, data_buff.bufsize, + base=data_buff) + + # Construct a validity pyarrow Buffer, if applicable + if validity_buff: + validity_pa_buff = validity_buffer_from_mask(validity_buff, + validity_dtype, + describe_null, + length, + offset) + else: + validity_pa_buff = validity_buffer_nan_sentinel(data_pa_buffer, + data_type, + describe_null, + length, + offset) + + # Construct a pyarrow Array from buffers + data_dtype = map_date_type(data_type) + + if offset_buff: + _, offset_bit_width, _, _ = offset_dtype + # If an offset buffer exists, construct an offset pyarrow Buffer + # and add it to the construction of an array + offset_pa_buffer = pa.foreign_buffer(offset_buff.ptr, + offset_buff.bufsize, + base=offset_buff) + + if data_type[2] == 'U': + if not null_count: + null_count = -1 + array = pa.LargeStringArray.from_buffers( + length, + offset_pa_buffer, + data_pa_buffer, + validity_pa_buff, + null_count=null_count, + offset=offset, + ) + else: + if offset_bit_width == 64: + string_type = pa.large_string() 
+ else: + string_type = pa.string() + array = pa.Array.from_buffers( + string_type, + length, + [validity_pa_buff, offset_pa_buffer, data_pa_buffer], + offset=offset, + ) + else: + array = pa.Array.from_buffers( + data_dtype, + length, + [validity_pa_buff, data_pa_buffer], + offset=offset, + ) + + return array + + +def validity_buffer_from_mask( + validity_buff: BufferObject, + validity_dtype: Dtype, + describe_null: ColumnNullType, + length: int, + offset: int = 0, +) -> pa.Buffer: + """ + Build a PyArrow buffer from the passed mask buffer. + + Parameters + ---------- + validity_buff : BufferObject + Tuple of underlying validity buffer and associated dtype. + validity_dtype : Dtype + Dtype description as a tuple ``(kind, bit-width, format string, + endianness)``. + describe_null : ColumnNullType + Null representation the column dtype uses, + as a tuple ``(kind, value)`` + length : int + The number of values in the array. + offset : int, default: 0 + Number of elements to offset from the start of the buffer. 
+ + Returns + ------- + pa.Buffer + """ + null_kind, sentinel_val = describe_null + validity_kind, _, _, _ = validity_dtype + assert validity_kind == DtypeKind.BOOL + + if null_kind == ColumnNullType.NON_NULLABLE: + # Sliced array can have a NON_NULLABLE ColumnNullType due + # to no missing values in that slice of an array though the bitmask + # exists and validity_buff must be set to None in this case + return None + + elif null_kind == ColumnNullType.USE_BYTEMASK or ( + null_kind == ColumnNullType.USE_BITMASK and sentinel_val == 1 + ): + buff = pa.foreign_buffer(validity_buff.ptr, + validity_buff.bufsize, + base=validity_buff) + + if null_kind == ColumnNullType.USE_BYTEMASK: + mask = pa.Array.from_buffers(pa.int8(), length, + [None, buff], + offset=offset) + mask_bool = pc.cast(mask, pa.bool_()) + else: + mask_bool = pa.Array.from_buffers(pa.bool_(), length, + [None, buff], + offset=offset) + + if sentinel_val == 1: + mask_bool = pc.invert(mask_bool) + + return mask_bool.buffers()[1] + + elif null_kind == ColumnNullType.USE_BITMASK and sentinel_val == 0: + return pa.foreign_buffer(validity_buff.ptr, + validity_buff.bufsize, + base=validity_buff) + else: + raise NotImplementedError( + f"{describe_null} null representation is not yet supported.") + + +def validity_buffer_nan_sentinel( + data_pa_buffer: BufferObject, + data_type: Dtype, + describe_null: ColumnNullType, + length: int, + offset: int = 0, +) -> pa.Buffer: + """ + Build a PyArrow buffer from NaN or sentinel values. + + Parameters + ---------- + data_pa_buffer : pa.Buffer + PyArrow buffer for the column data. + data_type : Dtype + Dtype description as a tuple ``(kind, bit-width, format string, + endianness)``. + describe_null : ColumnNullType + Null representation the column dtype uses, + as a tuple ``(kind, value)`` + length : int + The number of values in the array. + offset : int, default: 0 + Number of elements to offset from the start of the buffer. 
+ + Returns + ------- + pa.Buffer + """ + kind, bit_width, _, _ = data_type + data_dtype = map_date_type(data_type) + null_kind, sentinel_val = describe_null + + # Check for float NaN values + if null_kind == ColumnNullType.USE_NAN: + if kind == DtypeKind.FLOAT and bit_width == 16: + raise NotImplementedError( + f"{data_type} with {null_kind} is not yet supported.") + else: + pyarrow_data = pa.Array.from_buffers( + data_dtype, + length, + [None, data_pa_buffer], + offset=offset, + ) + mask = pc.is_nan(pyarrow_data) + mask = pc.invert(mask) + return mask.buffers()[1] + + # Check for sentinel values + elif null_kind == ColumnNullType.USE_SENTINEL: + if kind == DtypeKind.DATETIME: + sentinel_dtype = pa.int64() + else: + sentinel_dtype = data_dtype + int_arr = pa.Array.from_buffers(sentinel_dtype, + length, + [None, data_pa_buffer], + offset=offset) + sentinel_arr = pc.equal(int_arr, sentinel_val) Review Comment: ```suggestion pyarrow_data = pa.Array.from_buffers(sentinel_dtype, length, [None, data_pa_buffer], offset=offset) sentinel_arr = pc.equal(pyarrow_data, sentinel_val) ``` (it's not necessarily an int array?) ########## python/pyarrow/interchange/from_dataframe.py: ########## @@ -0,0 +1,540 @@ +# Licensed to the Apache Software Foundation (ASF) under one +# or more contributor license agreements. See the NOTICE file +# distributed with this work for additional information +# regarding copyright ownership. The ASF licenses this file +# to you under the Apache License, Version 2.0 (the +# "License"); you may not use this file except in compliance +# with the License. You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, +# software distributed under the License is distributed on an +# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +# KIND, either express or implied. 
See the License for the +# specific language governing permissions and limitations +# under the License. + +from __future__ import annotations + +from typing import ( + Any, +) + +from pyarrow.interchange.column import ( + DtypeKind, + ColumnBuffers, + ColumnNullType, +) + +import pyarrow as pa +import re + +import pyarrow.compute as pc +from pyarrow.interchange.column import Dtype + + +# A typing protocol could be added later to let Mypy validate code using +# `from_dataframe` better. +DataFrameObject = Any +ColumnObject = Any +BufferObject = Any + + +_PYARROW_DTYPES: dict[DtypeKind, dict[int, Any]] = { + DtypeKind.INT: {8: pa.int8(), + 16: pa.int16(), + 32: pa.int32(), + 64: pa.int64()}, + DtypeKind.UINT: {8: pa.uint8(), + 16: pa.uint16(), + 32: pa.uint32(), + 64: pa.uint64()}, + DtypeKind.FLOAT: {16: pa.float16(), + 32: pa.float32(), + 64: pa.float64()}, + DtypeKind.BOOL: {8: pa.uint8()}, + DtypeKind.STRING: {8: pa.string()}, +} + + +def from_dataframe(df: DataFrameObject, allow_copy=True) -> pa.Table: + """ + Build a ``pa.Table`` from any DataFrame supporting the interchange + protocol. + + Parameters + ---------- + df : DataFrameObject + Object supporting the interchange protocol, i.e. `__dataframe__` + method. + allow_copy : bool, default: True + Whether to allow copying the memory to perform the conversion + (if false then zero-copy approach is requested). + + Returns + ------- + pa.Table + """ + if isinstance(df, pa.Table): + return df + + if not hasattr(df, "__dataframe__"): + raise ValueError("`df` does not support __dataframe__") + + return _from_dataframe(df.__dataframe__(allow_copy=allow_copy)) + + +def _from_dataframe(df: DataFrameObject, allow_copy=True): + """ + Build a ``pa.Table`` from the DataFrame interchange object. + + Parameters + ---------- + df : DataFrameObject + Object supporting the interchange protocol, i.e. `__dataframe__` + method. 
+ allow_copy : bool, default: True + Whether to allow copying the memory to perform the conversion + (if false then zero-copy approach is requested). + + Returns + ------- + pa.Table + """ + batches = [] + for chunk in df.get_chunks(): + batch = protocol_df_chunk_to_pyarrow(chunk, allow_copy) + batches.append(batch) + + table = pa.Table.from_batches(batches) + return table + + +def protocol_df_chunk_to_pyarrow( + df: DataFrameObject, + allow_copy: bool = True +) -> pa.Table: + """ + Convert interchange protocol chunk to ``pa.RecordBatch``. + + Parameters + ---------- + df : DataFrameObject + Object supporting the interchange protocol, i.e. `__dataframe__` + method. + allow_copy : bool, default: True + Whether to allow copying the memory to perform the conversion + (if false then zero-copy approach is requested). + + Returns + ------- + pa.RecordBatch + """ + # We need a dict of columns here, with each column being a pa.Array + columns: dict[str, pa.Array] = {} + for name in df.column_names(): + if not isinstance(name, str): + raise ValueError(f"Column {name} is not a string") + if name in columns: + raise ValueError(f"Column {name} is not unique") + col = df.get_column_by_name(name) + dtype = col.dtype[0] + if dtype in ( + DtypeKind.INT, + DtypeKind.UINT, + DtypeKind.FLOAT, + DtypeKind.STRING, + DtypeKind.DATETIME, + ): + columns[name] = column_to_array(col) + elif dtype == DtypeKind.BOOL: + columns[name] = bool_column_to_array(col, allow_copy) + elif dtype == DtypeKind.CATEGORICAL: + columns[name] = categorical_column_to_dictionary(col, allow_copy) + else: + raise NotImplementedError(f"Data type {dtype} not handled yet") + + return pa.RecordBatch.from_pydict(columns) + + +def column_to_array( + col: ColumnObject, +) -> pa.Array: + """ + Convert a column holding one of the primitive dtypes to a PyArrow array. + A primitive type is one of: int, uint, float, bool (1 bit). 
+ + Parameters + ---------- + col : ColumnObject + + Returns + ------- + pa.Array + """ + buffers = col.get_buffers() + null_count = col.null_count + data = buffers_to_array(buffers, col.size(), + col.describe_null, + col.offset, null_count) + return data + + +def bool_column_to_array( + col: ColumnObject, + allow_copy: bool = True, +) -> pa.Array: + """ + Convert a column holding boolean dtype to a PyArrow array. + + Parameters + ---------- + col : ColumnObject + allow_copy : bool, default: True + Whether to allow copying the memory to perform the conversion + (if false then zero-copy approach is requested). + + Returns + ------- + pa.Array + """ + if not allow_copy: + raise RuntimeError( + "Boolean column will be casted from uint8 and a copy " + "is required which is forbidden by allow_copy=False" + ) + + buffers = col.get_buffers() + data = buffers_to_array(buffers, col.size(), + col.describe_null, + col.offset) + data = pc.cast(data, pa.bool_()) + + return data + + +def categorical_column_to_dictionary( + col: ColumnObject, + allow_copy: bool = True, +) -> pa.DictionaryArray: + """ + Convert a column holding categorical data to a pa.DictionaryArray. + + Parameters + ---------- + col : ColumnObject + allow_copy : bool, default: True + Whether to allow copying the memory to perform the conversion + (if false then zero-copy approach is requested). 
+ + Returns + ------- + pa.DictionaryArray + """ + if not allow_copy: + raise RuntimeError( + "Boolean column will be casted from uint8 and a copy " + "is required which is forbidden by allow_copy=False" + ) + + categorical = col.describe_categorical + + if not categorical["is_dictionary"]: + raise NotImplementedError( + "Non-dictionary categoricals not supported yet") + + cat_column = categorical["categories"] + dictionary = column_to_array(cat_column) + + buffers = col.get_buffers() + indices = buffers_to_array(buffers, col.size(), + col.describe_null, + col.offset) + + # Constructing a pa.DictionaryArray + dict_array = pa.DictionaryArray.from_arrays(indices, dictionary) + + return dict_array + + +def parse_datetime_format_str(format_str): + """Parse datetime `format_str` to interpret the `data`.""" + + # timestamp 'ts{unit}:tz' + timestamp_meta = re.match(r"ts([smun]):(.*)", format_str) + if timestamp_meta: + unit, tz = timestamp_meta.group(1), timestamp_meta.group(2) + if unit != "s": + # the format string describes only a first letter of the unit, so + # add one extra letter to convert the unit to numpy-style: + # 'm' -> 'ms', 'u' -> 'us', 'n' -> 'ns' + unit += "s" + + return unit, tz + + raise NotImplementedError(f"DateTime kind is not supported: {format_str}") + + +def map_date_type(data_type): + """Map column date type to pyarrow date type. """ + kind, bit_width, f_string, _ = data_type + + if kind == DtypeKind.DATETIME: + unit, tz = parse_datetime_format_str(f_string) + return pa.timestamp(unit, tz=tz) + else: + pa_dtype = _PYARROW_DTYPES.get(kind, {}).get(bit_width, None) + + # Error if dtype is not supported + if pa_dtype: + return pa_dtype + else: + raise NotImplementedError( + f"Conversion for {data_type} is not yet supported.") + + +def buffers_to_array( + buffers: ColumnBuffers, + length: int, + describe_null: ColumnNullType, + offset: int = 0, + null_count: int = -1 +) -> pa.Array: + """ + Build a PyArrow array from the passed buffer. 
+ + Parameters + ---------- + buffer : ColumnBuffers + Dictionary containing tuples of underlying buffers and + their associated dtype. + length : int + The number of values in the array. + describe_null: ColumnNullType + Null representation the column dtype uses, + as a tuple ``(kind, value)`` + offset : int, default: 0 + Number of elements to offset from the start of the buffer. + null_count : int, default: -1 + + Returns + ------- + pa.Array + + Notes + ----- + The returned array doesn't own the memory. The caller of this function + is responsible for keeping the memory owner object alive as long as + the returned PyArrow array is being used. + """ + data_buff, data_type = buffers["data"] + try: + validity_buff, validity_dtype = buffers["validity"] + except TypeError: + validity_buff = None + try: + offset_buff, offset_dtype = buffers["offsets"] + except TypeError: + offset_buff = None + + # Construct a pyarrow Buffer + data_pa_buffer = pa.foreign_buffer(data_buff.ptr, data_buff.bufsize, + base=data_buff) + + # Construct a validity pyarrow Buffer, if applicable + if validity_buff: + validity_pa_buff = validity_buffer_from_mask(validity_buff, + validity_dtype, + describe_null, + length, + offset) + else: + validity_pa_buff = validity_buffer_nan_sentinel(data_pa_buffer, + data_type, + describe_null, + length, + offset) + + # Construct a pyarrow Array from buffers + data_dtype = map_date_type(data_type) + + if offset_buff: + _, offset_bit_width, _, _ = offset_dtype + # If an offset buffer exists, construct an offset pyarrow Buffer + # and add it to the construction of an array + offset_pa_buffer = pa.foreign_buffer(offset_buff.ptr, + offset_buff.bufsize, + base=offset_buff) + + if data_type[2] == 'U': + if not null_count: + null_count = -1 + array = pa.LargeStringArray.from_buffers( + length, + offset_pa_buffer, + data_pa_buffer, + validity_pa_buff, + null_count=null_count, + offset=offset, + ) + else: + if offset_bit_width == 64: + string_type = pa.large_string() 
+ else: + string_type = pa.string() + array = pa.Array.from_buffers( + string_type, + length, + [validity_pa_buff, offset_pa_buffer, data_pa_buffer], + offset=offset, + ) Review Comment: Can we combine the if/else here? (and do a single pa.Array.from_buffers) I think you could do something like `if data_type[2] == "U": string_type = pa.large_string()` ########## python/pyarrow/interchange/from_dataframe.py: ########## @@ -0,0 +1,540 @@ +# Licensed to the Apache Software Foundation (ASF) under one +# or more contributor license agreements. See the NOTICE file +# distributed with this work for additional information +# regarding copyright ownership. The ASF licenses this file +# to you under the Apache License, Version 2.0 (the +# "License"); you may not use this file except in compliance +# with the License. You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, +# software distributed under the License is distributed on an +# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +# KIND, either express or implied. See the License for the +# specific language governing permissions and limitations +# under the License. + +from __future__ import annotations + +from typing import ( + Any, +) + +from pyarrow.interchange.column import ( + DtypeKind, + ColumnBuffers, + ColumnNullType, +) + +import pyarrow as pa +import re + +import pyarrow.compute as pc +from pyarrow.interchange.column import Dtype + + +# A typing protocol could be added later to let Mypy validate code using +# `from_dataframe` better. 
# Typing aliases for objects implementing the DataFrame interchange
# protocol; a typing Protocol could replace these later for Mypy support.
DataFrameObject = Any
ColumnObject = Any
BufferObject = Any


# Map (DtypeKind, bit width) -> pyarrow data type.
# NOTE: booleans travel through the interchange protocol as uint8 buffers
# and are cast back to pa.bool_() in bool_column_to_array.
_PYARROW_DTYPES: dict[DtypeKind, dict[int, Any]] = {
    DtypeKind.INT: {8: pa.int8(),
                    16: pa.int16(),
                    32: pa.int32(),
                    64: pa.int64()},
    DtypeKind.UINT: {8: pa.uint8(),
                     16: pa.uint16(),
                     32: pa.uint32(),
                     64: pa.uint64()},
    DtypeKind.FLOAT: {16: pa.float16(),
                      32: pa.float32(),
                      64: pa.float64()},
    DtypeKind.BOOL: {8: pa.uint8()},
    DtypeKind.STRING: {8: pa.string()},
}


def from_dataframe(df: DataFrameObject, allow_copy=True) -> pa.Table:
    """
    Build a ``pa.Table`` from any DataFrame supporting the interchange
    protocol.

    Parameters
    ----------
    df : DataFrameObject
        Object supporting the interchange protocol, i.e. `__dataframe__`
        method.
    allow_copy : bool, default: True
        Whether to allow copying the memory to perform the conversion
        (if false then zero-copy approach is requested).

    Returns
    -------
    pa.Table
    """
    if isinstance(df, pa.Table):
        return df

    if not hasattr(df, "__dataframe__"):
        raise ValueError("`df` does not support __dataframe__")

    # Fix: forward allow_copy to _from_dataframe; previously the default
    # (True) was always used downstream, silently ignoring a zero-copy
    # request.
    return _from_dataframe(df.__dataframe__(allow_copy=allow_copy),
                           allow_copy=allow_copy)


def _from_dataframe(df: DataFrameObject, allow_copy=True):
    """
    Build a ``pa.Table`` from the DataFrame interchange object.

    Parameters
    ----------
    df : DataFrameObject
        Object supporting the interchange protocol, i.e. `__dataframe__`
        method.
    allow_copy : bool, default: True
        Whether to allow copying the memory to perform the conversion
        (if false then zero-copy approach is requested).

    Returns
    -------
    pa.Table
    """
    # Convert each protocol chunk to a RecordBatch and assemble a Table.
    batches = [
        protocol_df_chunk_to_pyarrow(chunk, allow_copy)
        for chunk in df.get_chunks()
    ]
    return pa.Table.from_batches(batches)


def protocol_df_chunk_to_pyarrow(
    df: DataFrameObject,
    allow_copy: bool = True
) -> pa.RecordBatch:
    """
    Convert an interchange protocol chunk to a ``pa.RecordBatch``.

    Parameters
    ----------
    df : DataFrameObject
        Object supporting the interchange protocol, i.e. `__dataframe__`
        method.
    allow_copy : bool, default: True
        Whether to allow copying the memory to perform the conversion
        (if false then zero-copy approach is requested).

    Returns
    -------
    pa.RecordBatch
    """
    # We need a dict of columns here, with each column being a pa.Array
    columns: dict[str, pa.Array] = {}
    for name in df.column_names():
        if not isinstance(name, str):
            raise ValueError(f"Column {name} is not a string")
        if name in columns:
            raise ValueError(f"Column {name} is not unique")
        col = df.get_column_by_name(name)
        dtype = col.dtype[0]
        if dtype in (
            DtypeKind.INT,
            DtypeKind.UINT,
            DtypeKind.FLOAT,
            DtypeKind.STRING,
            DtypeKind.DATETIME,
        ):
            columns[name] = column_to_array(col)
        elif dtype == DtypeKind.BOOL:
            # Booleans need a cast from the protocol's uint8 representation.
            columns[name] = bool_column_to_array(col, allow_copy)
        elif dtype == DtypeKind.CATEGORICAL:
            columns[name] = categorical_column_to_dictionary(col, allow_copy)
        else:
            raise NotImplementedError(f"Data type {dtype} not handled yet")

    return pa.RecordBatch.from_pydict(columns)


def column_to_array(
    col: ColumnObject,
) -> pa.Array:
    """
    Convert a column holding one of the primitive dtypes to a PyArrow array.
    A primitive type is one of: int, uint, float, bool (1 bit).

    Parameters
    ----------
    col : ColumnObject

    Returns
    -------
    pa.Array
    """
    buffers = col.get_buffers()
    null_count = col.null_count
    return buffers_to_array(buffers, col.size(),
                            col.describe_null,
                            col.offset, null_count)


def bool_column_to_array(
    col: ColumnObject,
    allow_copy: bool = True,
) -> pa.Array:
    """
    Convert a column holding boolean dtype to a PyArrow array.

    Parameters
    ----------
    col : ColumnObject
    allow_copy : bool, default: True
        Whether to allow copying the memory to perform the conversion
        (if false then zero-copy approach is requested).

    Returns
    -------
    pa.Array

    Raises
    ------
    RuntimeError
        If ``allow_copy`` is False — the uint8 -> bool cast always copies.
    """
    if not allow_copy:
        raise RuntimeError(
            "Boolean column will be casted from uint8 and a copy "
            "is required which is forbidden by allow_copy=False"
        )

    buffers = col.get_buffers()
    data = buffers_to_array(buffers, col.size(),
                            col.describe_null,
                            col.offset)
    # The protocol exchanges booleans as uint8; cast back to bool.
    return pc.cast(data, pa.bool_())


def categorical_column_to_dictionary(
    col: ColumnObject,
    allow_copy: bool = True,
) -> pa.DictionaryArray:
    """
    Convert a column holding categorical data to a pa.DictionaryArray.

    Parameters
    ----------
    col : ColumnObject
    allow_copy : bool, default: True
        Whether to allow copying the memory to perform the conversion
        (if false then zero-copy approach is requested).

    Returns
    -------
    pa.DictionaryArray

    Raises
    ------
    RuntimeError
        If ``allow_copy`` is False — assembling the dictionary copies.
    NotImplementedError
        If the categorical is not dictionary-encoded.
    """
    if not allow_copy:
        raise RuntimeError(
            # Fix: message previously said "Boolean column ..." —
            # copy-pasted from bool_column_to_array.
            "Categorical column will be converted to a dictionary and "
            "a copy is required which is forbidden by allow_copy=False"
        )

    categorical = col.describe_categorical

    if not categorical["is_dictionary"]:
        raise NotImplementedError(
            "Non-dictionary categoricals not supported yet")

    # The categories are themselves exposed as an interchange column.
    cat_column = categorical["categories"]
    dictionary = column_to_array(cat_column)

    buffers = col.get_buffers()
    indices = buffers_to_array(buffers, col.size(),
                               col.describe_null,
                               col.offset)

    return pa.DictionaryArray.from_arrays(indices, dictionary)


def parse_datetime_format_str(format_str):
    """
    Parse a datetime ``format_str`` into ``(unit, tz)``.

    The interchange protocol encodes timestamps as ``'ts{unit}:{tz}'``
    where unit is one of 's', 'm', 'u', 'n'.

    Raises
    ------
    NotImplementedError
        If ``format_str`` is not a recognized timestamp format.
    """
    # timestamp 'ts{unit}:tz'
    timestamp_meta = re.match(r"ts([smun]):(.*)", format_str)
    if timestamp_meta:
        unit, tz = timestamp_meta.group(1), timestamp_meta.group(2)
        if unit != "s":
            # the format string describes only a first letter of the unit, so
            # add one extra letter to convert the unit to numpy-style:
            # 'm' -> 'ms', 'u' -> 'us', 'n' -> 'ns'
            unit += "s"

        return unit, tz

    raise NotImplementedError(
        f"DateTime kind is not supported: {format_str}")


def map_date_type(data_type):
    """
    Map a column data type (interchange ``Dtype`` tuple) to a pyarrow
    data type.

    Raises
    ------
    NotImplementedError
        If the (kind, bit width) combination has no pyarrow equivalent.
    """
    kind, bit_width, f_string, _ = data_type

    if kind == DtypeKind.DATETIME:
        unit, tz = parse_datetime_format_str(f_string)
        return pa.timestamp(unit, tz=tz)

    pa_dtype = _PYARROW_DTYPES.get(kind, {}).get(bit_width, None)
    if pa_dtype is None:
        raise NotImplementedError(
            f"Conversion for {data_type} is not yet supported.")
    return pa_dtype


def buffers_to_array(
    buffers: ColumnBuffers,
    length: int,
    describe_null: ColumnNullType,
    offset: int = 0,
    null_count: int = -1
) -> pa.Array:
    """
    Build a PyArrow array from the passed buffers.

    Parameters
    ----------
    buffers : ColumnBuffers
        Dictionary containing tuples of underlying buffers and
        their associated dtype.
    length : int
        The number of values in the array.
    describe_null: ColumnNullType
        Null representation the column dtype uses,
        as a tuple ``(kind, value)``
    offset : int, default: 0
        Number of elements to offset from the start of the buffer.
    null_count : int, default: -1
        Number of nulls, if known; -1 lets pyarrow compute it lazily.
        Kept for backward compatibility of the signature.

    Returns
    -------
    pa.Array

    Notes
    -----
    The returned array doesn't own the memory. The caller of this function
    is responsible for keeping the memory owner object alive as long as
    the returned PyArrow array is being used.
    """
    data_buff, data_type = buffers["data"]
    try:
        validity_buff, validity_dtype = buffers["validity"]
    except TypeError:
        # buffers["validity"] is None when the null representation is
        # not a bit/byte mask.
        validity_buff = None
    try:
        offset_buff, offset_dtype = buffers["offsets"]
    except TypeError:
        offset_buff = None

    # Wrap the raw pointer in a pyarrow Buffer; `base` keeps the
    # exporting buffer object alive as long as the pa.Buffer is.
    data_pa_buffer = pa.foreign_buffer(data_buff.ptr, data_buff.bufsize,
                                       base=data_buff)

    # Construct a validity pyarrow Buffer, if applicable
    if validity_buff:
        validity_pa_buff = validity_buffer_from_mask(validity_buff,
                                                     validity_dtype,
                                                     describe_null,
                                                     length,
                                                     offset)
    else:
        validity_pa_buff = validity_buffer_nan_sentinel(data_pa_buffer,
                                                        data_type,
                                                        describe_null,
                                                        length,
                                                        offset)

    # Construct a pyarrow Array from buffers
    data_dtype = map_date_type(data_type)

    if offset_buff:
        # Variable-size binary layout (strings): add the offsets buffer.
        offset_pa_buffer = pa.foreign_buffer(offset_buff.ptr,
                                             offset_buff.bufsize,
                                             base=offset_buff)

        # Per review: single from_buffers call for both string widths;
        # format char 'U' marks 64-bit offsets (large_string), 'u' 32-bit.
        # The old `if not null_count: null_count = -1` was dropped — it
        # also clobbered a legitimate null_count of 0.
        if data_type[2] == "U":
            string_type = pa.large_string()
        else:
            string_type = pa.string()
        array = pa.Array.from_buffers(
            string_type,
            length,
            [validity_pa_buff, offset_pa_buffer, data_pa_buffer],
            offset=offset,
        )
    else:
        # Fixed-size primitive layout: validity + data only.
        array = pa.Array.from_buffers(
            data_dtype,
            length,
            [validity_pa_buff, data_pa_buffer],
            offset=offset,
        )

    return array
