lidavidm commented on a change in pull request #84:
URL: https://github.com/apache/arrow-cookbook/pull/84#discussion_r726058944



##########
File path: python/source/flight.rst
##########
@@ -0,0 +1,228 @@
+============
+Arrow Flight
+============
+
+Recipes related to leveraging Arrow Flight protocol
+
+.. contents::
+
+Using an Arrow Flight RPC server
+================================
+
+Suppose you want to implement a service that can store, send and receive
+parquet files using the Arrow Flight protocol, 

Review comment:
       ```suggestion
   Parquet files using the Arrow Flight protocol, 
   ```

##########
File path: python/source/flight.rst
##########
@@ -0,0 +1,228 @@
+============
+Arrow Flight
+============
+
+Recipes related to leveraging Arrow Flight protocol
+
+.. contents::
+
+Using an Arrow Flight RPC server
+================================
+
+Suppose you want to implement a service that can store, send and receive
+parquet files using the Arrow Flight protocol, 
+``pyarrow`` provides an implementation framework in :mod:`pyarrow.flight` 
+and particularly through the :class:`pyarrow.flight.FlightServerBase` class.
+
+.. testcode::
+
+    import pathlib
+    import threading
+
+    import pyarrow as pa
+    import pyarrow.flight
+    import pyarrow.parquet
+
+
+    class FlightServer(pa.flight.FlightServerBase):
+
+        def __init__(self, location="grpc://0.0.0.0:8815", 
+                    repo=pathlib.Path("./datasets"), **kwargs):
+            super(FlightServer, self).__init__(location, **kwargs)
+            self._location = location
+            self._repo = repo
+
+        def _make_flight_info(self, dataset):
+            dataset_path = self._repo / dataset
+            schema = pa.parquet.read_schema(dataset_path)
+            metadata = pa.parquet.read_metadata(dataset_path)
+            descriptor = pa.flight.FlightDescriptor.for_path(
+                dataset.encode('utf-8')
+            )
+            endpoints = [pa.flight.FlightEndpoint(dataset, [self._location])]
+            return pyarrow.flight.FlightInfo(schema,
+                                            descriptor, 
+                                            endpoints,
+                                            metadata.num_rows, 
+                                            metadata.serialized_size)
+
+        def list_flights(self, context, criteria):
+            for dataset in self._repo.iterdir():
+                yield self._make_flight_info(dataset.name)
+
+        def get_flight_info(self, context, descriptor):
+            return self._make_flight_info(descriptor.path[0].decode('utf-8'))
+
+        def do_put(self, context, descriptor, reader, writer):
+            dataset = descriptor.path[0].decode('utf-8')
+            dataset_path = self._repo / dataset
+            data_table = reader.read_all()
+            pa.parquet.write_table(data_table, dataset_path)
+
+        def do_get(self, context, ticket):
+            dataset = ticket.ticket.decode('utf-8')
+            dataset_path = self._repo / dataset
+            return pa.flight.RecordBatchStream(pa.parquet.read_table(dataset_path))
+
+        def list_actions(self, context):
+            return [
+                ("drop_dataset", "Delete a dataset."),
+            ]
+
+        def do_action(self, context, action):
+            if action.type == "drop_dataset":
+                self.do_drop_dataset(action.body.to_pybytes().decode('utf-8'))
+            else:
+                raise NotImplementedError
+
+        def do_drop_dataset(self, dataset):
+            dataset_path = self._repo / dataset
+            dataset_path.unlink()
+
+The example server exposes :meth:`pyarrow.flight.FlightServerBase.list_flights`
+which is the method in charge of returning the list of data streams available
+for fetching.
+
+Likewise, :meth:`pyarrow.flight.FlightServerBase.get_flight_info` provides
+the information regarding a single specific data stream.
+
+Then we expose :meth:`pyarrow.flight.FlightServerBase.do_get` which is in charge
+of actually fetching the exposed data streams and sending them to the client.
+
+Allowing to list and dowload data streams would be pretty useless if we didn't
+expose a way to create them, this is the responsability of
+:meth:`pyarrow.flight.FlightServerBase.do_put` which is in charge of receiving
+new data from the client and dealing with it (in this case saving it
+into a parquet file)
+
+This are the most common Arrow Flight requests, if we need to add more
+functionalities, we can do so using custom actions.
+
+In the previous example a `drop_dataset` custom action is added. 
+All custom actions are executed through the 
+:meth:`pyarrow.flight.FlightServerBase.do_action` method, thus it's up to
+the server subclass to dispatch them properly. In this case we invoke
+the `do_drop_dataset` method when the `action.type` is the one we expect.
+
+Our server can then be started with 
+:meth:`pyarrow.flight.FlightServerBase.serve`
+
+.. code-block::
+
+    if __name__ == '__main__':
+        server = FlightServer()
+        server._repo.mkdir(exist_ok=True)
+        server.serve()
+
+.. testcode::
+    :hide:
+
+    # Code block to start for real a server in background
+    # and wait for it to be available.
+    # Previous code block is just to show to user how to start it.

