pitrou commented on code in PR #13041:
URL: https://github.com/apache/arrow/pull/13041#discussion_r977447182
##########
python/pyarrow/ipc.pxi:
##########
@@ -832,6 +862,26 @@ cdef class
_RecordBatchFileWriter(_RecordBatchStreamWriter):
self.writer = GetResultValue(
MakeFileWriter(c_sink, schema.sp_schema, self.options))
+_RecordBatchWithMetadata = namedtuple(
+ 'RecordBatchWithMetadata',
+ ('batch', 'custom_metadata'))
+
+
+class RecordBatchWithMetadata(_RecordBatchWithMetadata):
+ """RecordBatch with its custom metadata
+
+ Parameters
+ ----------
+ batch: record batch
+ custom_metadata: record batch's custom metadata
+ """
+ __slots__ = ()
+
+
+@staticmethod
+cdef _wrap_record_batch_with_metadata(CRecordBatchWithMetadata c):
+ return RecordBatchWithMetadata(pyarrow_wrap_batch(c.batch),
pyarrow_wrap_metadata(c.custom_metadata))
Review Comment:
Let's try to avoid long lines:
```suggestion
return RecordBatchWithMetadata(pyarrow_wrap_batch(c.batch),
pyarrow_wrap_metadata(c.custom_metadata))
```
##########
python/pyarrow/tests/test_ipc.py:
##########
@@ -945,6 +945,62 @@ def test_ipc_zero_copy_numpy():
assert_frame_equal(df, rdf)
[email protected]
+def test_ipc_batch_with_custom_metadata_roundtrip():
+ df = pd.DataFrame({'foo': [1.5]})
+
+ batch = pa.RecordBatch.from_pandas(df)
+ sink = pa.BufferOutputStream()
+
+ batch_count = 2
+ with pa.ipc.new_file(sink, batch.schema) as writer:
+ for i in range(batch_count):
+ writer.write_batch(batch, {"batch_id": str(i)})
+
+ buffer = sink.getvalue()
+ source = pa.BufferReader(buffer)
+
+ with pa.ipc.open_file(source) as reader:
+ batch_with_metas = [reader.get_batch_with_custom_metadata(
+ i) for i in range(reader.num_record_batches)]
+
+ for i in range(batch_count):
+ assert batch_with_metas[i].batch.num_rows == 1
+ assert batch_with_metas[i].custom_metadata == {"batch_id": str(i)}
Review Comment:
Perhaps also `assert isinstance(batch_with_metas[i].custom_metadata,
pa.KeyValueMetadata)`?
##########
python/pyarrow/ipc.pxi:
##########
@@ -832,6 +862,26 @@ cdef class
_RecordBatchFileWriter(_RecordBatchStreamWriter):
self.writer = GetResultValue(
MakeFileWriter(c_sink, schema.sp_schema, self.options))
+_RecordBatchWithMetadata = namedtuple(
+ 'RecordBatchWithMetadata',
+ ('batch', 'custom_metadata'))
+
+
+class RecordBatchWithMetadata(_RecordBatchWithMetadata):
+ """RecordBatch with its custom metadata
+
+ Parameters
+ ----------
+ batch: record batch
+ custom_metadata: record batch's custom metadata
Review Comment:
```suggestion
batch: RecordBatch
custom_metadata: KeyValueMetadata
```
##########
python/pyarrow/ipc.pxi:
##########
@@ -908,6 +958,32 @@ cdef class _RecordBatchFileReader(_Weakrefable):
# time has passed
get_record_batch = get_batch
+ def get_batch_with_custom_metadata(self, int i):
+ """
+ Read the record batch with the given index along with its custom
metadata
Review Comment:
```suggestion
Read the record batch with the given index along with
its custom metadata.
```
##########
python/pyarrow/tests/test_ipc.py:
##########
@@ -945,6 +945,62 @@ def test_ipc_zero_copy_numpy():
assert_frame_equal(df, rdf)
[email protected]
+def test_ipc_batch_with_custom_metadata_roundtrip():
+ df = pd.DataFrame({'foo': [1.5]})
+
+ batch = pa.RecordBatch.from_pandas(df)
+ sink = pa.BufferOutputStream()
+
+ batch_count = 2
+ with pa.ipc.new_file(sink, batch.schema) as writer:
+ for i in range(batch_count):
+ writer.write_batch(batch, {"batch_id": str(i)})
Review Comment:
Can you add a batch with no metadata?
##########
python/pyarrow/ipc.pxi:
##########
@@ -687,6 +692,31 @@ cdef class RecordBatchReader(_Weakrefable):
return pyarrow_wrap_batch(batch)
+ def read_next_batch_with_custom_metadata(self):
+ """
+ Read next RecordBatch from the stream along with its custom metadata.
+
+ Raises
+ ------
+ StopIteration:
+ At end of stream.
+
+ Returns
+ -------
+ batch : RecordBatch
+ custom_metadata : KeyValueMetadata or dict
Review Comment:
```suggestion
custom_metadata : KeyValueMetadata
```
##########
python/pyarrow/ipc.pxi:
##########
@@ -908,6 +958,32 @@ cdef class _RecordBatchFileReader(_Weakrefable):
# time has passed
get_record_batch = get_batch
+ def get_batch_with_custom_metadata(self, int i):
+ """
+ Read the record batch with the given index along with its custom
metadata
+
+ Parameters
+ ----------
+ i : int
+ The index of the record batch in the IPC file.
+
+ Returns
+ -------
+ batch : RecordBatch
+ custom_metadata : KeyValueMetadata or dict
Review Comment:
```suggestion
custom_metadata : KeyValueMetadata
```
##########
python/pyarrow/tests/test_ipc.py:
##########
@@ -945,6 +945,62 @@ def test_ipc_zero_copy_numpy():
assert_frame_equal(df, rdf)
[email protected]
+def test_ipc_batch_with_custom_metadata_roundtrip():
+ df = pd.DataFrame({'foo': [1.5]})
+
+ batch = pa.RecordBatch.from_pandas(df)
+ sink = pa.BufferOutputStream()
+
+ batch_count = 2
+ with pa.ipc.new_file(sink, batch.schema) as writer:
+ for i in range(batch_count):
+ writer.write_batch(batch, {"batch_id": str(i)})
+
+ buffer = sink.getvalue()
+ source = pa.BufferReader(buffer)
+
+ with pa.ipc.open_file(source) as reader:
+ batch_with_metas = [reader.get_batch_with_custom_metadata(
+ i) for i in range(reader.num_record_batches)]
+
+ for i in range(batch_count):
+ assert batch_with_metas[i].batch.num_rows == 1
+ assert batch_with_metas[i].custom_metadata == {"batch_id": str(i)}
+
+
+def batch_with_meta_generator(reader):
+ while True:
+ try:
+ yield reader.read_next_batch_with_custom_metadata()
+ except StopIteration:
+ return
+
+
[email protected]
+def test_record_batch_reader_with_custom_metadata_roundtrip():
+ df = pd.DataFrame({'foo': [1.5]})
+
+ batch = pa.RecordBatch.from_pandas(df)
+ sink = pa.BufferOutputStream()
+
+ batch_count = 2
+ with pa.ipc.new_stream(sink, batch.schema) as writer:
+ for i in range(batch_count):
+ writer.write_batch(batch, {"batch_id": str(i)})
Review Comment:
Can you add a batch with no metadata?
##########
python/pyarrow/_flight.pyx:
##########
@@ -986,14 +986,18 @@ cdef class MetadataRecordBatchWriter(_CRecordBatchWriter):
----------
batch : RecordBatch
"""
+ cdef:
+ shared_ptr[const CKeyValueMetadata] custom_metadata
Review Comment:
Do we want to allow passing the custom_metadata as an argument to
`write_batch` here?
##########
python/pyarrow/tests/test_ipc.py:
##########
@@ -945,6 +945,62 @@ def test_ipc_zero_copy_numpy():
assert_frame_equal(df, rdf)
[email protected]
+def test_ipc_batch_with_custom_metadata_roundtrip():
+ df = pd.DataFrame({'foo': [1.5]})
+
+ batch = pa.RecordBatch.from_pandas(df)
+ sink = pa.BufferOutputStream()
+
+ batch_count = 2
+ with pa.ipc.new_file(sink, batch.schema) as writer:
+ for i in range(batch_count):
+ writer.write_batch(batch, {"batch_id": str(i)})
+
+ buffer = sink.getvalue()
+ source = pa.BufferReader(buffer)
+
+ with pa.ipc.open_file(source) as reader:
+ batch_with_metas = [reader.get_batch_with_custom_metadata(
+ i) for i in range(reader.num_record_batches)]
+
+ for i in range(batch_count):
+ assert batch_with_metas[i].batch.num_rows == 1
+ assert batch_with_metas[i].custom_metadata == {"batch_id": str(i)}
+
+
+def batch_with_meta_generator(reader):
Review Comment:
Perhaps it would be useful to expose this as
`RecordBatchReader.iter_batches_with_custom_metadata`?
##########
python/pyarrow/ipc.pxi:
##########
@@ -471,17 +471,22 @@ cdef class _CRecordBatchWriter(_Weakrefable):
else:
raise ValueError(type(table_or_batch))
- def write_batch(self, RecordBatch batch):
+ def write_batch(self, RecordBatch batch, custom_metadata=None):
"""
Write RecordBatch to stream.
Parameters
----------
batch : RecordBatch
+ custom_metadata : dict
Review Comment:
It looks like you forgot to apply this suggestion — could you add it?
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]