This is an automated email from the ASF dual-hosted git repository.
alenka pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/arrow.git
The following commit(s) were added to refs/heads/main by this push:
new cc9e649d03 GH-35331: [Python] Expose Parquet sorting metadata (#37665)
cc9e649d03 is described below
commit cc9e649d0382c70552e6e556199a3e238dbb7576
Author: Judah Rand <[email protected]>
AuthorDate: Wed Dec 20 09:34:23 2023 +0000
GH-35331: [Python] Expose Parquet sorting metadata (#37665)
### Rationale for this change
Picking up where #35453 left off.
Closes https://github.com/apache/arrow/issues/35331
This PR builds on top of #37469
### What changes are included in this PR?
### Are these changes tested?
### Are there any user-facing changes?
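Yes: a new `pyarrow.parquet.SortingColumn` class, a `sorting_columns` keyword on the writers, and a `sorting_columns` property on `RowGroupMetaData`. A minimal sketch of the new API, derived from the tests added below (the file name is illustrative):

    import pyarrow as pa
    import pyarrow.parquet as pq

    table = pa.table({'a': [1, 2, 3], 'b': ['x', 'y', 'z']})

    # Declare (but do not perform) a sort order; it is recorded in the
    # row group metadata for readers to exploit.
    sorting_columns = [pq.SortingColumn(0), pq.SortingColumn(1, descending=True)]
    pq.write_table(table, 'sorted.parquet', sorting_columns=sorting_columns)

    # The declared order can be read back from the file metadata.
    metadata = pq.read_metadata('sorted.parquet')
    print(metadata.row_group(0).sorting_columns)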
* Closes: #35331
Lead-authored-by: Judah Rand <[email protected]>
Co-authored-by: Will Jones <[email protected]>
Signed-off-by: AlenkaF <[email protected]>
---
docs/source/python/api/formats.rst | 1 +
python/pyarrow/_dataset_parquet.pyx | 2 +
python/pyarrow/_parquet.pxd | 24 ++-
python/pyarrow/_parquet.pyx | 284 +++++++++++++++++++++++++-
python/pyarrow/parquet/core.py | 12 +-
python/pyarrow/tests/parquet/test_metadata.py | 84 ++++++++
6 files changed, 394 insertions(+), 13 deletions(-)
diff --git a/docs/source/python/api/formats.rst b/docs/source/python/api/formats.rst
index 9ca499c097..86e2585ac2 100644
--- a/docs/source/python/api/formats.rst
+++ b/docs/source/python/api/formats.rst
@@ -97,6 +97,7 @@ Parquet Metadata
FileMetaData
RowGroupMetaData
+ SortingColumn
ColumnChunkMetaData
Statistics
ParquetSchema
diff --git a/python/pyarrow/_dataset_parquet.pyx b/python/pyarrow/_dataset_parquet.pyx
index 61e051f56c..58ef6145cf 100644
--- a/python/pyarrow/_dataset_parquet.pyx
+++ b/python/pyarrow/_dataset_parquet.pyx
@@ -609,6 +609,7 @@ cdef class ParquetFileWriteOptions(FileWriteOptions):
dictionary_pagesize_limit=self._properties["dictionary_pagesize_limit"],
write_page_index=self._properties["write_page_index"],
write_page_checksum=self._properties["write_page_checksum"],
+ sorting_columns=self._properties["sorting_columns"],
)
def _set_arrow_properties(self):
@@ -659,6 +660,7 @@ cdef class ParquetFileWriteOptions(FileWriteOptions):
write_page_index=False,
encryption_config=None,
write_page_checksum=False,
+ sorting_columns=None,
)
self._set_properties()
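The same option can be supplied on the dataset write path via `ParquetFileFormat.make_write_options`, which populates the property table shown above; a hedged sketch (the output directory name is illustrative):

    import pyarrow as pa
    import pyarrow.dataset as ds
    import pyarrow.parquet as pq

    table = pa.table({'a': [2, 1], 'b': ['y', 'x']})

    # Carry the declared sort order through the dataset writer's
    # Parquet file options (the keyword plumbed through above).
    file_format = ds.ParquetFileFormat()
    write_options = file_format.make_write_options(
        sorting_columns=[pq.SortingColumn(0)])

    ds.write_dataset(table, 'out_dir', format=file_format,
                     file_options=write_options)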
diff --git a/python/pyarrow/_parquet.pxd b/python/pyarrow/_parquet.pxd
index 7ce747e0aa..ae4094d8b4 100644
--- a/python/pyarrow/_parquet.pxd
+++ b/python/pyarrow/_parquet.pxd
@@ -328,11 +328,17 @@ cdef extern from "parquet/api/reader.h" namespace "parquet" nogil:
optional[ParquetIndexLocation] GetColumnIndexLocation() const
optional[ParquetIndexLocation] GetOffsetIndexLocation() const
+ struct CSortingColumn" parquet::SortingColumn":
+ int column_idx
+ c_bool descending
+ c_bool nulls_first
+
cdef cppclass CRowGroupMetaData" parquet::RowGroupMetaData":
c_bool Equals(const CRowGroupMetaData&) const
- int num_columns()
- int64_t num_rows()
- int64_t total_byte_size()
+ int num_columns() const
+ int64_t num_rows() const
+ int64_t total_byte_size() const
+ vector[CSortingColumn] sorting_columns() const
unique_ptr[CColumnChunkMetaData] ColumnChunk(int i) const
cdef cppclass CFileMetaData" parquet::FileMetaData":
@@ -421,6 +427,7 @@ cdef extern from "parquet/api/writer.h" namespace "parquet" nogil:
Builder* disable_dictionary()
Builder* enable_dictionary()
Builder* enable_dictionary(const c_string& path)
+ Builder* set_sorting_columns(vector[CSortingColumn] sorting_columns)
Builder* disable_statistics()
Builder* enable_statistics()
Builder* enable_statistics(const c_string& path)
@@ -517,8 +524,8 @@ cdef extern from "parquet/arrow/schema.h" namespace "parquet::arrow" nogil:
CStatus ToParquetSchema(
const CSchema* arrow_schema,
- const ArrowReaderProperties& properties,
- const shared_ptr[const CKeyValueMetadata]& key_value_metadata,
+ const WriterProperties& properties,
+ const ArrowWriterProperties& arrow_properties,
shared_ptr[SchemaDescriptor]* out)
@@ -584,7 +591,9 @@ cdef shared_ptr[WriterProperties] _create_writer_properties(
write_batch_size=*,
dictionary_pagesize_limit=*,
write_page_index=*,
- write_page_checksum=*) except *
+ write_page_checksum=*,
+ sorting_columns=*,
+) except *
cdef shared_ptr[ArrowWriterProperties] _create_arrow_writer_properties(
@@ -593,7 +602,8 @@ cdef shared_ptr[ArrowWriterProperties] _create_arrow_writer_properties(
allow_truncated_timestamps=*,
writer_engine_version=*,
use_compliant_nested_type=*,
- store_schema=*) except *
+ store_schema=*,
+) except *
cdef class ParquetSchema(_Weakrefable):
cdef:
diff --git a/python/pyarrow/_parquet.pyx b/python/pyarrow/_parquet.pyx
index 35344eb735..0b68524565 100644
--- a/python/pyarrow/_parquet.pyx
+++ b/python/pyarrow/_parquet.pyx
@@ -18,6 +18,7 @@
# cython: profile=False
# distutils: language = c++
+from collections.abc import Sequence
from textwrap import indent
import warnings
@@ -31,6 +32,7 @@ from pyarrow.lib cimport (_Weakrefable, Buffer, Schema,
Table, NativeFile,
pyarrow_wrap_chunked_array,
pyarrow_wrap_schema,
+ pyarrow_unwrap_schema,
pyarrow_wrap_table,
pyarrow_wrap_batch,
pyarrow_wrap_scalar,
@@ -506,6 +508,204 @@ cdef class ColumnChunkMetaData(_Weakrefable):
return self.metadata.GetColumnIndexLocation().has_value()
+cdef class SortingColumn:
+ """
+ Sorting specification for a single column.
+
+ Returned by :meth:`RowGroupMetaData.sorting_columns` and used in
+ :class:`ParquetWriter` to specify the sort order of the data.
+
+ Parameters
+ ----------
+ column_index : int
+ Index of column that data is sorted by.
+ descending : bool, default False
+ Whether column is sorted in descending order.
+ nulls_first : bool, default False
+ Whether null values appear before valid values.
+
+ Notes
+ -----
+
+ Column indices are zero-based, refer only to leaf fields, and are in
+ depth-first order. This may make the column indices for nested schemas
+ different from what you expect. In most cases, it will be easier to
+ specify the sort order using column names instead of column indices,
+ then convert using the ``from_ordering`` method.
+
+ Examples
+ --------
+
+ In other APIs, sort order is specified by names, such as:
+
+ >>> sort_order = [('id', 'ascending'), ('timestamp', 'descending')]
+
+ For Parquet, the column index must be used instead:
+
+ >>> import pyarrow.parquet as pq
+ >>> [pq.SortingColumn(0), pq.SortingColumn(1, descending=True)]
+ [SortingColumn(column_index=0, descending=False, nulls_first=False), SortingColumn(column_index=1, descending=True, nulls_first=False)]
+
+ Convert the sort_order into the list of sorting columns with
+ ``from_ordering`` (note that the schema must be provided as well):
+
+ >>> import pyarrow as pa
+ >>> schema = pa.schema([('id', pa.int64()), ('timestamp', pa.timestamp('ms'))])
+ >>> sorting_columns = pq.SortingColumn.from_ordering(schema, sort_order)
+ >>> sorting_columns
+ (SortingColumn(column_index=0, descending=False, nulls_first=False), SortingColumn(column_index=1, descending=True, nulls_first=False))
+
+ Convert back to the sort order with ``to_ordering``:
+
+ >>> pq.SortingColumn.to_ordering(schema, sorting_columns)
+ ((('id', 'ascending'), ('timestamp', 'descending')), 'at_end')
+
+ See Also
+ --------
+ RowGroupMetaData.sorting_columns
+ """
+ cdef int column_index
+ cdef c_bool descending
+ cdef c_bool nulls_first
+
+ def __init__(self, int column_index, c_bool descending=False, c_bool nulls_first=False):
+ self.column_index = column_index
+ self.descending = descending
+ self.nulls_first = nulls_first
+
+ @classmethod
+ def from_ordering(cls, Schema schema, sort_keys, null_placement='at_end'):
+ """
+ Create a tuple of SortingColumn objects from the same arguments as
+ :class:`pyarrow.compute.SortOptions`.
+
+ Parameters
+ ----------
+ schema : Schema
+ Schema of the input data.
+ sort_keys : Sequence of (name, order) tuples
+ Names of field/column keys (str) to sort the input on,
+ along with the order each field/column is sorted in.
+ Accepted values for `order` are "ascending", "descending".
+ null_placement : {'at_start', 'at_end'}, default 'at_end'
+ Where null values should appear in the sort order.
+
+ Returns
+ -------
+ sorting_columns : tuple of SortingColumn
+ """
+ if null_placement == 'at_start':
+ nulls_first = True
+ elif null_placement == 'at_end':
+ nulls_first = False
+ else:
+ raise ValueError('null_placement must be "at_start" or "at_end"')
+
+ col_map = _name_to_index_map(schema)
+
+ sorting_columns = []
+
+ for sort_key in sort_keys:
+ if isinstance(sort_key, str):
+ name = sort_key
+ descending = False
+ elif (isinstance(sort_key, tuple) and len(sort_key) == 2 and
+ isinstance(sort_key[0], str) and
+ isinstance(sort_key[1], str)):
+ name, descending = sort_key
+ if descending == "descending":
+ descending = True
+ elif descending == "ascending":
+ descending = False
+ else:
+ raise ValueError("Invalid sort key direction: {0}"
+ .format(descending))
+ else:
+ raise ValueError("Invalid sort key: {0}".format(sort_key))
+
+ try:
+ column_index = col_map[name]
+ except KeyError:
+ raise ValueError("Sort key name '{0}' not found in
schema:\n{1}"
+ .format(name, schema))
+
+ sorting_columns.append(
+ cls(column_index, descending=descending, nulls_first=nulls_first)
+ )
+
+ return tuple(sorting_columns)
+
+ @staticmethod
+ def to_ordering(Schema schema, sorting_columns):
+ """
+ Convert a tuple of SortingColumn objects to the same format as
+ :class:`pyarrow.compute.SortOptions`.
+
+ Parameters
+ ----------
+ schema : Schema
+ Schema of the input data.
+ sorting_columns : tuple of SortingColumn
+ Columns to sort the input on.
+
+ Returns
+ -------
+ sort_keys : tuple of (name, order) tuples
+ null_placement : {'at_start', 'at_end'}
+ """
+ col_map = {i: name for name, i in _name_to_index_map(schema).items()}
+
+ sort_keys = []
+ nulls_first = None
+
+ for sorting_column in sorting_columns:
+ name = col_map[sorting_column.column_index]
+ if sorting_column.descending:
+ order = "descending"
+ else:
+ order = "ascending"
+ sort_keys.append((name, order))
+ if nulls_first is None:
+ nulls_first = sorting_column.nulls_first
+ elif nulls_first != sorting_column.nulls_first:
+ raise ValueError("Sorting columns have inconsistent null
placement")
+
+ if nulls_first:
+ null_placement = "at_start"
+ else:
+ null_placement = "at_end"
+
+ return tuple(sort_keys), null_placement
+
+ def __repr__(self):
+ return """{}(column_index={}, descending={}, nulls_first={})""".format(
+ self.__class__.__name__,
+ self.column_index, self.descending, self.nulls_first)
+
+ def __eq__(self, SortingColumn other):
+ return (self.column_index == other.column_index and
+ self.descending == other.descending and
+ self.nulls_first == other.nulls_first)
+
+ def __hash__(self):
+ return hash((self.column_index, self.descending, self.nulls_first))
+
+ @property
+ def column_index(self):
+ """"Index of column data is sorted by (int)."""
+ return self.column_index
+
+ @property
+ def descending(self):
+ """Whether column is sorted in descending order (bool)."""
+ return self.descending
+
+ @property
+ def nulls_first(self):
+ """Whether null values appear before valid values (bool)."""
+ return self.nulls_first
+
+
cdef class RowGroupMetaData(_Weakrefable):
"""Metadata for a single row group."""
@@ -565,10 +765,12 @@ cdef class RowGroupMetaData(_Weakrefable):
return """{0}
num_columns: {1}
num_rows: {2}
- total_byte_size: {3}""".format(object.__repr__(self),
+ total_byte_size: {3}
+ sorting_columns: {4}""".format(object.__repr__(self),
self.num_columns,
self.num_rows,
- self.total_byte_size)
+ self.total_byte_size,
+ self.sorting_columns)
def to_dict(self):
"""
@@ -585,6 +787,7 @@ cdef class RowGroupMetaData(_Weakrefable):
num_rows=self.num_rows,
total_byte_size=self.total_byte_size,
columns=columns,
+ sorting_columns=[col.to_dict() for col in self.sorting_columns]
)
for i in range(self.num_columns):
columns.append(self.column(i).to_dict())
@@ -605,6 +808,19 @@ cdef class RowGroupMetaData(_Weakrefable):
"""Total byte size of all the uncompressed column data in this row
group (int)."""
return self.metadata.total_byte_size()
+ @property
+ def sorting_columns(self):
+ """Columns the row group is sorted by (tuple of
:class:`SortingColumn`))."""
+ out = []
+ cdef vector[CSortingColumn] sorting_columns = self.metadata.sorting_columns()
+ for sorting_col in sorting_columns:
+ out.append(SortingColumn(
+ sorting_col.column_idx,
+ sorting_col.descending,
+ sorting_col.nulls_first
+ ))
+ return tuple(out)
+
def _reconstruct_filemetadata(Buffer serialized):
cdef:
@@ -1550,6 +1766,28 @@ cdef class ParquetReader(_Weakrefable):
return closed
+cdef CSortingColumn _convert_sorting_column(SortingColumn sorting_column):
+ cdef CSortingColumn c_sorting_column
+
+ c_sorting_column.column_idx = sorting_column.column_index
+ c_sorting_column.descending = sorting_column.descending
+ c_sorting_column.nulls_first = sorting_column.nulls_first
+
+ return c_sorting_column
+
+
+cdef vector[CSortingColumn] _convert_sorting_columns(sorting_columns) except *:
+ if not (isinstance(sorting_columns, Sequence)
+ and all(isinstance(col, SortingColumn) for col in sorting_columns)):
+ raise ValueError(
+ "'sorting_columns' must be a list of `SortingColumn`")
+
+ cdef vector[CSortingColumn] c_sorting_columns = [_convert_sorting_column(col)
+ for col in sorting_columns]
+
+ return c_sorting_columns
+
+
cdef shared_ptr[WriterProperties] _create_writer_properties(
use_dictionary=None,
compression=None,
@@ -1564,7 +1802,8 @@ cdef shared_ptr[WriterProperties] _create_writer_properties(
write_batch_size=None,
dictionary_pagesize_limit=None,
write_page_index=False,
- write_page_checksum=False) except *:
+ write_page_checksum=False,
+ sorting_columns=None) except *:
"""General writer properties"""
cdef:
shared_ptr[WriterProperties] properties
@@ -1649,6 +1888,11 @@ cdef shared_ptr[WriterProperties] _create_writer_properties(
for column in write_statistics:
props.enable_statistics(tobytes(column))
+ # sorting_columns
+
+ if sorting_columns is not None:
+ props.set_sorting_columns(_convert_sorting_columns(sorting_columns))
+
# use_byte_stream_split
if isinstance(use_byte_stream_split, bool):
@@ -1788,6 +2032,34 @@ cdef shared_ptr[ArrowWriterProperties] _create_arrow_writer_properties(
return arrow_properties
+cdef _name_to_index_map(Schema arrow_schema):
+ cdef:
+ shared_ptr[CSchema] sp_arrow_schema
+ shared_ptr[SchemaDescriptor] sp_parquet_schema
+ shared_ptr[WriterProperties] props = _create_writer_properties()
+ shared_ptr[ArrowWriterProperties] arrow_props = _create_arrow_writer_properties(
+ use_deprecated_int96_timestamps=False,
+ coerce_timestamps=None,
+ allow_truncated_timestamps=False,
+ writer_engine_version="V2"
+ )
+
+ sp_arrow_schema = pyarrow_unwrap_schema(arrow_schema)
+
+ with nogil:
+ check_status(ToParquetSchema(
+ sp_arrow_schema.get(), deref(props.get()), deref(arrow_props.get()), &sp_parquet_schema))
+
+ out = dict()
+
+ cdef SchemaDescriptor* parquet_schema = sp_parquet_schema.get()
+
+ for i in range(parquet_schema.num_columns()):
+ name = frombytes(parquet_schema.Column(i).path().get().ToDotString())
+ out[name] = i
+
+ return out
+
cdef class ParquetWriter(_Weakrefable):
cdef:
@@ -1835,7 +2107,8 @@ cdef class ParquetWriter(_Weakrefable):
dictionary_pagesize_limit=None,
store_schema=True,
write_page_index=False,
- write_page_checksum=False):
+ write_page_checksum=False,
+ sorting_columns=None):
cdef:
shared_ptr[WriterProperties] properties
shared_ptr[ArrowWriterProperties] arrow_properties
@@ -1867,7 +2140,8 @@ cdef class ParquetWriter(_Weakrefable):
write_batch_size=write_batch_size,
dictionary_pagesize_limit=dictionary_pagesize_limit,
write_page_index=write_page_index,
- write_page_checksum=write_page_checksum
+ write_page_checksum=write_page_checksum,
+ sorting_columns=sorting_columns,
)
arrow_properties = _create_arrow_writer_properties(
use_deprecated_int96_timestamps=use_deprecated_int96_timestamps,
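The leaf-field, depth-first indexing described in the `SortingColumn` notes above is easiest to see on a nested schema; a small sketch mirroring `test_parquet_sorting_column_nested` below:

    import pyarrow as pa
    import pyarrow.parquet as pq

    # Parquet numbers leaf fields depth-first: a.x -> 0, a.y -> 1, b -> 2.
    schema = pa.schema({
        'a': pa.struct([('x', pa.int64()), ('y', pa.int64())]),
        'b': pa.int64(),
    })

    # from_ordering resolves dotted leaf names to indices...
    cols = pq.SortingColumn.from_ordering(
        schema, [('a.x', 'descending'), ('b', 'ascending')])

    # ...and to_ordering maps the indices back to names.
    print(pq.SortingColumn.to_ordering(schema, cols))
    # ((('a.x', 'descending'), ('b', 'ascending')), 'at_end')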
diff --git a/python/pyarrow/parquet/core.py b/python/pyarrow/parquet/core.py
index db22eb3293..852b339211 100644
--- a/python/pyarrow/parquet/core.py
+++ b/python/pyarrow/parquet/core.py
@@ -48,7 +48,8 @@ from pyarrow._parquet import (ParquetReader, Statistics, # noqa
ParquetSchema, ColumnSchema,
ParquetLogicalType,
FileEncryptionProperties,
- FileDecryptionProperties)
+ FileDecryptionProperties,
+ SortingColumn)
from pyarrow.fs import (LocalFileSystem, FileSystem, FileType,
_resolve_filesystem_and_path, _ensure_filesystem)
from pyarrow import filesystem as legacyfs
@@ -895,6 +896,10 @@ write_page_checksum : bool, default False
Whether to write page checksums in general for all columns.
Page checksums enable detection of data corruption, which might occur during
transmission or in storage.
+sorting_columns : Sequence of SortingColumn, default None
+ Specify the sort order of the data being written. The writer does not sort
+ the data nor does it verify that the data is sorted. The sort order is
+ written to the row group metadata, which can then be used by readers.
"""
_parquet_writer_example_doc = """\
@@ -989,6 +994,7 @@ Examples
store_schema=True,
write_page_index=False,
write_page_checksum=False,
+ sorting_columns=None,
**options):
if use_deprecated_int96_timestamps is None:
# Use int96 timestamps for Spark
@@ -1047,6 +1053,7 @@ Examples
store_schema=store_schema,
write_page_index=write_page_index,
write_page_checksum=write_page_checksum,
+ sorting_columns=sorting_columns,
**options)
self.is_open = True
@@ -3129,6 +3136,7 @@ def write_table(table, where, row_group_size=None,
version='2.6',
store_schema=True,
write_page_index=False,
write_page_checksum=False,
+ sorting_columns=None,
**kwargs):
# Implementor's note: when adding keywords here / updating defaults, also
# update it in write_to_dataset and _dataset_parquet.pyx ParquetFileWriteOptions
@@ -3158,6 +3166,7 @@ def write_table(table, where, row_group_size=None,
version='2.6',
store_schema=store_schema,
write_page_index=write_page_index,
write_page_checksum=write_page_checksum,
+ sorting_columns=sorting_columns,
**kwargs) as writer:
writer.write_table(table, row_group_size=row_group_size)
except Exception:
@@ -3742,6 +3751,7 @@ __all__ = (
"ParquetWriter",
"PartitionSet",
"RowGroupMetaData",
+ "SortingColumn",
"Statistics",
"read_metadata",
"read_pandas",
diff --git a/python/pyarrow/tests/parquet/test_metadata.py b/python/pyarrow/tests/parquet/test_metadata.py
index 3efaf1dbf5..73284d2e53 100644
--- a/python/pyarrow/tests/parquet/test_metadata.py
+++ b/python/pyarrow/tests/parquet/test_metadata.py
@@ -301,6 +301,90 @@ def test_parquet_write_disable_statistics(tempdir):
assert cc_b.statistics is None
+def test_parquet_sorting_column():
+ sorting_col = pq.SortingColumn(10)
+ assert sorting_col.column_index == 10
+ assert sorting_col.descending is False
+ assert sorting_col.nulls_first is False
+
+ sorting_col = pq.SortingColumn(0, descending=True, nulls_first=True)
+ assert sorting_col.column_index == 0
+ assert sorting_col.descending is True
+ assert sorting_col.nulls_first is True
+
+ schema = pa.schema([('a', pa.int64()), ('b', pa.int64())])
+ sorting_cols = (
+ pq.SortingColumn(1, descending=True),
+ pq.SortingColumn(0, descending=False),
+ )
+ sort_order, null_placement = pq.SortingColumn.to_ordering(schema, sorting_cols)
+ assert sort_order == (('b', "descending"), ('a', "ascending"))
+ assert null_placement == "at_end"
+
+ sorting_cols_roundtripped = pq.SortingColumn.from_ordering(
+ schema, sort_order, null_placement)
+ assert sorting_cols_roundtripped == sorting_cols
+
+ sorting_cols = pq.SortingColumn.from_ordering(
+ schema, ('a', ('b', "descending")), null_placement="at_start")
+ expected = (
+ pq.SortingColumn(0, descending=False, nulls_first=True),
+ pq.SortingColumn(1, descending=True, nulls_first=True),
+ )
+ assert sorting_cols == expected
+
+ # Conversions handle empty tuples
+ empty_sorting_cols = pq.SortingColumn.from_ordering(schema, ())
+ assert empty_sorting_cols == ()
+
+ assert pq.SortingColumn.to_ordering(schema, ()) == ((), "at_end")
+
+ with pytest.raises(ValueError):
+ pq.SortingColumn.from_ordering(schema, (("a", "not a valid sort
order")))
+
+ with pytest.raises(ValueError, match="inconsistent null placement"):
+ sorting_cols = (
+ pq.SortingColumn(1, nulls_first=True),
+ pq.SortingColumn(0, nulls_first=False),
+ )
+ pq.SortingColumn.to_ordering(schema, sorting_cols)
+
+
+def test_parquet_sorting_column_nested():
+ schema = pa.schema({
+ 'a': pa.struct([('x', pa.int64()), ('y', pa.int64())]),
+ 'b': pa.int64()
+ })
+
+ sorting_columns = [
+ pq.SortingColumn(0, descending=True), # a.x
+ pq.SortingColumn(2, descending=False) # b
+ ]
+
+ sort_order, null_placement = pq.SortingColumn.to_ordering(schema, sorting_columns)
+ assert null_placement == "at_end"
+ assert len(sort_order) == 2
+ assert sort_order[0] == ("a.x", "descending")
+ assert sort_order[1] == ("b", "ascending")
+
+
+def test_parquet_file_sorting_columns():
+ table = pa.table({'a': [1, 2, 3], 'b': ['a', 'b', 'c']})
+
+ sorting_columns = (
+ pq.SortingColumn(column_index=0, descending=True, nulls_first=True),
+ pq.SortingColumn(column_index=1, descending=False),
+ )
+ writer = pa.BufferOutputStream()
+ _write_table(table, writer, sorting_columns=sorting_columns)
+ reader = pa.BufferReader(writer.getvalue())
+
+ # Can retrieve sorting columns from metadata
+ metadata = pq.read_metadata(reader)
+ assert metadata.num_row_groups == 1
+ assert sorting_columns == metadata.row_group(0).sorting_columns
+
+
def test_field_id_metadata():
# ARROW-7080
field_id = b'PARQUET:field_id'