lidavidm commented on a change in pull request #137:
URL: https://github.com/apache/arrow-cookbook/pull/137#discussion_r801625197



##########
File path: java/source/flight.rst
##########
@@ -0,0 +1,1699 @@
+.. _arrow-flight:
+
+============
+Arrow Flight
+============
+
+Recipes related to leveraging Arrow Flight protocol
+
+.. contents::
+
+Simple Service with Arrow Flight
+================================
+
+We are going to create a Flight producer and a Flight server:
+
+* InMemoryStore: A FlightProducer that hosts an in memory store of Arrow 
buffers. Used for integration testing.
+
+* ExampleFlightServer: An Example Flight Server that provides access to the 
InMemoryStore.

Review comment:
       Sorry, this is the opposite of what I meant: we should demonstrate 
implementing FlightProducer, but we should not use anything from 
`org.apache.arrow.flight.example`, and we don't need to copy how it creates an 
"ExampleFlightServer" class. We should just directly create "FlightServer".

##########
File path: java/source/flight.rst
##########
@@ -0,0 +1,1699 @@
+.. _arrow-flight:
+
+============
+Arrow Flight
+============
+
+Recipes related to leveraging Arrow Flight protocol
+
+.. contents::
+
+Simple Service with Arrow Flight
+================================
+
+We are going to create a Flight producer and a Flight server:
+
+* InMemoryStore: A FlightProducer that hosts an in memory store of Arrow 
buffers. Used for integration testing.
+
+* ExampleFlightServer: An Example Flight Server that provides access to the 
InMemoryStore.
+
+Creating the Server
+*******************
+
+.. testcode::
+
+    import org.apache.arrow.flight.Action;
+    import org.apache.arrow.flight.ActionType;
+    import org.apache.arrow.flight.CallStatus;
+    import org.apache.arrow.flight.Criteria;
+    import org.apache.arrow.flight.FlightDescriptor;
+    import org.apache.arrow.flight.FlightInfo;
+    import org.apache.arrow.flight.FlightProducer;
+    import org.apache.arrow.flight.FlightStream;
+    import org.apache.arrow.flight.Location;
+    import org.apache.arrow.flight.PutResult;
+    import org.apache.arrow.flight.Result;
+    import org.apache.arrow.flight.Ticket;
+    import org.apache.arrow.flight.example.ExampleTicket;
+    import org.apache.arrow.flight.example.FlightHolder;
+    import org.apache.arrow.flight.example.Stream;
+    import org.apache.arrow.flight.example.Stream.StreamCreator;
+    import org.apache.arrow.memory.BufferAllocator;
+    import org.apache.arrow.util.AutoCloseables;
+    import org.apache.arrow.vector.VectorSchemaRoot;
+    import org.apache.arrow.vector.VectorUnloader;
+
+    import java.util.concurrent.ConcurrentHashMap;
+    import java.util.concurrent.ConcurrentMap;
+
+    // InMemoryStore
+
+    /**
+     * A FlightProducer that keeps uploaded flights in a concurrent map keyed
+     * by FlightDescriptor. Every RPC handler prints a trace line so the
+     * recipes can show which server method a client call reached.
+     */
+    public class InMemoryStore implements FlightProducer, AutoCloseable {
+        private final ConcurrentMap<FlightDescriptor, FlightHolder> holders =
+                new ConcurrentHashMap<>();
+        private final BufferAllocator allocator;
+        private Location location;
+
+        /**
+         * @param allocator allocator passed to each new FlightHolder and used
+         *                  when sending a stored stream to a client
+         * @param location  location passed to FlightHolder.getFlightInfo
+         */
+        public InMemoryStore(BufferAllocator allocator, Location location) {
+            this.allocator = allocator;
+            this.location = location;
+        }
+
+        /** Replaces the location used when building FlightInfo results. */
+        public void setLocation(Location location) {
+            this.location = location;
+        }
+
+        /** Sends the stream identified by the ticket to the listener. */
+        @Override
+        public void getStream(CallContext context, Ticket ticket,
+                              FlightProducer.ServerStreamListener listener) {
+            System.out.println("Calling to getStream");
+            getStream(ticket).sendTo(allocator, listener);
+        }
+
+        /**
+         * Looks up the stored stream a ticket refers to.
+         *
+         * @throws IllegalStateException if no flight is stored under the
+         *                               path carried by the ticket
+         */
+        public Stream getStream(Ticket t) {
+            ExampleTicket example = ExampleTicket.from(t);
+            FlightDescriptor d = FlightDescriptor.path(example.getPath());
+            FlightHolder h = holders.get(d);
+            if (h == null) {
+                throw new IllegalStateException("Unknown ticket.");
+            }
+
+            return h.getStream(example);
+        }
+
+        /**
+         * Emits one FlightInfo per stored flight. The criteria argument is
+         * not consulted; any exception is forwarded through onError.
+         */
+        @Override
+        public void listFlights(CallContext context, Criteria criteria,
+                                StreamListener<FlightInfo> listener) {
+            System.out.println("Calling to listFligths");
+            try {
+                for (FlightHolder h : holders.values()) {
+                    listener.onNext(h.getFlightInfo(location));
+                }
+                listener.onCompleted();
+            } catch (Exception ex) {
+                listener.onError(ex);
+            }
+        }
+
+        /**
+         * Returns the FlightInfo of the flight stored under the descriptor.
+         *
+         * @throws IllegalStateException if the descriptor is not in the store
+         */
+        @Override
+        public FlightInfo getFlightInfo(CallContext context,
+                                        FlightDescriptor descriptor) {
+            System.out.println("Calling to getFlightInfo");
+            FlightHolder h = holders.get(descriptor);
+            if (h == null) {
+                throw new IllegalStateException("Unknown descriptor.");
+            }
+
+            return h.getFlightInfo(location);
+        }
+
+        /**
+         * Returns a task that stores every batch of the incoming stream under
+         * the stream's descriptor, acknowledging each batch with its
+         * metadata. If the upload does not complete, the partially built
+         * stream is dropped.
+         */
+        @Override
+        public Runnable acceptPut(CallContext context,
+                                  final FlightStream flightStream,
+                                  final StreamListener<PutResult> ackStream) {
+            return () -> {
+                System.out.println("Calling to acceptPut");
+                StreamCreator creator = null;
+                boolean success = false;
+                try (VectorSchemaRoot root = flightStream.getRoot()) {
+                    final FlightHolder h = holders.computeIfAbsent(
+                            flightStream.getDescriptor(),
+                            t -> new FlightHolder(allocator, t,
+                                    flightStream.getSchema(),
+                                    flightStream.getDictionaryProvider()));
+
+                    creator = h.addStream(flightStream.getSchema());
+
+                    VectorUnloader unloader = new VectorUnloader(root);
+                    while (flightStream.next()) {
+                        ackStream.onNext(PutResult.metadata(
+                                flightStream.getLatestMetadata()));
+                        creator.add(unloader.getRecordBatch());
+                    }
+                    // Closing the stream will release the dictionaries
+                    flightStream.takeDictionaryOwnership();
+                    creator.complete();
+                    success = true;
+                } finally {
+                    // creator is still null when the failure happened before
+                    // addStream() returned; without this guard the original
+                    // exception would be replaced by a NullPointerException.
+                    if (!success && creator != null) {
+                        creator.drop();
+                    }
+                }
+            };
+        }
+
+        /**
+         * Handles custom actions. "drop" is acknowledged with an empty Result
+         * (this handler does not modify the store); any other action type is
+         * reported as UNIMPLEMENTED.
+         */
+        @Override
+        public void doAction(CallContext context, Action action,
+                             StreamListener<Result> listener) {
+            System.out.println("Calling to doAction");
+            switch (action.getType()) {
+                case "drop": {
+                    listener.onNext(new Result(new byte[0]));
+                    listener.onCompleted();
+                    break;
+                }
+                default: {
+                    listener.onError(
+                            CallStatus.UNIMPLEMENTED.toRuntimeException());
+                }
+            }
+        }
+
+        /** Advertises the "get", "put" and "drop" action types. */
+        @Override
+        public void listActions(CallContext context,
+                                StreamListener<ActionType> listener) {
+            System.out.println("Calling to listActions");
+            listener.onNext(new ActionType("get",
+                    "pull a stream. Action must be done via standard get mechanism"));
+            listener.onNext(new ActionType("put",
+                    "push a stream. Action must be done via standard put mechanism"));
+            listener.onNext(new ActionType("drop",
+                    "delete a flight. Action body is a JSON encoded path."));
+            listener.onCompleted();
+        }
+
+        /** Closes every stored FlightHolder and empties the map. */
+        @Override
+        public void close() throws Exception {
+            System.out.println("Calling to close");
+            AutoCloseables.close(holders.values());
+            holders.clear();
+        }
+    }
+
+
+    // ExampleFlightServer
+
+    import org.apache.arrow.flight.FlightServer;
+    import org.apache.arrow.flight.Location;
+    import org.apache.arrow.memory.BufferAllocator;
+    import org.apache.arrow.memory.RootAllocator;
+    import org.apache.arrow.util.AutoCloseables;
+
+    import java.io.IOException;
+
+    public class ExampleFlightServer implements AutoCloseable {
+        private final FlightServer flightServer;
+        private final Location location;
+        private final BufferAllocator allocator;
+        private final InMemoryStore mem;
+
+        /**
+         * Constructs a new instance using Allocator for allocating buffer 
storage that binds
+         * to the given location.
+         */
+        public ExampleFlightServer(BufferAllocator allocator, Location 
location) {
+            this.allocator = allocator.newChildAllocator("flight-server", 0, 
Long.MAX_VALUE);
+            this.location = location;
+            this.mem = new InMemoryStore(this.allocator, location);
+            this.flightServer = FlightServer.builder(allocator, location, 
mem).build();
+        }
+
+        public Location getLocation() {
+            return location;
+        }
+
+        public int getPort() {
+            return this.flightServer.getPort();
+        }
+
+        public void start() throws IOException {
+            flightServer.start();
+        }
+
+        public void awaitTermination() throws InterruptedException {
+            flightServer.awaitTermination();
+        }
+
+        public InMemoryStore getStore() {
+            return mem;
+        }
+
+        @Override
+        public void close() throws Exception {
+            AutoCloseables.close(mem, flightServer, allocator);
+        }
+    }
+
+    // Creating the Server
+
+    import org.apache.arrow.flight.FlightServer;
+    import org.apache.arrow.flight.Location;
+    import org.apache.arrow.memory.BufferAllocator;
+    import org.apache.arrow.memory.RootAllocator;
+    import org.apache.arrow.util.AutoCloseables;
+
+    BufferAllocator allocator = new RootAllocator(Long.MAX_VALUE);
+    // Insecure (plaintext) gRPC endpoint the example server binds to.
+    Location serverLocation = Location.forGrpcInsecure("localhost", 33333);
+    ExampleFlightServer efs =
+            new ExampleFlightServer(allocator, serverLocation);
+    efs.start();
+
+    System.out.println(efs.getLocation());
+
+.. testoutput::
+
+    Location{uri=grpc+tcp://localhost:33333}
+
+Creating the Client
+*******************
+
+Get List Actions
+----------------
+
+Validate the list of actions available on the Flight service:
+
+.. testcode::

Review comment:
       Yes, all code needs to work in isolation, but it's not very helpful if 
every example starts with a few hundred lines of copy-pasted code since then 
it's hard to see what the example is trying to demonstrate. The other language 
cookbooks don't repeat the server definition for each example, for instance. It 
might be easier to show the server once and then just use normal code blocks to 
show the client examples afterwards.

##########
File path: java/source/flight.rst
##########
@@ -0,0 +1,1699 @@
+.. _arrow-flight:
+
+============
+Arrow Flight
+============
+
+Recipes related to leveraging Arrow Flight protocol
+
+.. contents::
+
+Simple Service with Arrow Flight
+================================
+
+We are going to create a Flight producer and a Flight server:
+
+* InMemoryStore: A FlightProducer that hosts an in memory store of Arrow 
buffers. Used for integration testing.
+
+* ExampleFlightServer: An Example Flight Server that provides access to the 
InMemoryStore.
+
+Creating the Server
+*******************
+
+.. testcode::
+
+    import org.apache.arrow.flight.Action;
+    import org.apache.arrow.flight.ActionType;
+    import org.apache.arrow.flight.CallStatus;
+    import org.apache.arrow.flight.Criteria;
+    import org.apache.arrow.flight.FlightDescriptor;
+    import org.apache.arrow.flight.FlightInfo;
+    import org.apache.arrow.flight.FlightProducer;
+    import org.apache.arrow.flight.FlightStream;
+    import org.apache.arrow.flight.Location;
+    import org.apache.arrow.flight.PutResult;
+    import org.apache.arrow.flight.Result;
+    import org.apache.arrow.flight.Ticket;
+    import org.apache.arrow.flight.example.ExampleTicket;
+    import org.apache.arrow.flight.example.FlightHolder;
+    import org.apache.arrow.flight.example.Stream;
+    import org.apache.arrow.flight.example.Stream.StreamCreator;
+    import org.apache.arrow.memory.BufferAllocator;
+    import org.apache.arrow.util.AutoCloseables;
+    import org.apache.arrow.vector.VectorSchemaRoot;
+    import org.apache.arrow.vector.VectorUnloader;
+
+    import java.util.concurrent.ConcurrentHashMap;
+    import java.util.concurrent.ConcurrentMap;
+
+    // InMemoryStore
+
+    /**
+     * A FlightProducer that keeps uploaded flights in a concurrent map keyed
+     * by FlightDescriptor. Every RPC handler prints a trace line so the
+     * recipes can show which server method a client call reached.
+     */
+    public class InMemoryStore implements FlightProducer, AutoCloseable {
+        private final ConcurrentMap<FlightDescriptor, FlightHolder> holders =
+                new ConcurrentHashMap<>();
+        private final BufferAllocator allocator;
+        private Location location;
+
+        /**
+         * @param allocator allocator passed to each new FlightHolder and used
+         *                  when sending a stored stream to a client
+         * @param location  location passed to FlightHolder.getFlightInfo
+         */
+        public InMemoryStore(BufferAllocator allocator, Location location) {
+            this.allocator = allocator;
+            this.location = location;
+        }
+
+        /** Replaces the location used when building FlightInfo results. */
+        public void setLocation(Location location) {
+            this.location = location;
+        }
+
+        /** Sends the stream identified by the ticket to the listener. */
+        @Override
+        public void getStream(CallContext context, Ticket ticket,
+                              FlightProducer.ServerStreamListener listener) {
+            System.out.println("Calling to getStream");
+            getStream(ticket).sendTo(allocator, listener);
+        }
+
+        /**
+         * Looks up the stored stream a ticket refers to.
+         *
+         * @throws IllegalStateException if no flight is stored under the
+         *                               path carried by the ticket
+         */
+        public Stream getStream(Ticket t) {
+            ExampleTicket example = ExampleTicket.from(t);
+            FlightDescriptor d = FlightDescriptor.path(example.getPath());
+            FlightHolder h = holders.get(d);
+            if (h == null) {
+                throw new IllegalStateException("Unknown ticket.");
+            }
+
+            return h.getStream(example);
+        }
+
+        /**
+         * Emits one FlightInfo per stored flight. The criteria argument is
+         * not consulted; any exception is forwarded through onError.
+         */
+        @Override
+        public void listFlights(CallContext context, Criteria criteria,
+                                StreamListener<FlightInfo> listener) {
+            System.out.println("Calling to listFligths");
+            try {
+                for (FlightHolder h : holders.values()) {
+                    listener.onNext(h.getFlightInfo(location));
+                }
+                listener.onCompleted();
+            } catch (Exception ex) {
+                listener.onError(ex);
+            }
+        }
+
+        /**
+         * Returns the FlightInfo of the flight stored under the descriptor.
+         *
+         * @throws IllegalStateException if the descriptor is not in the store
+         */
+        @Override
+        public FlightInfo getFlightInfo(CallContext context,
+                                        FlightDescriptor descriptor) {
+            System.out.println("Calling to getFlightInfo");
+            FlightHolder h = holders.get(descriptor);
+            if (h == null) {
+                throw new IllegalStateException("Unknown descriptor.");
+            }
+
+            return h.getFlightInfo(location);
+        }
+
+        /**
+         * Returns a task that stores every batch of the incoming stream under
+         * the stream's descriptor, acknowledging each batch with its
+         * metadata. If the upload does not complete, the partially built
+         * stream is dropped.
+         */
+        @Override
+        public Runnable acceptPut(CallContext context,
+                                  final FlightStream flightStream,
+                                  final StreamListener<PutResult> ackStream) {
+            return () -> {
+                System.out.println("Calling to acceptPut");
+                StreamCreator creator = null;
+                boolean success = false;
+                try (VectorSchemaRoot root = flightStream.getRoot()) {
+                    final FlightHolder h = holders.computeIfAbsent(
+                            flightStream.getDescriptor(),
+                            t -> new FlightHolder(allocator, t,
+                                    flightStream.getSchema(),
+                                    flightStream.getDictionaryProvider()));
+
+                    creator = h.addStream(flightStream.getSchema());
+
+                    VectorUnloader unloader = new VectorUnloader(root);
+                    while (flightStream.next()) {
+                        ackStream.onNext(PutResult.metadata(
+                                flightStream.getLatestMetadata()));
+                        creator.add(unloader.getRecordBatch());
+                    }
+                    // Closing the stream will release the dictionaries
+                    flightStream.takeDictionaryOwnership();
+                    creator.complete();
+                    success = true;
+                } finally {
+                    // creator is still null when the failure happened before
+                    // addStream() returned; without this guard the original
+                    // exception would be replaced by a NullPointerException.
+                    if (!success && creator != null) {
+                        creator.drop();
+                    }
+                }
+            };
+        }
+
+        /**
+         * Handles custom actions. "drop" is acknowledged with an empty Result
+         * (this handler does not modify the store); any other action type is
+         * reported as UNIMPLEMENTED.
+         */
+        @Override
+        public void doAction(CallContext context, Action action,
+                             StreamListener<Result> listener) {
+            System.out.println("Calling to doAction");
+            switch (action.getType()) {
+                case "drop": {
+                    listener.onNext(new Result(new byte[0]));
+                    listener.onCompleted();
+                    break;
+                }
+                default: {
+                    listener.onError(
+                            CallStatus.UNIMPLEMENTED.toRuntimeException());
+                }
+            }
+        }
+
+        /** Advertises the "get", "put" and "drop" action types. */
+        @Override
+        public void listActions(CallContext context,
+                                StreamListener<ActionType> listener) {
+            System.out.println("Calling to listActions");
+            listener.onNext(new ActionType("get",
+                    "pull a stream. Action must be done via standard get mechanism"));
+            listener.onNext(new ActionType("put",
+                    "push a stream. Action must be done via standard put mechanism"));
+            listener.onNext(new ActionType("drop",
+                    "delete a flight. Action body is a JSON encoded path."));
+            listener.onCompleted();
+        }
+
+        /** Closes every stored FlightHolder and empties the map. */
+        @Override
+        public void close() throws Exception {
+            System.out.println("Calling to close");
+            AutoCloseables.close(holders.values());
+            holders.clear();
+        }
+    }
+
+
+    // ExampleFlightServer
+
+    import org.apache.arrow.flight.FlightServer;
+    import org.apache.arrow.flight.Location;
+    import org.apache.arrow.memory.BufferAllocator;
+    import org.apache.arrow.memory.RootAllocator;
+    import org.apache.arrow.util.AutoCloseables;
+
+    import java.io.IOException;
+
+    public class ExampleFlightServer implements AutoCloseable {
+        private final FlightServer flightServer;
+        private final Location location;
+        private final BufferAllocator allocator;
+        private final InMemoryStore mem;
+
+        /**
+         * Constructs a new instance using Allocator for allocating buffer 
storage that binds
+         * to the given location.
+         */
+        public ExampleFlightServer(BufferAllocator allocator, Location 
location) {
+            this.allocator = allocator.newChildAllocator("flight-server", 0, 
Long.MAX_VALUE);
+            this.location = location;
+            this.mem = new InMemoryStore(this.allocator, location);
+            this.flightServer = FlightServer.builder(allocator, location, 
mem).build();
+        }
+
+        public Location getLocation() {
+            return location;
+        }
+
+        public int getPort() {
+            return this.flightServer.getPort();
+        }
+
+        public void start() throws IOException {
+            flightServer.start();
+        }
+
+        public void awaitTermination() throws InterruptedException {
+            flightServer.awaitTermination();
+        }
+
+        public InMemoryStore getStore() {
+            return mem;
+        }
+
+        @Override
+        public void close() throws Exception {
+            AutoCloseables.close(mem, flightServer, allocator);
+        }
+    }
+
+    // Creating the Server
+
+    import org.apache.arrow.flight.FlightServer;
+    import org.apache.arrow.flight.Location;
+    import org.apache.arrow.memory.BufferAllocator;
+    import org.apache.arrow.memory.RootAllocator;
+    import org.apache.arrow.util.AutoCloseables;
+
+    BufferAllocator allocator = new RootAllocator(Long.MAX_VALUE);
+    // Insecure (plaintext) gRPC endpoint the example server binds to.
+    Location serverLocation = Location.forGrpcInsecure("localhost", 33333);
+    ExampleFlightServer efs =
+            new ExampleFlightServer(allocator, serverLocation);
+    efs.start();
+
+    System.out.println(efs.getLocation());
+
+.. testoutput::
+
+    Location{uri=grpc+tcp://localhost:33333}
+
+Creating the Client
+*******************
+
+Get List Actions
+----------------
+
+Validate the list of actions available on the Flight service:
+
+.. testcode::
+
+    // create the server
+    import org.apache.arrow.flight.Action;
+    import org.apache.arrow.flight.ActionType;
+    import org.apache.arrow.flight.CallStatus;
+    import org.apache.arrow.flight.Criteria;
+    import org.apache.arrow.flight.FlightDescriptor;
+    import org.apache.arrow.flight.FlightInfo;
+    import org.apache.arrow.flight.FlightProducer;
+    import org.apache.arrow.flight.FlightStream;
+    import org.apache.arrow.flight.Location;
+    import org.apache.arrow.flight.PutResult;
+    import org.apache.arrow.flight.Result;
+    import org.apache.arrow.flight.Ticket;
+    import org.apache.arrow.flight.example.ExampleTicket;
+    import org.apache.arrow.flight.example.FlightHolder;
+    import org.apache.arrow.flight.example.Stream;
+    import org.apache.arrow.flight.example.Stream.StreamCreator;
+    import org.apache.arrow.memory.BufferAllocator;
+    import org.apache.arrow.util.AutoCloseables;
+    import org.apache.arrow.vector.VectorSchemaRoot;
+    import org.apache.arrow.vector.VectorUnloader;
+
+    import java.util.concurrent.ConcurrentHashMap;
+    import java.util.concurrent.ConcurrentMap;
+
+    // InMemoryStore
+
+    /**
+     * A FlightProducer that keeps uploaded flights in a concurrent map keyed
+     * by FlightDescriptor. Every RPC handler prints a trace line so the
+     * recipes can show which server method a client call reached.
+     */
+    public class InMemoryStore implements FlightProducer, AutoCloseable {
+        private final ConcurrentMap<FlightDescriptor, FlightHolder> holders =
+                new ConcurrentHashMap<>();
+        private final BufferAllocator allocator;
+        private Location location;
+
+        /**
+         * @param allocator allocator passed to each new FlightHolder and used
+         *                  when sending a stored stream to a client
+         * @param location  location passed to FlightHolder.getFlightInfo
+         */
+        public InMemoryStore(BufferAllocator allocator, Location location) {
+            this.allocator = allocator;
+            this.location = location;
+        }
+
+        /** Replaces the location used when building FlightInfo results. */
+        public void setLocation(Location location) {
+            this.location = location;
+        }
+
+        /** Sends the stream identified by the ticket to the listener. */
+        @Override
+        public void getStream(CallContext context, Ticket ticket,
+                              FlightProducer.ServerStreamListener listener) {
+            System.out.println("Calling to getStream");
+            getStream(ticket).sendTo(allocator, listener);
+        }
+
+        /**
+         * Looks up the stored stream a ticket refers to.
+         *
+         * @throws IllegalStateException if no flight is stored under the
+         *                               path carried by the ticket
+         */
+        public Stream getStream(Ticket t) {
+            ExampleTicket example = ExampleTicket.from(t);
+            FlightDescriptor d = FlightDescriptor.path(example.getPath());
+            FlightHolder h = holders.get(d);
+            if (h == null) {
+                throw new IllegalStateException("Unknown ticket.");
+            }
+
+            return h.getStream(example);
+        }
+
+        /**
+         * Emits one FlightInfo per stored flight. The criteria argument is
+         * not consulted; any exception is forwarded through onError.
+         */
+        @Override
+        public void listFlights(CallContext context, Criteria criteria,
+                                StreamListener<FlightInfo> listener) {
+            System.out.println("Calling to listFligths");
+            try {
+                for (FlightHolder h : holders.values()) {
+                    listener.onNext(h.getFlightInfo(location));
+                }
+                listener.onCompleted();
+            } catch (Exception ex) {
+                listener.onError(ex);
+            }
+        }
+
+        /**
+         * Returns the FlightInfo of the flight stored under the descriptor.
+         *
+         * @throws IllegalStateException if the descriptor is not in the store
+         */
+        @Override
+        public FlightInfo getFlightInfo(CallContext context,
+                                        FlightDescriptor descriptor) {
+            System.out.println("Calling to getFlightInfo");
+            FlightHolder h = holders.get(descriptor);
+            if (h == null) {
+                throw new IllegalStateException("Unknown descriptor.");
+            }
+
+            return h.getFlightInfo(location);
+        }
+
+        /**
+         * Returns a task that stores every batch of the incoming stream under
+         * the stream's descriptor, acknowledging each batch with its
+         * metadata. If the upload does not complete, the partially built
+         * stream is dropped.
+         */
+        @Override
+        public Runnable acceptPut(CallContext context,
+                                  final FlightStream flightStream,
+                                  final StreamListener<PutResult> ackStream) {
+            return () -> {
+                System.out.println("Calling to acceptPut");
+                StreamCreator creator = null;
+                boolean success = false;
+                try (VectorSchemaRoot root = flightStream.getRoot()) {
+                    final FlightHolder h = holders.computeIfAbsent(
+                            flightStream.getDescriptor(),
+                            t -> new FlightHolder(allocator, t,
+                                    flightStream.getSchema(),
+                                    flightStream.getDictionaryProvider()));
+
+                    creator = h.addStream(flightStream.getSchema());
+
+                    VectorUnloader unloader = new VectorUnloader(root);
+                    while (flightStream.next()) {
+                        ackStream.onNext(PutResult.metadata(
+                                flightStream.getLatestMetadata()));
+                        creator.add(unloader.getRecordBatch());
+                    }
+                    // Closing the stream will release the dictionaries
+                    flightStream.takeDictionaryOwnership();
+                    creator.complete();
+                    success = true;
+                } finally {
+                    // creator is still null when the failure happened before
+                    // addStream() returned; without this guard the original
+                    // exception would be replaced by a NullPointerException.
+                    if (!success && creator != null) {
+                        creator.drop();
+                    }
+                }
+            };
+        }
+
+        /**
+         * Handles custom actions. "drop" is acknowledged with an empty Result
+         * (this handler does not modify the store); any other action type is
+         * reported as UNIMPLEMENTED.
+         */
+        @Override
+        public void doAction(CallContext context, Action action,
+                             StreamListener<Result> listener) {
+            System.out.println("Calling to doAction");
+            switch (action.getType()) {
+                case "drop": {
+                    listener.onNext(new Result(new byte[0]));
+                    listener.onCompleted();
+                    break;
+                }
+                default: {
+                    listener.onError(
+                            CallStatus.UNIMPLEMENTED.toRuntimeException());
+                }
+            }
+        }
+
+        /** Advertises the "get", "put" and "drop" action types. */
+        @Override
+        public void listActions(CallContext context,
+                                StreamListener<ActionType> listener) {
+            System.out.println("Calling to listActions");
+            listener.onNext(new ActionType("get",
+                    "pull a stream. Action must be done via standard get mechanism"));
+            listener.onNext(new ActionType("put",
+                    "push a stream. Action must be done via standard put mechanism"));
+            listener.onNext(new ActionType("drop",
+                    "delete a flight. Action body is a JSON encoded path."));
+            listener.onCompleted();
+        }
+
+        /** Closes every stored FlightHolder and empties the map. */
+        @Override
+        public void close() throws Exception {
+            System.out.println("Calling to close");
+            AutoCloseables.close(holders.values());
+            holders.clear();
+        }
+    }
+
+
+    // ExampleFlightServer
+
+    import org.apache.arrow.flight.FlightServer;
+    import org.apache.arrow.flight.Location;
+    import org.apache.arrow.memory.BufferAllocator;
+    import org.apache.arrow.memory.RootAllocator;
+    import org.apache.arrow.util.AutoCloseables;
+
+    import java.io.IOException;
+
+    public class ExampleFlightServer implements AutoCloseable {
+        private final FlightServer flightServer;
+        private final Location location;
+        private final BufferAllocator allocator;
+        private final InMemoryStore mem;
+
+        /**
+         * Constructs a new instance using Allocator for allocating buffer 
storage that binds
+         * to the given location.
+         */
+        public ExampleFlightServer(BufferAllocator allocator, Location 
location) {
+            this.allocator = allocator.newChildAllocator("flight-server", 0, 
Long.MAX_VALUE);
+            this.location = location;
+            this.mem = new InMemoryStore(this.allocator, location);
+            this.flightServer = FlightServer.builder(allocator, location, 
mem).build();
+        }
+
+        public Location getLocation() {
+            return location;
+        }
+
+        public int getPort() {
+            return this.flightServer.getPort();
+        }
+
+        public void start() throws IOException {
+            flightServer.start();
+        }
+
+        public void awaitTermination() throws InterruptedException {
+            flightServer.awaitTermination();
+        }
+
+        public InMemoryStore getStore() {
+            return mem;
+        }
+
+        @Override
+        public void close() throws Exception {
+            AutoCloseables.close(mem, flightServer, allocator);
+        }
+    }
+
+    // Creating the Server
+
+    import org.apache.arrow.flight.FlightServer;
+    import org.apache.arrow.flight.Location;
+    import org.apache.arrow.memory.BufferAllocator;
+    import org.apache.arrow.memory.RootAllocator;
+    import org.apache.arrow.util.AutoCloseables;
+
+    BufferAllocator allocator = new RootAllocator(Long.MAX_VALUE);
+    // Insecure (plaintext) gRPC endpoint the example server binds to.
+    Location serverLocation = Location.forGrpcInsecure("localhost", 33333);
+    ExampleFlightServer efs =
+            new ExampleFlightServer(allocator, serverLocation);
+    efs.start();
+
+    import org.apache.arrow.flight.FlightClient;
+    import org.apache.arrow.flight.Location;
+    import org.apache.arrow.memory.RootAllocator;
+    import org.apache.arrow.flight.ActionType;
+
+    // client creation
+    RootAllocator rootAllocator = new RootAllocator(Long.MAX_VALUE);
+    Location clientTarget = Location.forGrpcInsecure("localhost", 33333);
+    FlightClient client =
+            FlightClient.builder(rootAllocator, clientTarget).build();
+
+    // Collect the type name of every action returned by listActions().
+    import java.util.ArrayList;
+    List<String> actionTypes = new ArrayList<>();
+    for (ActionType advertised : client.listActions()) {
+        String typeName = advertised.getType();
+        actionTypes.add(typeName);
+    }
+
+    System.out.println(actionTypes);
+
+.. testoutput::
+
+    Calling to listActions
+    [get, put, drop]
+
+Get List Flights
+----------------
+
+.. testcode::
+
+    // create the server
+    import org.apache.arrow.flight.Action;
+    import org.apache.arrow.flight.ActionType;
+    import org.apache.arrow.flight.CallStatus;
+    import org.apache.arrow.flight.Criteria;
+    import org.apache.arrow.flight.FlightDescriptor;
+    import org.apache.arrow.flight.FlightInfo;
+    import org.apache.arrow.flight.FlightProducer;
+    import org.apache.arrow.flight.FlightStream;
+    import org.apache.arrow.flight.Location;
+    import org.apache.arrow.flight.PutResult;
+    import org.apache.arrow.flight.Result;
+    import org.apache.arrow.flight.Ticket;
+    import org.apache.arrow.flight.example.ExampleTicket;
+    import org.apache.arrow.flight.example.FlightHolder;
+    import org.apache.arrow.flight.example.Stream;
+    import org.apache.arrow.flight.example.Stream.StreamCreator;
+    import org.apache.arrow.memory.BufferAllocator;
+    import org.apache.arrow.util.AutoCloseables;
+    import org.apache.arrow.vector.VectorSchemaRoot;
+    import org.apache.arrow.vector.VectorUnloader;
+
+    import java.util.concurrent.ConcurrentHashMap;
+    import java.util.concurrent.ConcurrentMap;
+
+    // InMemoryStore
+
+    /**
+     * A {@link FlightProducer} that keeps uploaded flights in memory, keyed by
+     * their {@link FlightDescriptor}. Each entry is a {@link FlightHolder};
+     * closing the store closes every holder and empties the map.
+     */
+    public class InMemoryStore implements FlightProducer, AutoCloseable {
+        private final ConcurrentMap<FlightDescriptor, FlightHolder> holders =
+                new ConcurrentHashMap<>();
+        private final BufferAllocator allocator;
+        private Location location;
+
+        /**
+         * Creates an empty store.
+         *
+         * @param allocator allocator given to new holders and used when sending streams
+         * @param location  location passed to the holders when they build a FlightInfo
+         */
+        public InMemoryStore(BufferAllocator allocator, Location location) {
+            this.allocator = allocator;
+            this.location = location;
+        }
+
+        /** Replaces the location passed to holders when they build a FlightInfo. */
+        public void setLocation(Location location) {
+            this.location = location;
+        }
+
+        /**
+         * Looks up the stream for {@code ticket} and sends it to {@code listener}.
+         *
+         * @throws IllegalStateException if no flight is stored for the ticket
+         */
+        @Override
+        public void getStream(CallContext context, Ticket ticket,
+                              FlightProducer.ServerStreamListener listener) {
+            System.out.println("Calling to getStream");
+            getStream(ticket).sendTo(allocator, listener);
+        }
+
+        /**
+         * Resolves a ticket to the stored stream it refers to.
+         *
+         * @param t ticket, decoded with {@link ExampleTicket#from}
+         * @return the stream the matching holder returns for the decoded ticket
+         * @throws IllegalStateException if no holder exists for the ticket's path
+         */
+        public Stream getStream(Ticket t) {
+            ExampleTicket example = ExampleTicket.from(t);
+            FlightDescriptor d = FlightDescriptor.path(example.getPath());
+            FlightHolder h = holders.get(d);
+            if (h == null) {
+                throw new IllegalStateException("Unknown ticket.");
+            }
+
+            return h.getStream(example);
+        }
+
+        /**
+         * Sends the FlightInfo of every stored holder to {@code listener}.
+         * {@code criteria} is not consulted. Any exception raised while
+         * iterating is reported through {@code listener.onError}.
+         */
+        @Override
+        public void listFlights(CallContext context, Criteria criteria,
+                                StreamListener<FlightInfo> listener) {
+            System.out.println("Calling to listFligths");
+            try {
+                for (FlightHolder h : holders.values()) {
+                    listener.onNext(h.getFlightInfo(location));
+                }
+                listener.onCompleted();
+            } catch (Exception ex) {
+                listener.onError(ex);
+            }
+        }
+
+        /**
+         * Returns the FlightInfo of the holder stored under {@code descriptor}.
+         *
+         * @throws IllegalStateException if nothing is stored under the descriptor
+         */
+        @Override
+        public FlightInfo getFlightInfo(CallContext context, FlightDescriptor descriptor) {
+            System.out.println("Calling to getFlightInfo");
+            FlightHolder h = holders.get(descriptor);
+            if (h == null) {
+                throw new IllegalStateException("Unknown descriptor.");
+            }
+
+            return h.getFlightInfo(location);
+        }
+
+        /**
+         * Returns a task that stores every batch of {@code flightStream} under the
+         * stream's descriptor, creating the holder on first use, and acknowledges
+         * each batch on {@code ackStream} with the latest metadata of the stream.
+         * If the task fails after a stream creator was obtained, that creator is
+         * dropped before the failure propagates.
+         */
+        @Override
+        public Runnable acceptPut(CallContext context,
+                                  final FlightStream flightStream,
+                                  final StreamListener<PutResult> ackStream) {
+            return () -> {
+                System.out.println("Calling to acceptPut");
+                StreamCreator creator = null;
+                boolean success = false;
+                try (VectorSchemaRoot root = flightStream.getRoot()) {
+                    final FlightHolder h = holders.computeIfAbsent(
+                            flightStream.getDescriptor(),
+                            t -> new FlightHolder(allocator, t,
+                                    flightStream.getSchema(),
+                                    flightStream.getDictionaryProvider()));
+
+                    creator = h.addStream(flightStream.getSchema());
+
+                    VectorUnloader unloader = new VectorUnloader(root);
+                    while (flightStream.next()) {
+                        ackStream.onNext(
+                                PutResult.metadata(flightStream.getLatestMetadata()));
+                        creator.add(unloader.getRecordBatch());
+                    }
+                    // Closing the stream will release the dictionaries
+                    flightStream.takeDictionaryOwnership();
+                    creator.complete();
+                    success = true;
+                } finally {
+                    // creator is still null when the holder lookup or addStream()
+                    // itself failed; without this guard a NullPointerException
+                    // would replace the original failure.
+                    if (!success && creator != null) {
+                        creator.drop();
+                    }
+                }
+            };
+        }
+
+        /**
+         * Handles the "drop" action by replying with a single empty result; every
+         * other action type is reported as unimplemented.
+         */
+        @Override
+        public void doAction(CallContext context, Action action,
+                             StreamListener<Result> listener) {
+            System.out.println("Calling to doAction");
+            switch (action.getType()) {
+                case "drop": {
+                    // NOTE: only an empty acknowledgement is sent here; the
+                    // holders map is not modified.
+                    listener.onNext(new Result(new byte[0]));
+                    listener.onCompleted();
+                    break;
+                }
+                default: {
+                    listener.onError(CallStatus.UNIMPLEMENTED.toRuntimeException());
+                }
+            }
+        }
+
+        /** Advertises the "get", "put" and "drop" action types, then completes. */
+        @Override
+        public void listActions(CallContext context,
+                                StreamListener<ActionType> listener) {
+            System.out.println("Calling to listActions");
+            listener.onNext(new ActionType("get",
+                    "pull a stream. Action must be done via standard get mechanism"));
+            listener.onNext(new ActionType("put",
+                    "push a stream. Action must be done via standard put mechanism"));
+            listener.onNext(new ActionType("drop",
+                    "delete a flight. Action body is a JSON encoded path."));
+            listener.onCompleted();
+        }
+
+        /** Closes every stored holder and then empties the map. */
+        @Override
+        public void close() throws Exception {
+            System.out.println("Calling to close");
+            AutoCloseables.close(holders.values());
+            holders.clear();
+        }
+    }
+
+
+    // ExampleFlightServer
+
+    import org.apache.arrow.flight.FlightServer;
+    import org.apache.arrow.flight.Location;
+    import org.apache.arrow.memory.BufferAllocator;
+    import org.apache.arrow.memory.RootAllocator;
+    import org.apache.arrow.util.AutoCloseables;
+
+    import java.io.IOException;
+
+    /**
+     * Bundles a {@link FlightServer} with the {@link InMemoryStore} it serves and
+     * the child allocator the store uses, so all three can be closed together.
+     */
+    public class ExampleFlightServer implements AutoCloseable {
+        private final FlightServer flightServer;
+        private final Location location;
+        private final BufferAllocator allocator;
+        private final InMemoryStore mem;
+
+        /**
+         * Builds (but does not start) a server for {@code location}.
+         *
+         * @param allocator parent allocator; a child named "flight-server" is
+         *                  created from it for the store
+         * @param location  location handed to both the store and the server builder
+         */
+        public ExampleFlightServer(BufferAllocator allocator, Location location) {
+            final BufferAllocator child =
+                    allocator.newChildAllocator("flight-server", 0, Long.MAX_VALUE);
+            final InMemoryStore store = new InMemoryStore(child, location);
+
+            this.allocator = child;
+            this.location = location;
+            this.mem = store;
+            // The server is built with the allocator that was passed in, not the child.
+            this.flightServer = FlightServer.builder(allocator, location, store).build();
+        }
+
+        /** Returns the location given at construction time. */
+        public Location getLocation() {
+            return this.location;
+        }
+
+        /** Returns the port reported by the wrapped server. */
+        public int getPort() {
+            return flightServer.getPort();
+        }
+
+        /** Starts the wrapped server. */
+        public void start() throws IOException {
+            this.flightServer.start();
+        }
+
+        /** Delegates to the wrapped server's {@code awaitTermination()}. */
+        public void awaitTermination() throws InterruptedException {
+            this.flightServer.awaitTermination();
+        }
+
+        /** Returns the in-memory store backing this server. */
+        public InMemoryStore getStore() {
+            return this.mem;
+        }
+
+        /** Closes the store, the server and the child allocator, in that order. */
+        @Override
+        public void close() throws Exception {
+            AutoCloseables.close(this.mem, this.flightServer, this.allocator);
+        }
+    }
+
+    // Creating the Server
+
+    import org.apache.arrow.flight.FlightServer;
+    import org.apache.arrow.flight.Location;
+    import org.apache.arrow.memory.BufferAllocator;
+    import org.apache.arrow.memory.RootAllocator;
+    import org.apache.arrow.util.AutoCloseables;
+
+    // Create the root allocator, then build and start the server on localhost:33333.
+    BufferAllocator allocator = new RootAllocator(Long.MAX_VALUE);
+    Location serverLocation = Location.forGrpcInsecure("localhost", 33333);
+    ExampleFlightServer efs = new ExampleFlightServer(allocator, serverLocation);
+    efs.start();
+
+    import org.apache.arrow.flight.FlightClient;
+    import org.apache.arrow.flight.Location;
+    import org.apache.arrow.memory.RootAllocator;
+
+    // Build a client, with its own root allocator, for localhost:33333.
+    RootAllocator rootAllocator = new RootAllocator(Long.MAX_VALUE);
+    FlightClient client = FlightClient
+            .builder(rootAllocator, Location.forGrpcInsecure("localhost", 33333))
+            .build();
+
+    // List the flights the server reports for Criteria.ALL and print each one.
+    Iterable<FlightInfo> listFlights = client.listFlights(Criteria.ALL);
+
+    for (FlightInfo info : listFlights) {
+        System.out.println(info);
+    }
+    System.out.println("Any list flight availale at this moment");

Review comment:
       I don't see the change here.




-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]


Reply via email to