davisusanibar commented on a change in pull request #137: URL: https://github.com/apache/arrow-cookbook/pull/137#discussion_r808090691
########## File path: java/source/flight.rst ########## @@ -0,0 +1,1699 @@ +.. _arrow-flight: + +============ +Arrow Flight +============ + +Recipes related to leveraging Arrow Flight protocol + +.. contents:: + +Simple Service with Arrow Flight +================================ + +We are going to create: Flight Producer and Fligh Server: + +* InMemoryStore: A FlightProducer that hosts an in memory store of Arrow buffers. Used for integration testing. + +* ExampleFlightServer: An Example Flight Server that provides access to the InMemoryStore. + +Creating the Server +******************* + +.. testcode:: + + import org.apache.arrow.flight.Action; + import org.apache.arrow.flight.ActionType; + import org.apache.arrow.flight.CallStatus; + import org.apache.arrow.flight.Criteria; + import org.apache.arrow.flight.FlightDescriptor; + import org.apache.arrow.flight.FlightInfo; + import org.apache.arrow.flight.FlightProducer; + import org.apache.arrow.flight.FlightStream; + import org.apache.arrow.flight.Location; + import org.apache.arrow.flight.PutResult; + import org.apache.arrow.flight.Result; + import org.apache.arrow.flight.Ticket; + import org.apache.arrow.flight.example.ExampleTicket; + import org.apache.arrow.flight.example.FlightHolder; + import org.apache.arrow.flight.example.Stream; + import org.apache.arrow.flight.example.Stream.StreamCreator; + import org.apache.arrow.memory.BufferAllocator; + import org.apache.arrow.util.AutoCloseables; + import org.apache.arrow.vector.VectorSchemaRoot; + import org.apache.arrow.vector.VectorUnloader; + + import java.util.concurrent.ConcurrentHashMap; + import java.util.concurrent.ConcurrentMap; + + // InMemoryStore + + public class InMemoryStore implements FlightProducer, AutoCloseable { + private final ConcurrentMap<FlightDescriptor, FlightHolder> holders = new ConcurrentHashMap<>(); + private final BufferAllocator allocator; + private Location location; + + public InMemoryStore(BufferAllocator allocator, Location location) { + super(); + this.allocator = allocator; + this.location = location; + } + + public void setLocation(Location location) { + this.location = location; + } + + @Override + public void getStream(CallContext context, Ticket ticket, + FlightProducer.ServerStreamListener listener) { + System.out.println("Calling to getStream"); + getStream(ticket).sendTo(allocator, listener); + } + + public Stream getStream(Ticket t) { + ExampleTicket example = ExampleTicket.from(t); + FlightDescriptor d = FlightDescriptor.path(example.getPath()); + FlightHolder h = holders.get(d); + if (h == null) { + throw new IllegalStateException("Unknown ticket."); + } + + return h.getStream(example); + } + + @Override + public void listFlights(CallContext context, Criteria criteria, StreamListener<FlightInfo> listener) { + System.out.println("Calling to listFligths"); + try { + for (FlightHolder h : holders.values()) { + listener.onNext(h.getFlightInfo(location)); + } + listener.onCompleted(); + } catch (Exception ex) { + listener.onError(ex); + } + } + + @Override + public FlightInfo getFlightInfo(CallContext context, FlightDescriptor descriptor) { + System.out.println("Calling to getFlightInfo"); + FlightHolder h = holders.get(descriptor); + if (h == null) { + throw new IllegalStateException("Unknown descriptor."); + } + + return h.getFlightInfo(location); + } + + @Override + public Runnable acceptPut(CallContext context, + final FlightStream flightStream, final StreamListener<PutResult> ackStream) { + return () -> { + System.out.println("Calling to acceptPut"); + StreamCreator creator = null; + boolean success = false; + try (VectorSchemaRoot root = flightStream.getRoot()) { + final FlightHolder h = holders.computeIfAbsent( + flightStream.getDescriptor(), + t -> new FlightHolder(allocator, t, flightStream.getSchema(), flightStream.getDictionaryProvider())); + + creator = h.addStream(flightStream.getSchema()); + + VectorUnloader unloader = new VectorUnloader(root); + while (flightStream.next()) { + ackStream.onNext(PutResult.metadata(flightStream.getLatestMetadata())); + creator.add(unloader.getRecordBatch()); + } + // Closing the stream will release the dictionaries + flightStream.takeDictionaryOwnership(); + creator.complete(); + success = true; + } finally { + if (!success) { + creator.drop(); + } + } + + }; + + } + + @Override + public void doAction(CallContext context, Action action, + StreamListener<Result> listener) { + System.out.println("Calling to doAction"); + switch (action.getType()) { + case "drop": { + listener.onNext(new Result(new byte[0])); + listener.onCompleted(); + break; + } + default: { + listener.onError(CallStatus.UNIMPLEMENTED.toRuntimeException()); + } + } + } + + @Override + public void listActions(CallContext context, + StreamListener<ActionType> listener) { + System.out.println("Calling to listActions"); + listener.onNext(new ActionType("get", "pull a stream. Action must be done via standard get mechanism")); + listener.onNext(new ActionType("put", "push a stream. Action must be done via standard put mechanism")); + listener.onNext(new ActionType("drop", "delete a flight. Action body is a JSON encoded path.")); + listener.onCompleted(); + } + + @Override + public void close() throws Exception { + System.out.println("Calling to close"); + AutoCloseables.close(holders.values()); + holders.clear(); + } + } + + + // ExampleFlightServer + + import org.apache.arrow.flight.FlightServer; + import org.apache.arrow.flight.Location; + import org.apache.arrow.memory.BufferAllocator; + import org.apache.arrow.memory.RootAllocator; + import org.apache.arrow.util.AutoCloseables; + + import java.io.IOException; + + public class ExampleFlightServer implements AutoCloseable { + private final FlightServer flightServer; + private final Location location; + private final BufferAllocator allocator; + private final InMemoryStore mem; + + /** + * Constructs a new instance using Allocator for allocating buffer storage that binds + * to the given location. + */ + public ExampleFlightServer(BufferAllocator allocator, Location location) { + this.allocator = allocator.newChildAllocator("flight-server", 0, Long.MAX_VALUE); + this.location = location; + this.mem = new InMemoryStore(this.allocator, location); + this.flightServer = FlightServer.builder(allocator, location, mem).build(); + } + + public Location getLocation() { + return location; + } + + public int getPort() { + return this.flightServer.getPort(); + } + + public void start() throws IOException { + flightServer.start(); + } + + public void awaitTermination() throws InterruptedException { + flightServer.awaitTermination(); + } + + public InMemoryStore getStore() { + return mem; + } + + @Override + public void close() throws Exception { + AutoCloseables.close(mem, flightServer, allocator); + } + } + + // Creating the Server + + import org.apache.arrow.flight.FlightServer; + import org.apache.arrow.flight.Location; + import org.apache.arrow.memory.BufferAllocator; + import org.apache.arrow.memory.RootAllocator; + import org.apache.arrow.util.AutoCloseables; + + BufferAllocator allocator = new RootAllocator(Long.MAX_VALUE); + ExampleFlightServer efs = new ExampleFlightServer(allocator, Location.forGrpcInsecure("localhost", 33333)); + efs.start(); + + System.out.println(efs.getLocation()); + +.. testoutput:: + + Location{uri=grpc+tcp://localhost:33333} + +Creating the Client +******************* + +Get List Actions +---------------- + +Validate lists actions available on the Flight service: + +.. testcode:: Review comment: Adding all the code at the initial documentation and continue explaining the code. Also added a ticket to create a custom plugin to be able to run java code cookbook but only show to our user the main code: https://issues.apache.org/jira/browse/ARROW-15703 -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: github-unsubscr...@arrow.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org