iamsmkr opened a new issue, #218:
URL: https://github.com/apache/arrow-cookbook/issues/218
I am trying out the cookbook Java example [here](https://arrow.apache.org/cookbook/java/flight.html). The only change is that I am trying to write multiple batches; see the "Batch" comments in the code. When I run this example I see unexpected **overlapping results**: the client writes six batches holding the values 0 through 59, but the stream returns 30 through 59 twice. It gets weirder with multi-threading. Please suggest the correct way of sending multiple batches.

```
S1: Server (Location): Listening on port 33333
C1: Client (Location): Connected to grpc+tcp://0.0.0.0:33333
WARNING: An illegal reflective access operation has occurred
WARNING: Illegal reflective access by org.apache.arrow.memory.util.MemoryUtil (file:/Users/rentsher/.m2/repository/org/apache/arrow/arrow-memory-core/8.0.0/arrow-memory-core-8.0.0.jar) to field java.nio.Buffer.address
WARNING: Please consider reporting this to the maintainers of org.apache.arrow.memory.util.MemoryUtil
WARNING: Use --illegal-access=warn to enable warnings of further illegal reflective access operations
WARNING: All illegal access operations will be denied in a future release
C2: Client (Populate Data): Wrote 2 batches with 3 rows each
C3: Client (Get Metadata): FlightInfo{schema=Schema<name: Int(64, true) not null>, descriptor=profiles, endpoints=[FlightEndpoint{locations=[Location{uri=grpc+tcp://0.0.0.0:33333}], ticket=org.apache.arrow.flight.Ticket@58871b0a}], bytes=-1, records=60}
C4: Client (Get Stream):
Client Received batch #1, Data:
vector size: 10
30
31
32
33
34
35
36
37
38
39
Client Received batch #2, Data:
vector size: 10
40
41
42
43
44
45
46
47
48
49
Client Received batch #3, Data:
vector size: 10
50
51
52
53
54
55
56
57
58
59
Client Received batch #4, Data:
vector size: 10
30
31
32
33
34
35
36
37
38
39
Client Received batch #5, Data:
vector size: 10
40
41
42
43
44
45
46
47
48
49
Client Received batch #6, Data:
vector size: 10
50
51
52
53
54
55
56
57
58
59
C5: Client (List Flights Info): FlightInfo{schema=Schema<name: Int(64, true) not null>, descriptor=profiles, endpoints=[FlightEndpoint{locations=[Location{uri=grpc+tcp://0.0.0.0:33333}], ticket=org.apache.arrow.flight.Ticket@58871b0a}], bytes=-1, records=60}
C6: Client (Do Delete Action): Delete completed
C7: Client (List Flights Info): After delete - No records
C8: Server shut down successfully

Process finished with exit code 0
```

```java
package com.iamsmkr.arrowflight;

import org.apache.arrow.flight.Action;
import org.apache.arrow.flight.AsyncPutListener;
import org.apache.arrow.flight.Criteria;
import org.apache.arrow.flight.FlightClient;
import org.apache.arrow.flight.FlightDescriptor;
import org.apache.arrow.flight.FlightInfo;
import org.apache.arrow.flight.FlightServer;
import org.apache.arrow.flight.FlightStream;
import org.apache.arrow.flight.Location;
import org.apache.arrow.flight.Result;
import org.apache.arrow.flight.Ticket;
import org.apache.arrow.memory.BufferAllocator;
import org.apache.arrow.memory.RootAllocator;
import org.apache.arrow.vector.BigIntVector;
import org.apache.arrow.vector.VectorSchemaRoot;
import org.apache.arrow.vector.holders.NullableVarCharHolder;
import org.apache.arrow.vector.types.pojo.ArrowType;
import org.apache.arrow.vector.types.pojo.Field;
import org.apache.arrow.vector.types.pojo.FieldType;
import org.apache.arrow.vector.types.pojo.Schema;

import java.io.IOException;
import java.nio.charset.StandardCharsets;
import java.util.Arrays;
import java.util.Iterator;

public class CookbookApp {

    public static void main(String[] args) {
        Location location = Location.forGrpcInsecure("0.0.0.0", 33333);
        try (BufferAllocator allocator = new RootAllocator()) {
            // Server
            try (FlightServer flightServer = FlightServer.builder(allocator, location,
                    new ArrowFlightProducer(allocator, location)).build()) {
                try {
                    flightServer.start();
                    System.out.println("S1: Server (Location): Listening on port " + flightServer.getPort());
                } catch (IOException e) {
                    System.exit(1);
                }

                // Client
                try (FlightClient flightClient = FlightClient.builder(allocator, location).build()) {
                    System.out.println("C1: Client (Location): Connected to " + location.getUri());

                    // Populate data
                    Schema schema = new Schema(Arrays.asList(
                            new Field("name", new FieldType(false, new ArrowType.Int(64, true), null), null)));
                    try (
                            VectorSchemaRoot vectorSchemaRoot = VectorSchemaRoot.create(schema, allocator);
                            BigIntVector names = (BigIntVector) vectorSchemaRoot.getVector("name")
                    ) {
                        FlightClient.ClientStreamListener listener = flightClient.startPut(
                                FlightDescriptor.path("profiles"),
                                vectorSchemaRoot,
                                new AsyncPutListener()
                        );

                        // Batch 1
                        int j = 0;
                        for (long i = 0; i < 10; i++) {
                            names.setSafe(j, i);
                            j++;
                        }
                        vectorSchemaRoot.setRowCount(10);
                        while (!listener.isReady()) {
                            try {
                                Thread.sleep(1);
                            } catch (InterruptedException e) {
                                e.printStackTrace();
                            }
                        }
                        listener.putNext();

                        // Batch 2
                        j = 0;
                        for (long i = 10; i < 20; i++) {
                            names.setSafe(j, i);
                            j++;
                        }
                        vectorSchemaRoot.setRowCount(10);
                        while (!listener.isReady()) {
                            try {
                                Thread.sleep(1);
                            } catch (InterruptedException e) {
                                e.printStackTrace();
                            }
                        }
                        listener.putNext();

                        // Batch 3
                        j = 0;
                        for (long i = 20; i < 30; i++) {
                            names.setSafe(j, i);
                            j++;
                        }
                        vectorSchemaRoot.setRowCount(10);
                        while (!listener.isReady()) {
                            try {
                                Thread.sleep(1);
                            } catch (InterruptedException e) {
                                e.printStackTrace();
                            }
                        }
                        listener.putNext();

                        // Batch 4
                        j = 0;
                        for (long i = 30; i < 40; i++) {
                            names.setSafe(j, i);
                            j++;
                        }
                        vectorSchemaRoot.setRowCount(10);
                        while (!listener.isReady()) {
                            try {
                                Thread.sleep(1);
                            } catch (InterruptedException e) {
                                e.printStackTrace();
                            }
                        }
                        listener.putNext();

                        // Batch 5
                        j = 0;
                        for (long i = 40; i < 50; i++) {
                            names.setSafe(j, i);
                            j++;
                        }
                        vectorSchemaRoot.setRowCount(10);
                        while (!listener.isReady()) {
                            try {
                                Thread.sleep(1);
                            } catch (InterruptedException e) {
                                e.printStackTrace();
                            }
                        }
                        listener.putNext();

                        // Batch 6
                        j = 0;
                        for (long i = 50; i < 60; i++) {
                            names.setSafe(j, i);
                            j++;
                        }
                        vectorSchemaRoot.setRowCount(10);
                        while (!listener.isReady()) {
                            try {
                                Thread.sleep(1);
                            } catch (InterruptedException e) {
                                e.printStackTrace();
                            }
                        }
                        listener.putNext();

                        listener.completed();
                        listener.getResult();
                        System.out.println("C2: Client (Populate Data): Wrote 2 batches with 3 rows each");
                    }

                    // Get metadata information
                    FlightInfo flightInfo = flightClient.getInfo(FlightDescriptor.path("profiles"));
                    System.out.println("C3: Client (Get Metadata): " + flightInfo);

                    // Get data information
                    try (FlightStream flightStream = flightClient.getStream(new Ticket(
                            FlightDescriptor.path("profiles").getPath().get(0).getBytes(StandardCharsets.UTF_8)))) {
                        int batch = 0;
                        try (
                                VectorSchemaRoot vectorSchemaRootReceived = flightStream.getRoot();
                                BigIntVector names = (BigIntVector) vectorSchemaRootReceived.getVector("name")
                        ) {
                            System.out.println("C4: Client (Get Stream):");
                            while (flightStream.next()) {
                                batch++;
                                System.out.println("Client Received batch #" + batch + ", Data:");
                                // System.out.print(vectorSchemaRootReceived.contentToTSVString());
                                int i = vectorSchemaRootReceived.getRowCount();
                                System.out.println("vector size: " + i);
                                int j = 0;
                                while (j < i) {
                                    System.out.println(names.get(j));
                                    // names.get(j);
                                    // copy(vcHolder, tmpSB);
                                    // System.out.println("name" + j + ": " + tmpSB);
                                    j++;
                                }
                            }
                        }
                    } catch (Exception e) {
                        e.printStackTrace();
                    }

                    // Get all metadata information
                    Iterable<FlightInfo> flightInfosBefore = flightClient.listFlights(Criteria.ALL);
                    System.out.print("C5: Client (List Flights Info): ");
                    flightInfosBefore.forEach(t -> System.out.println(t));

                    // Do delete action
                    Iterator<Result> deleteActionResult = flightClient.doAction(new Action("DELETE",
                            FlightDescriptor.path("profiles").getPath().get(0).getBytes(StandardCharsets.UTF_8)));
                    while (deleteActionResult.hasNext()) {
                        Result result = deleteActionResult.next();
                        System.out.println("C6: Client (Do Delete Action): " +
                                new String(result.getBody(), StandardCharsets.UTF_8));
                    }

                    // Get all metadata information (to validate delete action)
                    Iterable<FlightInfo> flightInfos = flightClient.listFlights(Criteria.ALL);
                    flightInfos.forEach(System.out::println);
                    System.out.println("C7: Client (List Flights Info): After delete - No records");

                    // Server shut down
                    flightServer.shutdown();
                    System.out.println("C8: Server shut down successfully");
                }
            } catch (InterruptedException e) {
                e.printStackTrace();
            }
        }
    }
}
```

--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]
