davisusanibar commented on a change in pull request #137:
URL: https://github.com/apache/arrow-cookbook/pull/137#discussion_r822068305



##########
File path: java/source/flight.rst
##########
@@ -0,0 +1,1699 @@
+.. _arrow-flight:
+
+============
+Arrow Flight
+============
+
+Recipes related to leveraging the Arrow Flight protocol.
+
+.. contents::
+
+Simple Service with Arrow Flight
+================================
+
+We are going to create a Flight Producer and a Flight Server:
+
+* InMemoryStore: A FlightProducer that hosts an in-memory store of Arrow buffers. Used for integration testing.
+
+* ExampleFlightServer: An Example Flight Server that provides access to the 
InMemoryStore.
+
+Creating the Server
+*******************
+
+.. testcode::
+
+    import org.apache.arrow.flight.Action;
+    import org.apache.arrow.flight.ActionType;
+    import org.apache.arrow.flight.CallStatus;
+    import org.apache.arrow.flight.Criteria;
+    import org.apache.arrow.flight.FlightDescriptor;
+    import org.apache.arrow.flight.FlightInfo;
+    import org.apache.arrow.flight.FlightProducer;
+    import org.apache.arrow.flight.FlightStream;
+    import org.apache.arrow.flight.Location;
+    import org.apache.arrow.flight.PutResult;
+    import org.apache.arrow.flight.Result;
+    import org.apache.arrow.flight.Ticket;
+    import org.apache.arrow.flight.example.ExampleTicket;
+    import org.apache.arrow.flight.example.FlightHolder;
+    import org.apache.arrow.flight.example.Stream;
+    import org.apache.arrow.flight.example.Stream.StreamCreator;
+    import org.apache.arrow.memory.BufferAllocator;
+    import org.apache.arrow.util.AutoCloseables;
+    import org.apache.arrow.vector.VectorSchemaRoot;
+    import org.apache.arrow.vector.VectorUnloader;
+
+    import java.util.concurrent.ConcurrentHashMap;
+    import java.util.concurrent.ConcurrentMap;
+
+    // InMemoryStore
+
+    public class InMemoryStore implements FlightProducer, AutoCloseable {
+        private final ConcurrentMap<FlightDescriptor, FlightHolder> holders = 
new ConcurrentHashMap<>();
+        private final BufferAllocator allocator;
+        private Location location;
+
+        public InMemoryStore(BufferAllocator allocator, Location location) {
+            super();
+            this.allocator = allocator;
+            this.location = location;
+        }
+
+        public void setLocation(Location location) {
+            this.location = location;
+        }
+
+        @Override
+        public void getStream(CallContext context, Ticket ticket,
+                              FlightProducer.ServerStreamListener listener) {
+            System.out.println("Calling to getStream");
+            getStream(ticket).sendTo(allocator, listener);
+        }
+
+        public Stream getStream(Ticket t) {
+            ExampleTicket example = ExampleTicket.from(t);
+            FlightDescriptor d = FlightDescriptor.path(example.getPath());
+            FlightHolder h = holders.get(d);
+            if (h == null) {
+                throw new IllegalStateException("Unknown ticket.");
+            }
+
+            return h.getStream(example);
+        }
+
+        @Override
+        public void listFlights(CallContext context, Criteria criteria, 
StreamListener<FlightInfo> listener) {
+            System.out.println("Calling to listFligths");
+            try {
+                for (FlightHolder h : holders.values()) {
+                    listener.onNext(h.getFlightInfo(location));
+                }
+                listener.onCompleted();
+            } catch (Exception ex) {
+                listener.onError(ex);
+            }
+        }
+
+        @Override
+        public FlightInfo getFlightInfo(CallContext context, FlightDescriptor 
descriptor) {
+            System.out.println("Calling to getFlightInfo");
+            FlightHolder h = holders.get(descriptor);
+            if (h == null) {
+                throw new IllegalStateException("Unknown descriptor.");
+            }
+
+            return h.getFlightInfo(location);
+        }
+
+        @Override
+        public Runnable acceptPut(CallContext context,
+                                  final FlightStream flightStream, final 
StreamListener<PutResult> ackStream) {
+            return () -> {
+                System.out.println("Calling to acceptPut");
+                StreamCreator creator = null;
+                boolean success = false;
+                try (VectorSchemaRoot root = flightStream.getRoot()) {
+                    final FlightHolder h = holders.computeIfAbsent(
+                            flightStream.getDescriptor(),
+                            t -> new FlightHolder(allocator, t, 
flightStream.getSchema(), flightStream.getDictionaryProvider()));
+
+                    creator = h.addStream(flightStream.getSchema());
+
+                    VectorUnloader unloader = new VectorUnloader(root);
+                    while (flightStream.next()) {
+                        
ackStream.onNext(PutResult.metadata(flightStream.getLatestMetadata()));
+                        creator.add(unloader.getRecordBatch());
+                    }
+                    // Closing the stream will release the dictionaries
+                    flightStream.takeDictionaryOwnership();
+                    creator.complete();
+                    success = true;
+                } finally {
+                    if (!success) {
+                        creator.drop();
+                    }
+                }
+
+            };
+
+        }
+
+        @Override
+        public void doAction(CallContext context, Action action,
+                             StreamListener<Result> listener) {
+            System.out.println("Calling to doAction");
+            switch (action.getType()) {
+                case "drop": {
+                    listener.onNext(new Result(new byte[0]));
+                    listener.onCompleted();
+                    break;
+                }
+                default: {
+                    
listener.onError(CallStatus.UNIMPLEMENTED.toRuntimeException());
+                }
+            }
+        }
+
+        @Override
+        public void listActions(CallContext context,
+                                StreamListener<ActionType> listener) {
+            System.out.println("Calling to listActions");
+            listener.onNext(new ActionType("get", "pull a stream. Action must 
be done via standard get mechanism"));
+            listener.onNext(new ActionType("put", "push a stream. Action must 
be done via standard put mechanism"));
+            listener.onNext(new ActionType("drop", "delete a flight. Action 
body is a JSON encoded path."));
+            listener.onCompleted();
+        }
+
+        @Override
+        public void close() throws Exception {
+            System.out.println("Calling to close");
+            AutoCloseables.close(holders.values());
+            holders.clear();
+        }
+    }
+
+
+    // ExampleFlightServer
+
+    import org.apache.arrow.flight.FlightServer;
+    import org.apache.arrow.flight.Location;
+    import org.apache.arrow.memory.BufferAllocator;
+    import org.apache.arrow.memory.RootAllocator;
+    import org.apache.arrow.util.AutoCloseables;
+
+    import java.io.IOException;
+
+    /**
+     * Pairs a {@link FlightServer} with the {@link InMemoryStore} it serves.
+     */
+    public class ExampleFlightServer implements AutoCloseable {
+        private final FlightServer flightServer;
+        private final Location location;
+        private final BufferAllocator allocator;
+        private final InMemoryStore mem;
+
+        /**
+         * Builds, without starting, a server for the given location.
+         *
+         * @param allocator parent allocator; a child named "flight-server" is created from it for the store
+         * @param location  location handed to both the store and the server builder
+         */
+        public ExampleFlightServer(BufferAllocator allocator, Location location) {
+            final BufferAllocator child = allocator.newChildAllocator("flight-server", 0, Long.MAX_VALUE);
+            final InMemoryStore store = new InMemoryStore(child, location);
+            this.allocator = child;
+            this.location = location;
+            this.mem = store;
+            // The server builder receives the caller's allocator, not the child created above.
+            this.flightServer = FlightServer.builder(allocator, location, store).build();
+        }
+
+        /** Returns the location supplied at construction time. */
+        public Location getLocation() {
+            return this.location;
+        }
+
+        /** Returns the port reported by the underlying server. */
+        public int getPort() {
+            return flightServer.getPort();
+        }
+
+        /** Starts the underlying server. */
+        public void start() throws IOException {
+            this.flightServer.start();
+        }
+
+        /** Delegates to the underlying server's {@code awaitTermination}. */
+        public void awaitTermination() throws InterruptedException {
+            this.flightServer.awaitTermination();
+        }
+
+        /** Returns the store backing this server. */
+        public InMemoryStore getStore() {
+            return this.mem;
+        }
+
+        /** Closes the store, then the server, then the child allocator. */
+        @Override
+        public void close() throws Exception {
+            AutoCloseables.close(this.mem, this.flightServer, this.allocator);
+        }
+    }
+
+    // Creating the Server
+
+    import org.apache.arrow.flight.FlightServer;
+    import org.apache.arrow.flight.Location;
+    import org.apache.arrow.memory.BufferAllocator;
+    import org.apache.arrow.memory.RootAllocator;
+    import org.apache.arrow.util.AutoCloseables;
+
+    // Root allocator backing the whole example.
+    BufferAllocator allocator = new RootAllocator(Long.MAX_VALUE);
+    // Insecure gRPC endpoint on localhost:33333.
+    Location serverLocation = Location.forGrpcInsecure("localhost", 33333);
+    ExampleFlightServer efs = new ExampleFlightServer(allocator, serverLocation);
+    efs.start();
+
+    // Print the location the server was created with.
+    System.out.println(efs.getLocation());
+
+.. testoutput::
+
+    Location{uri=grpc+tcp://localhost:33333}
+
+Creating the Client
+*******************
+
+Get List Actions
+----------------
+
+Validate the list of actions available on the Flight service:
+
+.. testcode::

Review comment:
       Ticket was created. Let's join ideas for cookbook v2.




-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: github-unsubscr...@arrow.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


Reply via email to