This is an automated email from the ASF dual-hosted git repository.
amolina pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/arrow-cookbook.git
The following commit(s) were added to refs/heads/main by this push:
new 79a57a0 [Python] Add Flight streaming example (#109)
79a57a0 is described below
commit 79a57a0a87aaacc7eed82902363170d5fe8445f0
Author: David Li <[email protected]>
AuthorDate: Wed Jan 19 05:23:25 2022 -0500
[Python] Add Flight streaming example (#109)
* [Python] Add Flight streaming example
Fixes #86
* [Python] Remove special dataset example
* [Python] Add back missing server shutdown
---
python/source/flight.rst | 177 ++++++++++++++++++++++++++++++++++++++++++++---
1 file changed, 169 insertions(+), 8 deletions(-)
diff --git a/python/source/flight.rst b/python/source/flight.rst
index 69dac6c..354ccf5 100644
--- a/python/source/flight.rst
+++ b/python/source/flight.rst
@@ -10,8 +10,8 @@ Simple Parquet storage service with Arrow Flight
================================================
Suppose you want to implement a service that can store, send and receive
-Parquet files using the Arrow Flight protocol,
-``pyarrow`` provides an implementation framework in :mod:`pyarrow.flight`
+Parquet files using the Arrow Flight protocol;
+``pyarrow`` provides an implementation framework in :mod:`pyarrow.flight`
and particularly through the :class:`pyarrow.flight.FlightServerBase` class.
.. testcode::
@@ -25,7 +25,7 @@ and particularly through the
:class:`pyarrow.flight.FlightServerBase` class.
class FlightServer(pa.flight.FlightServerBase):
- def __init__(self, location="grpc://0.0.0.0:8815",
+ def __init__(self, location="grpc://0.0.0.0:8815",
repo=pathlib.Path("./datasets"), **kwargs):
super(FlightServer, self).__init__(location, **kwargs)
self._location = location
@@ -40,9 +40,9 @@ and particularly through the
:class:`pyarrow.flight.FlightServerBase` class.
)
endpoints = [pa.flight.FlightEndpoint(dataset, [self._location])]
return pyarrow.flight.FlightInfo(schema,
- descriptor,
+ descriptor,
endpoints,
- metadata.num_rows,
+ metadata.num_rows,
metadata.serialized_size)
def list_flights(self, context, criteria):
@@ -97,13 +97,13 @@ into a parquet file)
These are the most common Arrow Flight requests. If we need to add more
functionality, we can do so using custom actions.
-In the previous example a ``drop_dataset`` custom action is added.
-All custom actions are executed through the
+In the previous example, a ``drop_dataset`` custom action is added.
+All custom actions are executed through the
:meth:`pyarrow.flight.FlightServerBase.do_action` method, thus it's up to
the server subclass to dispatch them properly. In this case, we invoke
the ``do_drop_dataset`` method when the ``action.type`` is the one we expect.
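+
+For example, a client could invoke the custom action like this (a sketch;
+``uploaded.parquet`` is a hypothetical dataset name, assumed to already
+exist on the server):
+
+.. code-block::
+
+    import pyarrow as pa
+    import pyarrow.flight
+
+    client = pa.flight.connect("grpc://0.0.0.0:8815")
+    # The dataset name travels in the action body as raw bytes;
+    # "uploaded.parquet" is a hypothetical name.
+    client.do_action(pa.flight.Action(
+        "drop_dataset", "uploaded.parquet".encode("utf-8")))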
-Our server can then be started with
+Our server can then be started with
:meth:`pyarrow.flight.FlightServerBase.serve`
.. code-block::
@@ -226,3 +226,164 @@ we might list all parquet files that are currently stored
by the server:
# Shutdown the server
server.shutdown()
+Streaming Parquet storage service
+=================================
+
+We can improve the Parquet storage service and avoid holding entire datasets in
+memory by streaming data. Flight readers and writers, like others in PyArrow,
+can be iterated through, so let's update the server from before to take
+advantage of this:
+
+.. testcode::
+
+ import pathlib
+
+ import pyarrow as pa
+ import pyarrow.flight
+ import pyarrow.parquet
+
+
+ class FlightServer(pa.flight.FlightServerBase):
+
+ def __init__(self, location="grpc://0.0.0.0:8815",
+ repo=pathlib.Path("./datasets"), **kwargs):
+ super(FlightServer, self).__init__(location, **kwargs)
+ self._location = location
+ self._repo = repo
+
+ def _make_flight_info(self, dataset):
+ dataset_path = self._repo / dataset
+ schema = pa.parquet.read_schema(dataset_path)
+ metadata = pa.parquet.read_metadata(dataset_path)
+ descriptor = pa.flight.FlightDescriptor.for_path(
+ dataset.encode('utf-8')
+ )
+ endpoints = [pa.flight.FlightEndpoint(dataset, [self._location])]
+        return pa.flight.FlightInfo(schema,
+                                    descriptor,
+                                    endpoints,
+                                    metadata.num_rows,
+                                    metadata.serialized_size)
+
+ def list_flights(self, context, criteria):
+ for dataset in self._repo.iterdir():
+ yield self._make_flight_info(dataset.name)
+
+ def get_flight_info(self, context, descriptor):
+ return self._make_flight_info(descriptor.path[0].decode('utf-8'))
+
+    def do_put(self, context, descriptor, reader, writer):
+        dataset = descriptor.path[0].decode('utf-8')
+        dataset_path = self._repo / dataset
+        # Read the uploaded data and write to Parquet incrementally,
+        # one record batch at a time
+        with dataset_path.open("wb") as sink:
+            with pa.parquet.ParquetWriter(sink, reader.schema) as parquet_writer:
+                for chunk in reader:
+                    parquet_writer.write_table(pa.Table.from_batches([chunk.data]))
+
+ def do_get(self, context, ticket):
+ dataset = ticket.ticket.decode('utf-8')
+ # Stream data from a file
+ dataset_path = self._repo / dataset
+ reader = pa.parquet.ParquetFile(dataset_path)
+ return pa.flight.GeneratorStream(
+ reader.schema_arrow, reader.iter_batches())
+
+ def list_actions(self, context):
+ return [
+ ("drop_dataset", "Delete a dataset."),
+ ]
+
+ def do_action(self, context, action):
+ if action.type == "drop_dataset":
+ self.do_drop_dataset(action.body.to_pybytes().decode('utf-8'))
+ else:
+ raise NotImplementedError
+
+ def do_drop_dataset(self, dataset):
+ dataset_path = self._repo / dataset
+ dataset_path.unlink()
+
+First, we've modified :meth:`pyarrow.flight.FlightServerBase.do_put`. Instead
+of reading all the uploaded data into a :class:`pyarrow.Table` before writing,
+we iterate through each record batch as it arrives and append it to a Parquet
+file.
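+
+For contrast, a non-streaming ``do_put`` might have read the entire upload
+into memory before writing, roughly like this (a sketch of the in-memory
+approach, assuming the uploaded data fits in memory):
+
+.. code-block::
+
+    def do_put(self, context, descriptor, reader, writer):
+        dataset_path = self._repo / descriptor.path[0].decode('utf-8')
+        # read_all() buffers the whole upload into a single in-memory Table
+        pa.parquet.write_table(reader.read_all(), dataset_path)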
+
+Then, we've modified :meth:`pyarrow.flight.FlightServerBase.do_get` to stream
+data to the client. This uses :class:`pyarrow.flight.GeneratorStream`, which
+takes a schema and any iterable or iterator. Flight then iterates over the
+data and sends each record batch to the client, allowing us to handle even
+large Parquet files that don't fit in memory.
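+
+Since :class:`pyarrow.flight.GeneratorStream` accepts any iterable or
+iterator, ``do_get`` could just as well return a generator. A minimal sketch,
+where the empty-batch filter stands in for whatever per-batch logic we might
+want:
+
+.. code-block::
+
+    def do_get(self, context, ticket):
+        dataset_path = self._repo / ticket.ticket.decode('utf-8')
+        reader = pa.parquet.ParquetFile(dataset_path)
+
+        def batches():
+            # Flight pulls these lazily, one record batch at a time
+            for batch in reader.iter_batches():
+                if batch.num_rows > 0:  # hypothetical per-batch logic
+                    yield batch
+
+        return pa.flight.GeneratorStream(reader.schema_arrow, batches())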
+
+While GeneratorStream has the advantage that it can stream data, the tradeoff
+is that Flight must call back into Python for each record batch it sends. In
+contrast, RecordBatchStream requires that all data be in memory up front, but
+once created, all data transfer is handled purely in C++, with no need to call
+back into Python.
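+
+If a dataset is known to fit in memory, ``do_get`` could instead use
+:class:`pyarrow.flight.RecordBatchStream`. A minimal sketch, trading memory
+for fewer Python callbacks:
+
+.. code-block::
+
+    def do_get(self, context, ticket):
+        dataset_path = self._repo / ticket.ticket.decode('utf-8')
+        # Load the whole file up front; the transfer itself then runs in C++
+        table = pa.parquet.read_table(dataset_path)
+        return pa.flight.RecordBatchStream(table)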
+
+Let's give the server a spin. As before, we'll start the server:
+
+.. code-block::
+
+ if __name__ == '__main__':
+ server = FlightServer()
+ server._repo.mkdir(exist_ok=True)
+ server.serve()
+
+.. testcode::
+ :hide:
+
+    # Code block to actually start the server in the background
+    # and wait for it to become available.
+    # The previous code block only shows the user how to start it.
+ import threading
+ server = FlightServer()
+ server._repo.mkdir(exist_ok=True)
+ t = threading.Thread(target=server.serve)
+ t.start()
+
+ pa.flight.connect("grpc://0.0.0.0:8815").wait_for_available()
+
+We create a client, and this time, we'll write batches to the writer, as if we
+had a stream of data instead of a table in memory:
+
+.. testcode::
+
+ import pyarrow as pa
+ import pyarrow.flight
+
+ client = pa.flight.connect("grpc://0.0.0.0:8815")
+
+ # Upload a new dataset
+ NUM_BATCHES = 1024
+ ROWS_PER_BATCH = 4096
+ upload_descriptor = pa.flight.FlightDescriptor.for_path("streamed.parquet")
+ batch = pa.record_batch([
+ pa.array(range(ROWS_PER_BATCH)),
+ ], names=["ints"])
+ writer, _ = client.do_put(upload_descriptor, batch.schema)
+ with writer:
+ for _ in range(NUM_BATCHES):
+ writer.write_batch(batch)
+
+As before, we can then read it back. Again, we'll read each batch from the
+stream as it arrives, instead of reading them all into a table:
+
+.. testcode::
+
+ # Read content of the dataset
+ flight = client.get_flight_info(upload_descriptor)
+ reader = client.do_get(flight.endpoints[0].ticket)
+ total_rows = 0
+ for chunk in reader:
+ total_rows += chunk.data.num_rows
+    print("Got", total_rows, "rows total, expected", NUM_BATCHES * ROWS_PER_BATCH)
+
+.. testoutput::
+
+ Got 4194304 rows total, expected 4194304
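+
+If we knew the result would fit in memory, we could instead drain the stream
+into a table in one call; a minimal sketch:
+
+.. code-block::
+
+    reader = client.do_get(flight.endpoints[0].ticket)
+    # read_all() consumes the whole stream into a single in-memory Table
+    table = reader.read_all()
+    assert table.num_rows == NUM_BATCHES * ROWS_PER_BATCH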
+
+.. testcode::
+ :hide:
+
+ # Shutdown the server
+ server.shutdown()