Review comment:
       In that case, should we move the `import threading` from the first code 
block to here?

##########
File path: python/source/flight.rst
##########
@@ -0,0 +1,228 @@
+============
+Arrow Flight
+============
+
+Recipes related to leveraging Arrow Flight protocol
+
+.. contents::
+
+Using an Arrow Flight RPC server

Review comment:
       This title is a little generic for what it shows (I would have expected 
much less content/a much more minimal example), perhaps it could be called 
something like "A simple Parquet storage service with Arrow Flight"?

##########
File path: python/source/flight.rst
##########
@@ -0,0 +1,228 @@
+============
+Arrow Flight
+============
+
+Recipes related to leveraging Arrow Flight protocol
+
+.. contents::
+
+Using an Arrow Flight RPC server
+================================
+
+Suppose you want to implement a service that can store, send and receive
+parquet files using the Arrow Flight protocol, 
+``pyarrow`` provides an implementation framework in :mod:`pyarrow.flight` 
+and particularly through the :class:`pyarrow.flight.FlightServerBase` class.
+
+.. testcode::
+
+    import pathlib
+    import threading
+
+    import pyarrow as pa
+    import pyarrow.flight
+    import pyarrow.parquet
+
+
+    class FlightServer(pa.flight.FlightServerBase):
+
+        def __init__(self, location="grpc://0.0.0.0:8815", 
+                    repo=pathlib.Path("./datasets"), **kwargs):
+            super(FlightServer, self).__init__(location, **kwargs)
+            self._location = location
+            self._repo = repo
+
+        def _make_flight_info(self, dataset):
+            dataset_path = self._repo / dataset
+            schema = pa.parquet.read_schema(dataset_path)
+            metadata = pa.parquet.read_metadata(dataset_path)
+            descriptor = pa.flight.FlightDescriptor.for_path(
+                dataset.encode('utf-8')
+            )
+            endpoints = [pa.flight.FlightEndpoint(dataset, [self._location])]
+            return pyarrow.flight.FlightInfo(schema,
+                                            descriptor, 
+                                            endpoints,
+                                            metadata.num_rows, 
+                                            metadata.serialized_size)
+
+        def list_flights(self, context, criteria):
+            for dataset in self._repo.iterdir():
+                yield self._make_flight_info(dataset.name)
+
+        def get_flight_info(self, context, descriptor):
+            return self._make_flight_info(descriptor.path[0].decode('utf-8'))
+
+        def do_put(self, context, descriptor, reader, writer):
+            dataset = descriptor.path[0].decode('utf-8')
+            dataset_path = self._repo / dataset
+            data_table = reader.read_all()
+            pa.parquet.write_table(data_table, dataset_path)
+
+        def do_get(self, context, ticket):
+            dataset = ticket.ticket.decode('utf-8')
+            dataset_path = self._repo / dataset
+            return pa.flight.RecordBatchStream(pa.parquet.read_table(dataset_path))
+
+        def list_actions(self, context):
+            return [
+                ("drop_dataset", "Delete a dataset."),
+            ]
+
+        def do_action(self, context, action):
+            if action.type == "drop_dataset":
+                self.do_drop_dataset(action.body.to_pybytes().decode('utf-8'))
+            else:
+                raise NotImplementedError
+
+        def do_drop_dataset(self, dataset):
+            dataset_path = self._repo / dataset
+            dataset_path.unlink()
+
+The example server exposes :meth:`pyarrow.flight.FlightServerBase.list_flights`
+which is the method in charge of returning the list of data streams available
+for fetching.
+
+Likewise, :meth:`pyarrow.flight.FlightServerBase.get_flight_info` provides
+the information regarding a single specific data stream.
+
+Then we expose :meth:`pyarrow.flight.FlightServerBase.do_get` which is in charge
+of actually fetching the exposed data streams and sending them to the client.
+
+Allowing to list and dowload data streams would be pretty useless if we didn't
+expose a way to create them, this is the responsability of
+:meth:`pyarrow.flight.FlightServerBase.do_put` which is in charge of receiving
+new data from the client and dealing with it (in this case saving it
+into a parquet file)
+
+This are the most common Arrow Flight requests, if we need to add more
+functionalities, we can do so using custom actions.
+
+In the previous example a `drop_dataset` custom action is added. 

