lidavidm commented on a change in pull request #137:
URL: https://github.com/apache/arrow-cookbook/pull/137#discussion_r818654930



##########
File path: java/source/demo/pom.xml
##########
@@ -32,14 +32,29 @@
         </dependency>
         <dependency>
             <groupId>org.apache.arrow</groupId>
-            <artifactId>arrow-memory-unsafe</artifactId>
+            <artifactId>arrow-memory-netty</artifactId>
             <version>${arrow.version}</version>
         </dependency>
         <dependency>
             <groupId>org.apache.arrow</groupId>
             <artifactId>arrow-algorithm</artifactId>
             <version>${arrow.version}</version>
         </dependency>
+        <dependency>
+            <groupId>org.apache.arrow</groupId>
+            <artifactId>flight-core</artifactId>
+            <version>6.0.0</version>

Review comment:
       6.0.0? Shouldn't this use ${arrow.version} like the other Arrow dependencies?

##########
File path: java/source/flight.rst
##########
@@ -0,0 +1,1050 @@
+.. Licensed to the Apache Software Foundation (ASF) under one
+.. or more contributor license agreements.  See the NOTICE file
+.. distributed with this work for additional information
+.. regarding copyright ownership.  The ASF licenses this file
+.. to you under the Apache License, Version 2.0 (the
+.. "License"); you may not use this file except in compliance
+.. with the License.  You may obtain a copy of the License at
+
+..   http://www.apache.org/licenses/LICENSE-2.0
+
+.. Unless required by applicable law or agreed to in writing,
+.. software distributed under the License is distributed on an
+.. "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+.. KIND, either express or implied.  See the License for the
+.. specific language governing permissions and limitations
+.. under the License.
+
+============
+Arrow Flight
+============
+
+This section contains a number of recipes for working with Arrow Flight.
+For more detail about Flight please take a look at `Arrow Flight RPC`_.
+
+.. contents::
+
+Simple Key-Value Storage Service with Arrow Flight
+==================================================
+
+We'll implement a service that provides a key-value store for data, using 
Flight to handle uploads/requests
+and data in memory to store the actual data.
+
+Flight Client and Server
+************************
+
+.. code-block:: java
+
+    import org.apache.arrow.flight.Action;
+    import org.apache.arrow.flight.AsyncPutListener;
+    import org.apache.arrow.flight.CallStatus;
+    import org.apache.arrow.flight.Criteria;
+    import org.apache.arrow.flight.FlightClient;
+    import org.apache.arrow.flight.FlightDescriptor;
+    import org.apache.arrow.flight.FlightEndpoint;
+    import org.apache.arrow.flight.FlightInfo;
+    import org.apache.arrow.flight.FlightServer;
+    import org.apache.arrow.flight.FlightStream;
+    import org.apache.arrow.flight.Location;
+    import org.apache.arrow.flight.NoOpFlightProducer;
+    import org.apache.arrow.flight.PutResult;
+    import org.apache.arrow.flight.Result;
+    import org.apache.arrow.flight.Ticket;
+    import org.apache.arrow.memory.RootAllocator;
+    import org.apache.arrow.vector.VarCharVector;
+    import org.apache.arrow.vector.VectorLoader;
+    import org.apache.arrow.vector.VectorSchemaRoot;
+    import org.apache.arrow.vector.VectorUnloader;
+    import org.apache.arrow.vector.ipc.message.ArrowRecordBatch;
+    import org.apache.arrow.vector.types.pojo.ArrowType;
+    import org.apache.arrow.vector.types.pojo.Field;
+    import org.apache.arrow.vector.types.pojo.FieldType;
+    import org.apache.arrow.vector.types.pojo.Schema;
+
+    import java.io.IOException;
+    import java.nio.charset.StandardCharsets;
+    import java.util.ArrayList;
+    import java.util.Arrays;
+    import java.util.Collections;
+    import java.util.Iterator;
+    import java.util.List;
+    import java.util.Map;
+    import java.util.concurrent.ConcurrentHashMap;
+
+    class DataInMemory {
+        private List<ArrowRecordBatch> listArrowRecordBatch;
+        private Schema schema;
+        private long rows;
+        public DataInMemory(List<ArrowRecordBatch> listArrowRecordBatch, 
Schema schema, long rows) {
+            this.listArrowRecordBatch = listArrowRecordBatch;
+            this.schema = schema;
+            this.rows = rows;
+        }
+        public List<ArrowRecordBatch> getListArrowRecordBatch() {
+            return listArrowRecordBatch;
+        }
+        public Schema getSchema() {
+            return schema;
+        }
+        public long getRows() {
+            return rows;
+        }
+    }
+
+    // Server
+    Location location = Location.forGrpcInsecure("0.0.0.0", 33333);
+    Map<FlightDescriptor, DataInMemory> dataInMemory = new 
ConcurrentHashMap<>();
+    try (RootAllocator allocator = new RootAllocator(Long.MAX_VALUE)){
+        class CookbookProducer extends NoOpFlightProducer {

Review comment:
       Can't we move this up alongside DataInMemory? (We can add a constructor 
to give it any variables needed.)

##########
File path: java/source/flight.rst
##########
@@ -0,0 +1,1050 @@
+.. Licensed to the Apache Software Foundation (ASF) under one
+.. or more contributor license agreements.  See the NOTICE file
+.. distributed with this work for additional information
+.. regarding copyright ownership.  The ASF licenses this file
+.. to you under the Apache License, Version 2.0 (the
+.. "License"); you may not use this file except in compliance
+.. with the License.  You may obtain a copy of the License at
+
+..   http://www.apache.org/licenses/LICENSE-2.0
+
+.. Unless required by applicable law or agreed to in writing,
+.. software distributed under the License is distributed on an
+.. "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+.. KIND, either express or implied.  See the License for the
+.. specific language governing permissions and limitations
+.. under the License.
+
+============
+Arrow Flight
+============
+
+This section contains a number of recipes for working with Arrow Flight.
+For more detail about Flight please take a look at `Arrow Flight RPC`_.
+
+.. contents::
+
+Simple Key-Value Storage Service with Arrow Flight
+==================================================
+
+We'll implement a service that provides a key-value store for data, using 
Flight to handle uploads/requests
+and data in memory to store the actual data.
+
+Flight Client and Server
+************************
+
+.. code-block:: java
+
+    import org.apache.arrow.flight.Action;
+    import org.apache.arrow.flight.AsyncPutListener;
+    import org.apache.arrow.flight.CallStatus;
+    import org.apache.arrow.flight.Criteria;
+    import org.apache.arrow.flight.FlightClient;
+    import org.apache.arrow.flight.FlightDescriptor;
+    import org.apache.arrow.flight.FlightEndpoint;
+    import org.apache.arrow.flight.FlightInfo;
+    import org.apache.arrow.flight.FlightServer;
+    import org.apache.arrow.flight.FlightStream;
+    import org.apache.arrow.flight.Location;
+    import org.apache.arrow.flight.NoOpFlightProducer;
+    import org.apache.arrow.flight.PutResult;
+    import org.apache.arrow.flight.Result;
+    import org.apache.arrow.flight.Ticket;
+    import org.apache.arrow.memory.RootAllocator;
+    import org.apache.arrow.vector.VarCharVector;
+    import org.apache.arrow.vector.VectorLoader;
+    import org.apache.arrow.vector.VectorSchemaRoot;
+    import org.apache.arrow.vector.VectorUnloader;
+    import org.apache.arrow.vector.ipc.message.ArrowRecordBatch;
+    import org.apache.arrow.vector.types.pojo.ArrowType;
+    import org.apache.arrow.vector.types.pojo.Field;
+    import org.apache.arrow.vector.types.pojo.FieldType;
+    import org.apache.arrow.vector.types.pojo.Schema;
+
+    import java.io.IOException;
+    import java.nio.charset.StandardCharsets;
+    import java.util.ArrayList;
+    import java.util.Arrays;
+    import java.util.Collections;
+    import java.util.Iterator;
+    import java.util.List;
+    import java.util.Map;
+    import java.util.concurrent.ConcurrentHashMap;
+
+    class DataInMemory {
+        private List<ArrowRecordBatch> listArrowRecordBatch;
+        private Schema schema;
+        private long rows;
+        public DataInMemory(List<ArrowRecordBatch> listArrowRecordBatch, 
Schema schema, long rows) {
+            this.listArrowRecordBatch = listArrowRecordBatch;
+            this.schema = schema;
+            this.rows = rows;
+        }
+        public List<ArrowRecordBatch> getListArrowRecordBatch() {
+            return listArrowRecordBatch;
+        }
+        public Schema getSchema() {
+            return schema;
+        }
+        public long getRows() {
+            return rows;
+        }
+    }
+
+    // Server
+    Location location = Location.forGrpcInsecure("0.0.0.0", 33333);
+    Map<FlightDescriptor, DataInMemory> dataInMemory = new 
ConcurrentHashMap<>();
+    try (RootAllocator allocator = new RootAllocator(Long.MAX_VALUE)){
+        class CookbookProducer extends NoOpFlightProducer {
+            @Override
+            public Runnable acceptPut(CallContext context, FlightStream 
flightStream, StreamListener<PutResult> ackStream) {
+                List<ArrowRecordBatch> listArrowRecordBatch = new 
ArrayList<>();
+                return () -> {
+                    long rows = 0;
+                    while (flightStream.next()) {
+                        VectorUnloader unloader = new 
VectorUnloader(flightStream.getRoot());
+                        try (final ArrowRecordBatch arb = 
unloader.getRecordBatch()) {
+                            listArrowRecordBatch.add(arb);
+                            rows += flightStream.getRoot().getRowCount();
+                        }
+                    }
+                    DataInMemory pojoFlightDataInMemory = new 
DataInMemory(listArrowRecordBatch, flightStream.getSchema(), rows);
+                    dataInMemory.put(flightStream.getDescriptor(), 
pojoFlightDataInMemory);
+                    ackStream.onCompleted();
+                };
+            }
+
+            @Override
+            public void getStream(CallContext context, Ticket ticket, 
ServerStreamListener listener) {
+                FlightDescriptor flightDescriptor = FlightDescriptor.path(new 
String(ticket.getBytes(), StandardCharsets.UTF_8)); // Recover data for key 
configured
+                if(dataInMemory.containsKey(flightDescriptor)){
+                    VectorSchemaRoot vectorSchemaRoot = 
VectorSchemaRoot.create(dataInMemory.get(flightDescriptor).getSchema(), 
allocator);
+                    listener.start(vectorSchemaRoot);
+                    for(ArrowRecordBatch arrowRecordBatch : 
dataInMemory.get(flightDescriptor).getListArrowRecordBatch()){
+                        VectorLoader loader = new 
VectorLoader(vectorSchemaRoot);
+                        
loader.load(arrowRecordBatch.cloneWithTransfer(allocator));
+                        listener.putNext();
+                    }
+                    listener.completed();
+                }
+            }
+
+            @Override
+            public void doAction(CallContext context, Action action, 
StreamListener<Result> listener) {
+                FlightDescriptor flightDescriptor = FlightDescriptor.path(new 
String(action.getBody(), StandardCharsets.UTF_8)); // For recover data for key 
configured
+                if(dataInMemory.containsKey(flightDescriptor)) {
+                    switch (action.getType()) {
+                        case "DELETE":
+                            dataInMemory.remove(flightDescriptor);
+                            Result result = new Result("Delete 
completed".getBytes(StandardCharsets.UTF_8));
+                            listener.onNext(result);
+                    }
+                    listener.onCompleted();
+                }
+            }
+
+            @Override
+            public FlightInfo getFlightInfo(CallContext context, 
FlightDescriptor descriptor) {
+                if(!dataInMemory.containsKey(descriptor)){

Review comment:
       Similarly we should get the value and handle the error if it doesn't 
exist.

##########
File path: java/source/flight.rst
##########
@@ -0,0 +1,1699 @@
+.. _arrow-flight:
+
+============
+Arrow Flight
+============
+
+Recipes related to leveraging the Arrow Flight protocol.
+
+.. contents::
+
+Simple Service with Arrow Flight
+================================
+
+We are going to create a Flight producer and a Flight server:
+
+* InMemoryStore: A FlightProducer that hosts an in memory store of Arrow 
buffers. Used for integration testing.
+
+* ExampleFlightServer: An Example Flight Server that provides access to the 
InMemoryStore.
+
+Creating the Server
+*******************
+
+.. testcode::
+
+    import org.apache.arrow.flight.Action;
+    import org.apache.arrow.flight.ActionType;
+    import org.apache.arrow.flight.CallStatus;
+    import org.apache.arrow.flight.Criteria;
+    import org.apache.arrow.flight.FlightDescriptor;
+    import org.apache.arrow.flight.FlightInfo;
+    import org.apache.arrow.flight.FlightProducer;
+    import org.apache.arrow.flight.FlightStream;
+    import org.apache.arrow.flight.Location;
+    import org.apache.arrow.flight.PutResult;
+    import org.apache.arrow.flight.Result;
+    import org.apache.arrow.flight.Ticket;
+    import org.apache.arrow.flight.example.ExampleTicket;
+    import org.apache.arrow.flight.example.FlightHolder;
+    import org.apache.arrow.flight.example.Stream;
+    import org.apache.arrow.flight.example.Stream.StreamCreator;
+    import org.apache.arrow.memory.BufferAllocator;
+    import org.apache.arrow.util.AutoCloseables;
+    import org.apache.arrow.vector.VectorSchemaRoot;
+    import org.apache.arrow.vector.VectorUnloader;
+
+    import java.util.concurrent.ConcurrentHashMap;
+    import java.util.concurrent.ConcurrentMap;
+
+    // InMemoryStore
+
+    public class InMemoryStore implements FlightProducer, AutoCloseable {
+        private final ConcurrentMap<FlightDescriptor, FlightHolder> holders = 
new ConcurrentHashMap<>();
+        private final BufferAllocator allocator;
+        private Location location;
+
+        public InMemoryStore(BufferAllocator allocator, Location location) {
+            super();
+            this.allocator = allocator;
+            this.location = location;
+        }
+
+        public void setLocation(Location location) {
+            this.location = location;
+        }
+
+        @Override
+        public void getStream(CallContext context, Ticket ticket,
+                              FlightProducer.ServerStreamListener listener) {
+            System.out.println("Calling to getStream");
+            getStream(ticket).sendTo(allocator, listener);
+        }
+
+        public Stream getStream(Ticket t) {
+            ExampleTicket example = ExampleTicket.from(t);
+            FlightDescriptor d = FlightDescriptor.path(example.getPath());
+            FlightHolder h = holders.get(d);
+            if (h == null) {
+                throw new IllegalStateException("Unknown ticket.");
+            }
+
+            return h.getStream(example);
+        }
+
+        @Override
+        public void listFlights(CallContext context, Criteria criteria, 
StreamListener<FlightInfo> listener) {
+            System.out.println("Calling to listFligths");
+            try {
+                for (FlightHolder h : holders.values()) {
+                    listener.onNext(h.getFlightInfo(location));
+                }
+                listener.onCompleted();
+            } catch (Exception ex) {
+                listener.onError(ex);
+            }
+        }
+
+        @Override
+        public FlightInfo getFlightInfo(CallContext context, FlightDescriptor 
descriptor) {
+            System.out.println("Calling to getFlightInfo");
+            FlightHolder h = holders.get(descriptor);
+            if (h == null) {
+                throw new IllegalStateException("Unknown descriptor.");
+            }
+
+            return h.getFlightInfo(location);
+        }
+
+        @Override
+        public Runnable acceptPut(CallContext context,
+                                  final FlightStream flightStream, final 
StreamListener<PutResult> ackStream) {
+            return () -> {
+                System.out.println("Calling to acceptPut");
+                StreamCreator creator = null;
+                boolean success = false;
+                try (VectorSchemaRoot root = flightStream.getRoot()) {
+                    final FlightHolder h = holders.computeIfAbsent(
+                            flightStream.getDescriptor(),
+                            t -> new FlightHolder(allocator, t, 
flightStream.getSchema(), flightStream.getDictionaryProvider()));
+
+                    creator = h.addStream(flightStream.getSchema());
+
+                    VectorUnloader unloader = new VectorUnloader(root);
+                    while (flightStream.next()) {
+                        
ackStream.onNext(PutResult.metadata(flightStream.getLatestMetadata()));
+                        creator.add(unloader.getRecordBatch());
+                    }
+                    // Closing the stream will release the dictionaries
+                    flightStream.takeDictionaryOwnership();
+                    creator.complete();
+                    success = true;
+                } finally {
+                    if (!success) {
+                        creator.drop();
+                    }
+                }
+
+            };
+
+        }
+
+        @Override
+        public void doAction(CallContext context, Action action,
+                             StreamListener<Result> listener) {
+            System.out.println("Calling to doAction");
+            switch (action.getType()) {
+                case "drop": {
+                    listener.onNext(new Result(new byte[0]));
+                    listener.onCompleted();
+                    break;
+                }
+                default: {
+                    
listener.onError(CallStatus.UNIMPLEMENTED.toRuntimeException());
+                }
+            }
+        }
+
+        @Override
+        public void listActions(CallContext context,
+                                StreamListener<ActionType> listener) {
+            System.out.println("Calling to listActions");
+            listener.onNext(new ActionType("get", "pull a stream. Action must 
be done via standard get mechanism"));
+            listener.onNext(new ActionType("put", "push a stream. Action must 
be done via standard put mechanism"));
+            listener.onNext(new ActionType("drop", "delete a flight. Action 
body is a JSON encoded path."));
+            listener.onCompleted();
+        }
+
+        @Override
+        public void close() throws Exception {
+            System.out.println("Calling to close");
+            AutoCloseables.close(holders.values());
+            holders.clear();
+        }
+    }
+
+
+    // ExampleFlightServer
+
+    import org.apache.arrow.flight.FlightServer;
+    import org.apache.arrow.flight.Location;
+    import org.apache.arrow.memory.BufferAllocator;
+    import org.apache.arrow.memory.RootAllocator;
+    import org.apache.arrow.util.AutoCloseables;
+
+    import java.io.IOException;
+
+    public class ExampleFlightServer implements AutoCloseable {
+        private final FlightServer flightServer;
+        private final Location location;
+        private final BufferAllocator allocator;
+        private final InMemoryStore mem;
+
+        /**
+         * Constructs a new instance using Allocator for allocating buffer 
storage that binds
+         * to the given location.
+         */
+        public ExampleFlightServer(BufferAllocator allocator, Location 
location) {
+            this.allocator = allocator.newChildAllocator("flight-server", 0, 
Long.MAX_VALUE);
+            this.location = location;
+            this.mem = new InMemoryStore(this.allocator, location);
+            this.flightServer = FlightServer.builder(allocator, location, 
mem).build();
+        }
+
+        public Location getLocation() {
+            return location;
+        }
+
+        public int getPort() {
+            return this.flightServer.getPort();
+        }
+
+        public void start() throws IOException {
+            flightServer.start();
+        }
+
+        public void awaitTermination() throws InterruptedException {
+            flightServer.awaitTermination();
+        }
+
+        public InMemoryStore getStore() {
+            return mem;
+        }
+
+        @Override
+        public void close() throws Exception {
+            AutoCloseables.close(mem, flightServer, allocator);
+        }
+    }
+
+    // Creating the Server
+
+    import org.apache.arrow.flight.FlightServer;
+    import org.apache.arrow.flight.Location;
+    import org.apache.arrow.memory.BufferAllocator;
+    import org.apache.arrow.memory.RootAllocator;
+    import org.apache.arrow.util.AutoCloseables;
+
+    BufferAllocator allocator = new RootAllocator(Long.MAX_VALUE);
+    ExampleFlightServer efs = new ExampleFlightServer(allocator, 
Location.forGrpcInsecure("localhost", 33333));
+    efs.start();
+
+    System.out.println(efs.getLocation());
+
+.. testoutput::
+
+    Location{uri=grpc+tcp://localhost:33333}
+
+Creating the Client
+*******************
+
+Get List Actions
+----------------
+
+Validate the list of actions available on the Flight service:
+
+.. testcode::

Review comment:
       FWIW, because we show the entire server up front, I think that down 
below we can just use normal code blocks to show snippets of the server 
instead.

##########
File path: java/source/flight.rst
##########
@@ -0,0 +1,1050 @@
+.. Licensed to the Apache Software Foundation (ASF) under one
+.. or more contributor license agreements.  See the NOTICE file
+.. distributed with this work for additional information
+.. regarding copyright ownership.  The ASF licenses this file
+.. to you under the Apache License, Version 2.0 (the
+.. "License"); you may not use this file except in compliance
+.. with the License.  You may obtain a copy of the License at
+
+..   http://www.apache.org/licenses/LICENSE-2.0
+
+.. Unless required by applicable law or agreed to in writing,
+.. software distributed under the License is distributed on an
+.. "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+.. KIND, either express or implied.  See the License for the
+.. specific language governing permissions and limitations
+.. under the License.
+
+============
+Arrow Flight
+============
+
+This section contains a number of recipes for working with Arrow Flight.
+For more detail about Flight please take a look at `Arrow Flight RPC`_.
+
+.. contents::
+
+Simple Key-Value Storage Service with Arrow Flight
+==================================================
+
+We'll implement a service that provides a key-value store for data, using 
Flight to handle uploads/requests
+and data in memory to store the actual data.
+
+Flight Client and Server
+************************
+
+.. code-block:: java
+
+    import org.apache.arrow.flight.Action;
+    import org.apache.arrow.flight.AsyncPutListener;
+    import org.apache.arrow.flight.CallStatus;
+    import org.apache.arrow.flight.Criteria;
+    import org.apache.arrow.flight.FlightClient;
+    import org.apache.arrow.flight.FlightDescriptor;
+    import org.apache.arrow.flight.FlightEndpoint;
+    import org.apache.arrow.flight.FlightInfo;
+    import org.apache.arrow.flight.FlightServer;
+    import org.apache.arrow.flight.FlightStream;
+    import org.apache.arrow.flight.Location;
+    import org.apache.arrow.flight.NoOpFlightProducer;
+    import org.apache.arrow.flight.PutResult;
+    import org.apache.arrow.flight.Result;
+    import org.apache.arrow.flight.Ticket;
+    import org.apache.arrow.memory.RootAllocator;
+    import org.apache.arrow.vector.VarCharVector;
+    import org.apache.arrow.vector.VectorLoader;
+    import org.apache.arrow.vector.VectorSchemaRoot;
+    import org.apache.arrow.vector.VectorUnloader;
+    import org.apache.arrow.vector.ipc.message.ArrowRecordBatch;
+    import org.apache.arrow.vector.types.pojo.ArrowType;
+    import org.apache.arrow.vector.types.pojo.Field;
+    import org.apache.arrow.vector.types.pojo.FieldType;
+    import org.apache.arrow.vector.types.pojo.Schema;
+
+    import java.io.IOException;
+    import java.nio.charset.StandardCharsets;
+    import java.util.ArrayList;
+    import java.util.Arrays;
+    import java.util.Collections;
+    import java.util.Iterator;
+    import java.util.List;
+    import java.util.Map;
+    import java.util.concurrent.ConcurrentHashMap;
+
+    class DataInMemory {
+        private List<ArrowRecordBatch> listArrowRecordBatch;
+        private Schema schema;
+        private long rows;
+        public DataInMemory(List<ArrowRecordBatch> listArrowRecordBatch, 
Schema schema, long rows) {
+            this.listArrowRecordBatch = listArrowRecordBatch;
+            this.schema = schema;
+            this.rows = rows;
+        }
+        public List<ArrowRecordBatch> getListArrowRecordBatch() {
+            return listArrowRecordBatch;
+        }
+        public Schema getSchema() {
+            return schema;
+        }
+        public long getRows() {
+            return rows;
+        }
+    }
+
+    // Server
+    Location location = Location.forGrpcInsecure("0.0.0.0", 33333);
+    Map<FlightDescriptor, DataInMemory> dataInMemory = new 
ConcurrentHashMap<>();

Review comment:
       The interface here should be ConcurrentMap.

##########
File path: java/source/flight.rst
##########
@@ -0,0 +1,1061 @@
+.. Licensed to the Apache Software Foundation (ASF) under one
+.. or more contributor license agreements.  See the NOTICE file
+.. distributed with this work for additional information
+.. regarding copyright ownership.  The ASF licenses this file
+.. to you under the Apache License, Version 2.0 (the
+.. "License"); you may not use this file except in compliance
+.. with the License.  You may obtain a copy of the License at
+
+..   http://www.apache.org/licenses/LICENSE-2.0
+
+.. Unless required by applicable law or agreed to in writing,
+.. software distributed under the License is distributed on an
+.. "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+.. KIND, either express or implied.  See the License for the
+.. specific language governing permissions and limitations
+.. under the License.
+
+============
+Arrow Flight
+============
+
+This section contains a number of recipes for working with Arrow
+Flight. For more about Flight, see the Arrow Flight RPC documentation.
+
+.. contents::
+
+Simple VectorSchemaRoot storage service with Arrow Flight
+=========================================================
+
+We'll implement a service that transfers record batches through a VectorSchemaRoot.
+
+Flight Client and Server
+************************
+
+.. code-block:: java
+
+    import org.apache.arrow.flight.Action;
+    import org.apache.arrow.flight.AsyncPutListener;
+    import org.apache.arrow.flight.Criteria;
+    import org.apache.arrow.flight.FlightClient;
+    import org.apache.arrow.flight.FlightDescriptor;
+    import org.apache.arrow.flight.FlightEndpoint;
+    import org.apache.arrow.flight.FlightInfo;
+    import org.apache.arrow.flight.FlightServer;
+    import org.apache.arrow.flight.FlightStream;
+    import org.apache.arrow.flight.Location;
+    import org.apache.arrow.flight.NoOpFlightProducer;
+    import org.apache.arrow.flight.PutResult;
+    import org.apache.arrow.flight.Result;
+    import org.apache.arrow.flight.Ticket;
+    import org.apache.arrow.memory.RootAllocator;
+    import org.apache.arrow.vector.VarCharVector;
+    import org.apache.arrow.vector.VectorLoader;
+    import org.apache.arrow.vector.VectorSchemaRoot;
+    import org.apache.arrow.vector.VectorUnloader;
+    import org.apache.arrow.vector.ipc.message.ArrowRecordBatch;
+    import org.apache.arrow.vector.types.pojo.ArrowType;
+    import org.apache.arrow.vector.types.pojo.Field;
+    import org.apache.arrow.vector.types.pojo.FieldType;
+    import org.apache.arrow.vector.types.pojo.Schema;
+
+    import java.io.IOException;
+    import java.nio.charset.StandardCharsets;
+    import java.util.ArrayList;
+    import java.util.Arrays;
+    import java.util.Collections;
+    import java.util.HashMap;
+    import java.util.Iterator;
+    import java.util.List;
+    import java.util.Map;
+
+    class DataInMemory {
+        private List<ArrowRecordBatch> listArrowRecordBatch;
+        private Schema schema;
+        private Long rows;
+        public DataInMemory(List<ArrowRecordBatch> listArrowRecordBatch, 
Schema schema, Long rows) {
+            this.listArrowRecordBatch = listArrowRecordBatch;
+            this.schema = schema;
+            this.rows = rows;
+        }
+        public List<ArrowRecordBatch> getListArrowRecordBatch() {
+            return listArrowRecordBatch;
+        }
+        public Schema getSchema() {
+            return schema;
+        }
+        public Long getRows() {
+            return rows;
+        }
+    }
+
+    // Server
+    Location location = Location.forGrpcInsecure("0.0.0.0", 33333);
+    Map<FlightDescriptor, DataInMemory> dataInMemory = new HashMap<>();
+    Map<String, DataInMemory> mapPojoFlightDataInMemory = new HashMap<>();
+    List<ArrowRecordBatch> listArrowRecordBatch = new ArrayList<>();
+    try (RootAllocator allocator = new RootAllocator(Long.MAX_VALUE)){
+        FlightServer flightServer = FlightServer.builder(allocator, location, 
new NoOpFlightProducer(){
+            @Override
+            public Runnable acceptPut(CallContext context, FlightStream 
flightStream, StreamListener<PutResult> ackStream) {
+                return () -> {
+                    long rows = 0;
+                    while (flightStream.next()) {
+                        VectorUnloader unloader = new 
VectorUnloader(flightStream.getRoot());
+                        try (final ArrowRecordBatch arb = 
unloader.getRecordBatch()) {
+                            // Retain data information
+                            listArrowRecordBatch.add(arb);
+                            rows = rows + flightStream.getRoot().getRowCount();
+                        }
+                    }
+                    long finalRows = rows;
+                    DataInMemory pojoFlightDataInMemory = new 
DataInMemory(listArrowRecordBatch, flightStream.getSchema(), finalRows);
+                    dataInMemory.put(flightStream.getDescriptor(), 
pojoFlightDataInMemory);
+                    ackStream.onCompleted();
+                };
+            }
+
+            @Override
+            public void getStream(CallContext context, Ticket ticket, 
ServerStreamListener listener) {
+                FlightDescriptor flightDescriptor = FlightDescriptor.path(new 
String(ticket.getBytes(), StandardCharsets.UTF_8)); // Recover data for key 
configured
+                if(dataInMemory.containsKey(flightDescriptor)){
+                    VectorSchemaRoot vectorSchemaRoot = 
VectorSchemaRoot.create(dataInMemory.get(flightDescriptor).getSchema(), 
allocator);
+                    listener.start(vectorSchemaRoot);
+                    for(ArrowRecordBatch arrowRecordBatch : 
dataInMemory.get(flightDescriptor).getListArrowRecordBatch()){
+                        vectorSchemaRoot.allocateNew();
+                        VectorLoader loader = new 
VectorLoader(vectorSchemaRoot);
+                        
loader.load(arrowRecordBatch.cloneWithTransfer(allocator));
+                        listener.putNext();
+                    }
+                    vectorSchemaRoot.clear();
+                    listener.completed();
+                }
+            }
+
+            @Override
+            public void doAction(CallContext context, Action action, 
StreamListener<Result> listener) {
+                FlightDescriptor flightDescriptor = FlightDescriptor.path(new 
String(action.getBody(), StandardCharsets.UTF_8)); // For recover data for key 
configured
+                if(dataInMemory.containsKey(flightDescriptor)) {

Review comment:
       Wouldn't the right way to handle this be to remove and catch the error? 
Since another thread may remove it concurrently, we shouldn't be checking 
before removing.

##########
File path: java/source/flight.rst
##########
@@ -0,0 +1,1699 @@
+.. _arrow-flight:
+
+============
+Arrow Flight
+============
+
+Recipes related to leveraging the Arrow Flight protocol.
+
+.. contents::
+
+Simple Service with Arrow Flight
+================================
+
+We are going to create a Flight producer and a Flight server:
+
+* InMemoryStore: A FlightProducer that hosts an in memory store of Arrow 
buffers. Used for integration testing.
+
+* ExampleFlightServer: An Example Flight Server that provides access to the 
InMemoryStore.
+
+Creating the Server
+*******************
+
+.. testcode::
+
+    import org.apache.arrow.flight.Action;
+    import org.apache.arrow.flight.ActionType;
+    import org.apache.arrow.flight.CallStatus;
+    import org.apache.arrow.flight.Criteria;
+    import org.apache.arrow.flight.FlightDescriptor;
+    import org.apache.arrow.flight.FlightInfo;
+    import org.apache.arrow.flight.FlightProducer;
+    import org.apache.arrow.flight.FlightStream;
+    import org.apache.arrow.flight.Location;
+    import org.apache.arrow.flight.PutResult;
+    import org.apache.arrow.flight.Result;
+    import org.apache.arrow.flight.Ticket;
+    import org.apache.arrow.flight.example.ExampleTicket;
+    import org.apache.arrow.flight.example.FlightHolder;
+    import org.apache.arrow.flight.example.Stream;
+    import org.apache.arrow.flight.example.Stream.StreamCreator;
+    import org.apache.arrow.memory.BufferAllocator;
+    import org.apache.arrow.util.AutoCloseables;
+    import org.apache.arrow.vector.VectorSchemaRoot;
+    import org.apache.arrow.vector.VectorUnloader;
+
+    import java.util.concurrent.ConcurrentHashMap;
+    import java.util.concurrent.ConcurrentMap;
+
+    // InMemoryStore
+
+    public class InMemoryStore implements FlightProducer, AutoCloseable {
+        private final ConcurrentMap<FlightDescriptor, FlightHolder> holders = 
new ConcurrentHashMap<>();
+        private final BufferAllocator allocator;
+        private Location location;
+
+        public InMemoryStore(BufferAllocator allocator, Location location) {
+            super();
+            this.allocator = allocator;
+            this.location = location;
+        }
+
+        public void setLocation(Location location) {
+            this.location = location;
+        }
+
+        @Override
+        public void getStream(CallContext context, Ticket ticket,
+                              FlightProducer.ServerStreamListener listener) {
+            System.out.println("Calling to getStream");
+            getStream(ticket).sendTo(allocator, listener);
+        }
+
+        public Stream getStream(Ticket t) {
+            ExampleTicket example = ExampleTicket.from(t);
+            FlightDescriptor d = FlightDescriptor.path(example.getPath());
+            FlightHolder h = holders.get(d);
+            if (h == null) {
+                throw new IllegalStateException("Unknown ticket.");
+            }
+
+            return h.getStream(example);
+        }
+
+        @Override
+        public void listFlights(CallContext context, Criteria criteria, 
StreamListener<FlightInfo> listener) {
+            System.out.println("Calling to listFligths");
+            try {
+                for (FlightHolder h : holders.values()) {
+                    listener.onNext(h.getFlightInfo(location));
+                }
+                listener.onCompleted();
+            } catch (Exception ex) {
+                listener.onError(ex);
+            }
+        }
+
+        @Override
+        public FlightInfo getFlightInfo(CallContext context, FlightDescriptor 
descriptor) {
+            System.out.println("Calling to getFlightInfo");
+            FlightHolder h = holders.get(descriptor);
+            if (h == null) {
+                throw new IllegalStateException("Unknown descriptor.");
+            }
+
+            return h.getFlightInfo(location);
+        }
+
+        @Override
+        public Runnable acceptPut(CallContext context,
+                                  final FlightStream flightStream, final 
StreamListener<PutResult> ackStream) {
+            return () -> {
+                System.out.println("Calling to acceptPut");
+                StreamCreator creator = null;
+                boolean success = false;
+                try (VectorSchemaRoot root = flightStream.getRoot()) {
+                    final FlightHolder h = holders.computeIfAbsent(
+                            flightStream.getDescriptor(),
+                            t -> new FlightHolder(allocator, t, 
flightStream.getSchema(), flightStream.getDictionaryProvider()));
+
+                    creator = h.addStream(flightStream.getSchema());
+
+                    VectorUnloader unloader = new VectorUnloader(root);
+                    while (flightStream.next()) {
+                        
ackStream.onNext(PutResult.metadata(flightStream.getLatestMetadata()));
+                        creator.add(unloader.getRecordBatch());
+                    }
+                    // Closing the stream will release the dictionaries
+                    flightStream.takeDictionaryOwnership();
+                    creator.complete();
+                    success = true;
+                } finally {
+                    if (!success) {
+                        creator.drop();
+                    }
+                }
+
+            };
+
+        }
+
+        @Override
+        public void doAction(CallContext context, Action action,
+                             StreamListener<Result> listener) {
+            System.out.println("Calling to doAction");
+            switch (action.getType()) {
+                case "drop": {
+                    listener.onNext(new Result(new byte[0]));
+                    listener.onCompleted();
+                    break;
+                }
+                default: {
+                    
listener.onError(CallStatus.UNIMPLEMENTED.toRuntimeException());
+                }
+            }
+        }
+
+        @Override
+        public void listActions(CallContext context,
+                                StreamListener<ActionType> listener) {
+            System.out.println("Calling to listActions");
+            listener.onNext(new ActionType("get", "pull a stream. Action must 
be done via standard get mechanism"));
+            listener.onNext(new ActionType("put", "push a stream. Action must 
be done via standard put mechanism"));
+            listener.onNext(new ActionType("drop", "delete a flight. Action 
body is a JSON encoded path."));
+            listener.onCompleted();
+        }
+
+        @Override
+        public void close() throws Exception {
+            System.out.println("Calling to close");
+            AutoCloseables.close(holders.values());
+            holders.clear();
+        }
+    }
+
+
+    // ExampleFlightServer
+
+    import org.apache.arrow.flight.FlightServer;
+    import org.apache.arrow.flight.Location;
+    import org.apache.arrow.memory.BufferAllocator;
+    import org.apache.arrow.memory.RootAllocator;
+    import org.apache.arrow.util.AutoCloseables;
+
+    import java.io.IOException;
+
+    public class ExampleFlightServer implements AutoCloseable {
+        private final FlightServer flightServer;
+        private final Location location;
+        private final BufferAllocator allocator;
+        private final InMemoryStore mem;
+
+        /**
+         * Constructs a new instance using Allocator for allocating buffer 
storage that binds
+         * to the given location.
+         */
+        public ExampleFlightServer(BufferAllocator allocator, Location 
location) {
+            this.allocator = allocator.newChildAllocator("flight-server", 0, 
Long.MAX_VALUE);
+            this.location = location;
+            this.mem = new InMemoryStore(this.allocator, location);
+            this.flightServer = FlightServer.builder(allocator, location, 
mem).build();
+        }
+
+        public Location getLocation() {
+            return location;
+        }
+
+        public int getPort() {
+            return this.flightServer.getPort();
+        }
+
+        public void start() throws IOException {
+            flightServer.start();
+        }
+
+        public void awaitTermination() throws InterruptedException {
+            flightServer.awaitTermination();
+        }
+
+        public InMemoryStore getStore() {
+            return mem;
+        }
+
+        @Override
+        public void close() throws Exception {
+            AutoCloseables.close(mem, flightServer, allocator);
+        }
+    }
+
+    // Creating the Server
+
+    import org.apache.arrow.flight.FlightServer;
+    import org.apache.arrow.flight.Location;
+    import org.apache.arrow.memory.BufferAllocator;
+    import org.apache.arrow.memory.RootAllocator;
+    import org.apache.arrow.util.AutoCloseables;
+
+    BufferAllocator allocator = new RootAllocator(Long.MAX_VALUE);
+    ExampleFlightServer efs = new ExampleFlightServer(allocator, 
Location.forGrpcInsecure("localhost", 33333));
+    efs.start();
+
+    System.out.println(efs.getLocation());
+
+.. testoutput::
+
+    Location{uri=grpc+tcp://localhost:33333}
+
+Creating the Client
+*******************
+
+Get List Actions
+----------------
+
+List the actions available on the Flight service:
+
+.. testcode::

Review comment:
       Also I think what would help (for this, and for formatting the code) is 
to allow this directive to accept a file, and a line range, then it will be 
easier to handle long examples like this.

##########
File path: java/source/flight.rst
##########
@@ -0,0 +1,1061 @@
+.. Licensed to the Apache Software Foundation (ASF) under one
+.. or more contributor license agreements.  See the NOTICE file
+.. distributed with this work for additional information
+.. regarding copyright ownership.  The ASF licenses this file
+.. to you under the Apache License, Version 2.0 (the
+.. "License"); you may not use this file except in compliance
+.. with the License.  You may obtain a copy of the License at
+
+..   http://www.apache.org/licenses/LICENSE-2.0
+
+.. Unless required by applicable law or agreed to in writing,
+.. software distributed under the License is distributed on an
+.. "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+.. KIND, either express or implied.  See the License for the
+.. specific language governing permissions and limitations
+.. under the License.
+
+============
+Arrow Flight
+============
+
+This section contains a number of recipes for working with Arrow
+Flight. For more details about Flight, see the Arrow Flight RPC documentation.
+
+.. contents::
+
+Simple VectorSchemaRoot storage service with Arrow Flight
+=========================================================
+
+We'll implement a service that transfers record batches via VectorSchemaRoot.
+
+Flight Client and Server
+************************
+
+.. code-block:: java
+
+    import org.apache.arrow.flight.Action;
+    import org.apache.arrow.flight.AsyncPutListener;
+    import org.apache.arrow.flight.Criteria;
+    import org.apache.arrow.flight.FlightClient;
+    import org.apache.arrow.flight.FlightDescriptor;
+    import org.apache.arrow.flight.FlightEndpoint;
+    import org.apache.arrow.flight.FlightInfo;
+    import org.apache.arrow.flight.FlightServer;
+    import org.apache.arrow.flight.FlightStream;
+    import org.apache.arrow.flight.Location;
+    import org.apache.arrow.flight.NoOpFlightProducer;
+    import org.apache.arrow.flight.PutResult;
+    import org.apache.arrow.flight.Result;
+    import org.apache.arrow.flight.Ticket;
+    import org.apache.arrow.memory.RootAllocator;
+    import org.apache.arrow.vector.VarCharVector;
+    import org.apache.arrow.vector.VectorLoader;
+    import org.apache.arrow.vector.VectorSchemaRoot;
+    import org.apache.arrow.vector.VectorUnloader;
+    import org.apache.arrow.vector.ipc.message.ArrowRecordBatch;
+    import org.apache.arrow.vector.types.pojo.ArrowType;
+    import org.apache.arrow.vector.types.pojo.Field;
+    import org.apache.arrow.vector.types.pojo.FieldType;
+    import org.apache.arrow.vector.types.pojo.Schema;
+
+    import java.io.IOException;
+    import java.nio.charset.StandardCharsets;
+    import java.util.ArrayList;
+    import java.util.Arrays;
+    import java.util.Collections;
+    import java.util.HashMap;
+    import java.util.Iterator;
+    import java.util.List;
+    import java.util.Map;
+
+    class DataInMemory {
+        private List<ArrowRecordBatch> listArrowRecordBatch;
+        private Schema schema;
+        private Long rows;
+        public DataInMemory(List<ArrowRecordBatch> listArrowRecordBatch, 
Schema schema, Long rows) {
+            this.listArrowRecordBatch = listArrowRecordBatch;
+            this.schema = schema;
+            this.rows = rows;
+        }
+        public List<ArrowRecordBatch> getListArrowRecordBatch() {
+            return listArrowRecordBatch;
+        }
+        public Schema getSchema() {
+            return schema;
+        }
+        public Long getRows() {
+            return rows;
+        }
+    }
+
+    // Server
+    Location location = Location.forGrpcInsecure("0.0.0.0", 33333);
+    Map<FlightDescriptor, DataInMemory> dataInMemory = new HashMap<>();
+    Map<String, DataInMemory> mapPojoFlightDataInMemory = new HashMap<>();
+    List<ArrowRecordBatch> listArrowRecordBatch = new ArrayList<>();
+    try (RootAllocator allocator = new RootAllocator(Long.MAX_VALUE)){
+        FlightServer flightServer = FlightServer.builder(allocator, location, 
new NoOpFlightProducer(){
+            @Override
+            public Runnable acceptPut(CallContext context, FlightStream 
flightStream, StreamListener<PutResult> ackStream) {
+                return () -> {
+                    long rows = 0;
+                    while (flightStream.next()) {
+                        VectorUnloader unloader = new 
VectorUnloader(flightStream.getRoot());
+                        try (final ArrowRecordBatch arb = 
unloader.getRecordBatch()) {
+                            // Retain data information
+                            listArrowRecordBatch.add(arb);
+                            rows = rows + flightStream.getRoot().getRowCount();
+                        }
+                    }
+                    long finalRows = rows;
+                    DataInMemory pojoFlightDataInMemory = new 
DataInMemory(listArrowRecordBatch, flightStream.getSchema(), finalRows);
+                    dataInMemory.put(flightStream.getDescriptor(), 
pojoFlightDataInMemory);
+                    ackStream.onCompleted();
+                };
+            }
+
+            @Override
+            public void getStream(CallContext context, Ticket ticket, 
ServerStreamListener listener) {
+                FlightDescriptor flightDescriptor = FlightDescriptor.path(new 
String(ticket.getBytes(), StandardCharsets.UTF_8)); // Recover data for key 
configured
+                if(dataInMemory.containsKey(flightDescriptor)){
+                    VectorSchemaRoot vectorSchemaRoot = 
VectorSchemaRoot.create(dataInMemory.get(flightDescriptor).getSchema(), 
allocator);
+                    listener.start(vectorSchemaRoot);
+                    for(ArrowRecordBatch arrowRecordBatch : 
dataInMemory.get(flightDescriptor).getListArrowRecordBatch()){
+                        vectorSchemaRoot.allocateNew();
+                        VectorLoader loader = new 
VectorLoader(vectorSchemaRoot);
+                        
loader.load(arrowRecordBatch.cloneWithTransfer(allocator));

Review comment:
       I think incrementing the refcount is sufficient since we are all using 
the same allocator here but I guess it doesn't matter too much (this will mean 
we have to copy all data each time though).

##########
File path: java/source/flight.rst
##########
@@ -0,0 +1,1050 @@
+.. Licensed to the Apache Software Foundation (ASF) under one
+.. or more contributor license agreements.  See the NOTICE file
+.. distributed with this work for additional information
+.. regarding copyright ownership.  The ASF licenses this file
+.. to you under the Apache License, Version 2.0 (the
+.. "License"); you may not use this file except in compliance
+.. with the License.  You may obtain a copy of the License at
+
+..   http://www.apache.org/licenses/LICENSE-2.0
+
+.. Unless required by applicable law or agreed to in writing,
+.. software distributed under the License is distributed on an
+.. "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+.. KIND, either express or implied.  See the License for the
+.. specific language governing permissions and limitations
+.. under the License.
+
+============
+Arrow Flight
+============
+
+This section contains a number of recipes for working with Arrow Flight.
+For more detail about Flight please take a look at `Arrow Flight RPC`_.
+
+.. contents::
+
+Simple Key-Value Storage Service with Arrow Flight
+==================================================
+
+We'll implement a service that provides a key-value store for data, using 
Flight to handle uploads/requests
+and data in memory to store the actual data.
+
+Flight Client and Server
+************************
+
+.. code-block:: java
+
+    import org.apache.arrow.flight.Action;
+    import org.apache.arrow.flight.AsyncPutListener;
+    import org.apache.arrow.flight.CallStatus;
+    import org.apache.arrow.flight.Criteria;
+    import org.apache.arrow.flight.FlightClient;
+    import org.apache.arrow.flight.FlightDescriptor;
+    import org.apache.arrow.flight.FlightEndpoint;
+    import org.apache.arrow.flight.FlightInfo;
+    import org.apache.arrow.flight.FlightServer;
+    import org.apache.arrow.flight.FlightStream;
+    import org.apache.arrow.flight.Location;
+    import org.apache.arrow.flight.NoOpFlightProducer;
+    import org.apache.arrow.flight.PutResult;
+    import org.apache.arrow.flight.Result;
+    import org.apache.arrow.flight.Ticket;
+    import org.apache.arrow.memory.RootAllocator;
+    import org.apache.arrow.vector.VarCharVector;
+    import org.apache.arrow.vector.VectorLoader;
+    import org.apache.arrow.vector.VectorSchemaRoot;
+    import org.apache.arrow.vector.VectorUnloader;
+    import org.apache.arrow.vector.ipc.message.ArrowRecordBatch;
+    import org.apache.arrow.vector.types.pojo.ArrowType;
+    import org.apache.arrow.vector.types.pojo.Field;
+    import org.apache.arrow.vector.types.pojo.FieldType;
+    import org.apache.arrow.vector.types.pojo.Schema;
+
+    import java.io.IOException;
+    import java.nio.charset.StandardCharsets;
+    import java.util.ArrayList;
+    import java.util.Arrays;
+    import java.util.Collections;
+    import java.util.Iterator;
+    import java.util.List;
+    import java.util.Map;
+    import java.util.concurrent.ConcurrentHashMap;
+
+    class DataInMemory {
+        private List<ArrowRecordBatch> listArrowRecordBatch;
+        private Schema schema;
+        private long rows;
+        public DataInMemory(List<ArrowRecordBatch> listArrowRecordBatch, 
Schema schema, long rows) {
+            this.listArrowRecordBatch = listArrowRecordBatch;
+            this.schema = schema;
+            this.rows = rows;
+        }
+        public List<ArrowRecordBatch> getListArrowRecordBatch() {
+            return listArrowRecordBatch;
+        }
+        public Schema getSchema() {
+            return schema;
+        }
+        public long getRows() {
+            return rows;
+        }
+    }
+
+    // Server
+    Location location = Location.forGrpcInsecure("0.0.0.0", 33333);
+    Map<FlightDescriptor, DataInMemory> dataInMemory = new 
ConcurrentHashMap<>();

Review comment:
       Shouldn't this be a property of the producer class?




-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]


Reply via email to