This is an automated email from the ASF dual-hosted git repository.

amolina pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/arrow-cookbook.git


The following commit(s) were added to refs/heads/main by this push:
     new 79a57a0  [Python] Add Flight streaming example (#109)
79a57a0 is described below

commit 79a57a0a87aaacc7eed82902363170d5fe8445f0
Author: David Li <[email protected]>
AuthorDate: Wed Jan 19 05:23:25 2022 -0500

    [Python] Add Flight streaming example (#109)
    
    * [Python] Add Flight streaming example
    
    Fixes #86
    
    * [Python] Remove special dataset example
    
    * [Python] Add back missing server shutdown
---
 python/source/flight.rst | 177 ++++++++++++++++++++++++++++++++++++++++++++---
 1 file changed, 169 insertions(+), 8 deletions(-)

diff --git a/python/source/flight.rst b/python/source/flight.rst
index 69dac6c..354ccf5 100644
--- a/python/source/flight.rst
+++ b/python/source/flight.rst
@@ -10,8 +10,8 @@ Simple Parquet storage service with Arrow Flight
 ================================================
 
 Suppose you want to implement a service that can store, send and receive
-Parquet files using the Arrow Flight protocol, 
-``pyarrow`` provides an implementation framework in :mod:`pyarrow.flight` 
+Parquet files using the Arrow Flight protocol,
+``pyarrow`` provides an implementation framework in :mod:`pyarrow.flight`
 and particularly through the :class:`pyarrow.flight.FlightServerBase` class.
 
 .. testcode::
@@ -25,7 +25,7 @@ and particularly through the 
:class:`pyarrow.flight.FlightServerBase` class.
 
     class FlightServer(pa.flight.FlightServerBase):
 
-        def __init__(self, location="grpc://0.0.0.0:8815", 
+        def __init__(self, location="grpc://0.0.0.0:8815",
                     repo=pathlib.Path("./datasets"), **kwargs):
             super(FlightServer, self).__init__(location, **kwargs)
             self._location = location
@@ -40,9 +40,9 @@ and particularly through the 
:class:`pyarrow.flight.FlightServerBase` class.
             )
             endpoints = [pa.flight.FlightEndpoint(dataset, [self._location])]
             return pyarrow.flight.FlightInfo(schema,
-                                            descriptor, 
+                                            descriptor,
                                             endpoints,
-                                            metadata.num_rows, 
+                                            metadata.num_rows,
                                             metadata.serialized_size)
 
         def list_flights(self, context, criteria):
@@ -97,13 +97,13 @@ into a parquet file)
 This are the most common Arrow Flight requests, if we need to add more
 functionalities, we can do so using custom actions.
 
-In the previous example a ``drop_dataset`` custom action is added. 
-All custom actions are executed through the 
+In the previous example a ``drop_dataset`` custom action is added.
+All custom actions are executed through the
 :meth:`pyarrow.flight.FlightServerBase.do_action` method, thus it's up to
 the server subclass to dispatch them properly. In this case we invoke
 the `do_drop_dataset` method when the `action.type` is the one we expect.
 
-Our server can then be started with 
+Our server can then be started with
 :meth:`pyarrow.flight.FlightServerBase.serve`
 
 .. code-block::
@@ -226,3 +226,164 @@ we might list all parquet files that are currently stored 
by the server:
     # Shutdown the server
     server.shutdown()
 
+Streaming Parquet Storage Service
+=================================
+
+We can improve the Parquet storage service and avoid holding entire datasets in
+memory by streaming data. Flight readers and writers, like others in PyArrow,
+can be iterated through, so let's update the server from before to take
+advantage of this:
+
+.. testcode::
+
+   import pathlib
+
+   import pyarrow as pa
+   import pyarrow.flight
+   import pyarrow.parquet
+
+
+   class FlightServer(pa.flight.FlightServerBase):
+
+       def __init__(self, location="grpc://0.0.0.0:8815",
+                   repo=pathlib.Path("./datasets"), **kwargs):
+           super(FlightServer, self).__init__(location, **kwargs)
+           self._location = location
+           self._repo = repo
+
+       def _make_flight_info(self, dataset):
+           dataset_path = self._repo / dataset
+           schema = pa.parquet.read_schema(dataset_path)
+           metadata = pa.parquet.read_metadata(dataset_path)
+           descriptor = pa.flight.FlightDescriptor.for_path(
+               dataset.encode('utf-8')
+           )
+           endpoints = [pa.flight.FlightEndpoint(dataset, [self._location])]
+           return pyarrow.flight.FlightInfo(schema,
+                                           descriptor,
+                                           endpoints,
+                                           metadata.num_rows,
+                                           metadata.serialized_size)
+
+       def list_flights(self, context, criteria):
+           for dataset in self._repo.iterdir():
+               yield self._make_flight_info(dataset.name)
+
+       def get_flight_info(self, context, descriptor):
+           return self._make_flight_info(descriptor.path[0].decode('utf-8'))
+
+       def do_put(self, context, descriptor, reader, writer):
+           dataset = descriptor.path[0].decode('utf-8')
+           dataset_path = self._repo / dataset
+           # Read the uploaded data and write to Parquet incrementally
+           with dataset_path.open("wb") as sink:
+               with pa.parquet.ParquetWriter(sink, reader.schema) as writer:
+                   for chunk in reader:
+                       writer.write_table(pa.Table.from_batches([chunk.data]))
+
+       def do_get(self, context, ticket):
+           dataset = ticket.ticket.decode('utf-8')
+           # Stream data from a file
+           dataset_path = self._repo / dataset
+           reader = pa.parquet.ParquetFile(dataset_path)
+           return pa.flight.GeneratorStream(
+               reader.schema_arrow, reader.iter_batches())
+
+       def list_actions(self, context):
+           return [
+               ("drop_dataset", "Delete a dataset."),
+           ]
+
+       def do_action(self, context, action):
+           if action.type == "drop_dataset":
+               self.do_drop_dataset(action.body.to_pybytes().decode('utf-8'))
+           else:
+               raise NotImplementedError
+
+       def do_drop_dataset(self, dataset):
+           dataset_path = self._repo / dataset
+           dataset_path.unlink()
+
+First, we've modified :meth:`pyarrow.flight.FlightServerBase.do_put`. Instead
+of reading all the uploaded data into a :class:`pyarrow.Table` before writing,
+we instead iterate through each batch as it comes and add it to a Parquet file.
+
+Then, we've modified :meth:`pyarrow.flight.FlightServerBase.do_get` to stream
+data to the client. This uses :class:`pyarrow.flight.GeneratorStream`, which
+takes a schema and any iterable or iterator. Flight then iterates through and
+sends each record batch to the client, allowing us to handle even large Parquet
+files that don't fit into memory.
+
+While GeneratorStream has the advantage that it can stream data, that means
+Flight must call back into Python for each record batch to send. In contrast,
+RecordBatchStream requires that all data is in-memory up front, but once
+created, all data transfer is handled purely in C++, without needing to call
+Python code.
+
+Let's give the server a spin. As before, we'll start the server:
+
+.. code-block::
+
+    if __name__ == '__main__':
+        server = FlightServer()
+        server._repo.mkdir(exist_ok=True)
+        server.serve()
+
+.. testcode::
+    :hide:
+
+    # Code block to start for real a server in background
+    # and wait for it to be available.
+    # Previous code block is just to show to user how to start it.
+    import threading
+    server = FlightServer()
+    server._repo.mkdir(exist_ok=True)
+    t = threading.Thread(target=server.serve)
+    t.start()
+
+    pa.flight.connect("grpc://0.0.0.0:8815").wait_for_available()
+
+We create a client, and this time, we'll write batches to the writer, as if we
+had a stream of data instead of a table in memory:
+
+.. testcode::
+
+   import pyarrow as pa
+   import pyarrow.flight
+
+   client = pa.flight.connect("grpc://0.0.0.0:8815")
+
+   # Upload a new dataset
+   NUM_BATCHES = 1024
+   ROWS_PER_BATCH = 4096
+   upload_descriptor = pa.flight.FlightDescriptor.for_path("streamed.parquet")
+   batch = pa.record_batch([
+       pa.array(range(ROWS_PER_BATCH)),
+   ], names=["ints"])
+   writer, _ = client.do_put(upload_descriptor, batch.schema)
+   with writer:
+       for _ in range(NUM_BATCHES):
+           writer.write_batch(batch)
+
+As before, we can then read it back. Again, we'll read each batch from the
+stream as it arrives, instead of reading them all into a table:
+
+.. testcode::
+
+   # Read content of the dataset
+   flight = client.get_flight_info(upload_descriptor)
+   reader = client.do_get(flight.endpoints[0].ticket)
+   total_rows = 0
+   for chunk in reader:
+       total_rows += chunk.data.num_rows
+   print("Got", total_rows, "rows total, expected", NUM_BATCHES * 
ROWS_PER_BATCH)
+
+.. testoutput::
+
+   Got 4194304 rows total, expected 4194304
+
+.. testcode::
+    :hide:
+
+    # Shutdown the server
+    server.shutdown()

Reply via email to