Review comment:
       ```suggestion
   In the previous example a ``drop_dataset`` custom action is added. 
   ```

##########
File path: python/source/flight.rst
##########
@@ -0,0 +1,228 @@
+============
+Arrow Flight
+============
+
+Recipes related to leveraging Arrow Flight protocol
+
+.. contents::
+
+Using an Arrow Flight RPC server
+================================
+
+Suppose you want to implement a service that can store, send and receive
+parquet files using the Arrow Flight protocol, 
+``pyarrow`` provides an implementation framework in :mod:`pyarrow.flight` 
+and particularly through the :class:`pyarrow.flight.FlightServerBase` class.
+
+.. testcode::
+
+    import pathlib
+    import threading
+
+    import pyarrow as pa
+    import pyarrow.flight
+    import pyarrow.parquet
+
+
+    class FlightServer(pa.flight.FlightServerBase):
+
+        def __init__(self, location="grpc://0.0.0.0:8815", 
+                    repo=pathlib.Path("./datasets"), **kwargs):
+            super(FlightServer, self).__init__(location, **kwargs)
+            self._location = location
+            self._repo = repo
+
+        def _make_flight_info(self, dataset):
+            dataset_path = self._repo / dataset
+            schema = pa.parquet.read_schema(dataset_path)
+            metadata = pa.parquet.read_metadata(dataset_path)
+            descriptor = pa.flight.FlightDescriptor.for_path(
+                dataset.encode('utf-8')
+            )
+            endpoints = [pa.flight.FlightEndpoint(dataset, [self._location])]
+            return pyarrow.flight.FlightInfo(schema,
+                                            descriptor, 
+                                            endpoints,
+                                            metadata.num_rows, 
+                                            metadata.serialized_size)
+
+        def list_flights(self, context, criteria):
+            for dataset in self._repo.iterdir():
+                yield self._make_flight_info(dataset.name)
+
+        def get_flight_info(self, context, descriptor):
+            return self._make_flight_info(descriptor.path[0].decode('utf-8'))
+
+        def do_put(self, context, descriptor, reader, writer):
+            dataset = descriptor.path[0].decode('utf-8')
+            dataset_path = self._repo / dataset
+            data_table = reader.read_all()
+            pa.parquet.write_table(data_table, dataset_path)
+
+        def do_get(self, context, ticket):
+            dataset = ticket.ticket.decode('utf-8')
+            dataset_path = self._repo / dataset
+            return pa.flight.RecordBatchStream(pa.parquet.read_table(dataset_path))
+
+        def list_actions(self, context):
+            return [
+                ("drop_dataset", "Delete a dataset."),
+            ]
+
+        def do_action(self, context, action):
+            if action.type == "drop_dataset":
+                self.do_drop_dataset(action.body.to_pybytes().decode('utf-8'))
+            else:
+                raise NotImplementedError
+
+        def do_drop_dataset(self, dataset):
+            dataset_path = self._repo / dataset
+            dataset_path.unlink()
+
+The example server exposes :meth:`pyarrow.flight.FlightServerBase.list_flights`
+which is the method in charge of returning the list of data streams available
+for fetching.
+
+Likewise, :meth:`pyarrow.flight.FlightServerBase.get_flight_info` provides
+the information regarding a single specific data stream.
+
+Then we expose :meth:`pyarrow.flight.FlightServerBase.do_get` which is in charge
+of actually fetching the exposed data streams and sending them to the client.
+
+Allowing to list and dowload data streams would be pretty useless if we didn't
+expose a way to create them, this is the responsability of
+:meth:`pyarrow.flight.FlightServerBase.do_put` which is in charge of receiving
+new data from the client and dealing with it (in this case saving it
+into a parquet file)
+
+This are the most common Arrow Flight requests, if we need to add more
+functionalities, we can do so using custom actions.

