amol- commented on a change in pull request #109:
URL: https://github.com/apache/arrow-cookbook/pull/109#discussion_r762841563
########## File path: python/source/flight.rst ##########
@@ -226,3 +226,198 @@ we might list all parquet files that are currently stored by the server:
     # Shutdown the server
     server.shutdown()
+
+Streaming Parquet Storage Service
+=================================
+
+We can improve the Parquet storage service and avoid holding entire datasets in
+memory by streaming data. Flight readers and writers, like others in PyArrow,
+can be iterated through, so let's update the server from before to take
+advantage of this:
+
+.. testcode::
+
+    import pathlib
+
+    import numpy as np
+    import pyarrow as pa
+    import pyarrow.flight
+    import pyarrow.parquet
+
+
+    class FlightServer(pa.flight.FlightServerBase):
+
+        def __init__(self, location="grpc://0.0.0.0:8815",
+                     repo=pathlib.Path("./datasets"), **kwargs):
+            super(FlightServer, self).__init__(location, **kwargs)
+            self._location = location
+            self._repo = repo
+
+        def _make_flight_info(self, dataset):
+            dataset_path = self._repo / dataset
+            schema = pa.parquet.read_schema(dataset_path)
+            metadata = pa.parquet.read_metadata(dataset_path)
+            descriptor = pa.flight.FlightDescriptor.for_path(
+                dataset.encode('utf-8')
+            )
+            endpoints = [pa.flight.FlightEndpoint(dataset, [self._location])]
+            return pyarrow.flight.FlightInfo(schema,
+                                             descriptor,
+                                             endpoints,
+                                             metadata.num_rows,
+                                             metadata.serialized_size)
+
+        def list_flights(self, context, criteria):
+            for dataset in self._repo.iterdir():
+                yield self._make_flight_info(dataset.name)
+
+        def get_flight_info(self, context, descriptor):
+            return self._make_flight_info(descriptor.path[0].decode('utf-8'))
+
+        def do_put(self, context, descriptor, reader, writer):
+            dataset = descriptor.path[0].decode('utf-8')
+            dataset_path = self._repo / dataset
+            # Read the uploaded data and write to Parquet incrementally
+            with dataset_path.open("wb") as sink:
+                with pa.parquet.ParquetWriter(sink, reader.schema) as writer:
+                    for chunk in reader:
+                        writer.write_table(pa.Table.from_batches([chunk.data]))
+
+        def do_get(self, context, ticket):
+            dataset = ticket.ticket.decode('utf-8')
+
+            # Stream data from a generator we implement
+            if dataset == ":random:":

Review comment:
   I think that the `:random:` special dataset might be confusing for a reader in the context of this recipe. It's great for playing around, but the goal of the recipe was to read back datasets you stored, so a reader might wonder what the purpose of that code block is and get confused by it.
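   One possible shape for `do_get` without the `:random:` branch — a sketch, not code from this pull request, assuming the ticket always names a Parquet file stored under `self._repo`:

   ```python
   def do_get(self, context, ticket):
       dataset = ticket.ticket.decode('utf-8')
       dataset_path = self._repo / dataset
       # Stream the stored Parquet file back one record batch at a
       # time, so the server never holds the whole dataset in memory.
       reader = pa.parquet.ParquetFile(dataset_path)
       return pa.flight.GeneratorStream(
           reader.schema_arrow, reader.iter_batches())
   ```

   Built on `ParquetFile.iter_batches()`, this keeps `do_get` symmetric with the incremental `ParquetWriter` loop in `do_put` above.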
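   The same streaming idea applies on the client side when uploading. A minimal sketch, assuming the server above is running at `grpc://0.0.0.0:8815`; the example table and the `streamed.parquet` dataset name are placeholders for illustration:

   ```python
   import pyarrow as pa
   import pyarrow.flight

   # A small example table; any pyarrow.Table would work here.
   table = pa.table({"a": list(range(10_000))})

   client = pa.flight.connect("grpc://0.0.0.0:8815")
   # "streamed.parquet" is an arbitrary dataset name for this sketch.
   descriptor = pa.flight.FlightDescriptor.for_path("streamed.parquet")

   writer, _ = client.do_put(descriptor, table.schema)
   # Send one record batch at a time, mirroring the incremental
   # ParquetWriter loop in do_put: neither side needs the whole
   # dataset in memory at once.
   for batch in table.to_batches(max_chunksize=1024):
       writer.write_batch(batch)
   writer.close()
   ```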