Review comment:
       Is it worth noting somewhere that if not implemented, all Flight methods 
have a default implementation? 
   Similarly, it may be worth linking to the list of all methods. (Oh, though 
the docs don't really have a good listing. That should be fixed.)

##########
File path: python/source/flight.rst
##########
@@ -0,0 +1,228 @@
+============
+Arrow Flight
+============
+
+Recipes related to leveraging Arrow Flight protocol
+
+.. contents::
+
+Using an Arrow Flight RPC server
+================================
+
+Suppose you want to implement a service that can store, send and receive
+parquet files using the Arrow Flight protocol, 
+``pyarrow`` provides an implementation framework in :mod:`pyarrow.flight` 
+and particularly through the :class:`pyarrow.flight.FlightServerBase` class.
+
+.. testcode::
+
+    import pathlib
+    import threading
+
+    import pyarrow as pa
+    import pyarrow.flight
+    import pyarrow.parquet
+
+
+    class FlightServer(pa.flight.FlightServerBase):
+
+        def __init__(self, location="grpc://0.0.0.0:8815", 

Review comment:
       As an alternative to hardcoding a port, you can bind to port 0, then
print out `server.port` after creating the server instance. This works before
calling `serve()`: confusingly, the server is started as soon as you
instantiate the class.

##########
File path: python/source/flight.rst
##########
@@ -0,0 +1,228 @@
+============
+Arrow Flight
+============
+
+Recipes related to leveraging Arrow Flight protocol
+
+.. contents::
+
+Using an Arrow Flight RPC server
+================================
+
+Suppose you want to implement a service that can store, send and receive
+parquet files using the Arrow Flight protocol, 
+``pyarrow`` provides an implementation framework in :mod:`pyarrow.flight` 
+and particularly through the :class:`pyarrow.flight.FlightServerBase` class.
+
+.. testcode::
+
+    import pathlib
+    import threading
+
+    import pyarrow as pa
+    import pyarrow.flight
+    import pyarrow.parquet
+
+
+    class FlightServer(pa.flight.FlightServerBase):
+
+        def __init__(self, location="grpc://0.0.0.0:8815", 
+                    repo=pathlib.Path("./datasets"), **kwargs):
+            super(FlightServer, self).__init__(location, **kwargs)
+            self._location = location
+            self._repo = repo
+
+        def _make_flight_info(self, dataset):
+            dataset_path = self._repo / dataset
+            schema = pa.parquet.read_schema(dataset_path)
+            metadata = pa.parquet.read_metadata(dataset_path)
+            descriptor = pa.flight.FlightDescriptor.for_path(
+                dataset.encode('utf-8')
+            )
+            endpoints = [pa.flight.FlightEndpoint(dataset, [self._location])]
+            return pyarrow.flight.FlightInfo(schema,
+                                            descriptor, 
+                                            endpoints,
+                                            metadata.num_rows, 
+                                            metadata.serialized_size)
+
+        def list_flights(self, context, criteria):
+            for dataset in self._repo.iterdir():
+                yield self._make_flight_info(dataset.name)
+
+        def get_flight_info(self, context, descriptor):
+            return self._make_flight_info(descriptor.path[0].decode('utf-8'))
+
+        def do_put(self, context, descriptor, reader, writer):
+            dataset = descriptor.path[0].decode('utf-8')
+            dataset_path = self._repo / dataset
+            data_table = reader.read_all()
+            pa.parquet.write_table(data_table, dataset_path)

Review comment:
       A follow-up example might be to do this in a streaming way to reduce 
memory pressure (ditto for do_get).




-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]

