lidavidm commented on a change in pull request #7012:
URL: https://github.com/apache/arrow/pull/7012#discussion_r419412323



##########
File path: 
java/flight/flight-core/src/test/java/org/apache/arrow/flight/TestDoExchange.java
##########
@@ -0,0 +1,395 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.arrow.flight;
+
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertFalse;
+import static org.junit.Assert.assertNotNull;
+import static org.junit.Assert.assertNull;
+import static org.junit.Assert.assertTrue;
+
+import java.nio.charset.StandardCharsets;
+import java.util.Arrays;
+import java.util.Collections;
+import java.util.stream.IntStream;
+
+import org.apache.arrow.memory.BufferAllocator;
+import org.apache.arrow.memory.RootAllocator;
+import org.apache.arrow.util.AutoCloseables;
+import org.apache.arrow.vector.FieldVector;
+import org.apache.arrow.vector.IntVector;
+import org.apache.arrow.vector.VectorLoader;
+import org.apache.arrow.vector.VectorSchemaRoot;
+import org.apache.arrow.vector.VectorUnloader;
+import org.apache.arrow.vector.ipc.message.ArrowRecordBatch;
+import org.apache.arrow.vector.testing.ValueVectorDataPopulator;
+import org.apache.arrow.vector.types.pojo.ArrowType;
+import org.apache.arrow.vector.types.pojo.Field;
+import org.apache.arrow.vector.types.pojo.Schema;
+import org.junit.After;
+import org.junit.Before;
+import org.junit.Test;
+
+import io.netty.buffer.ArrowBuf;
+
+public class TestDoExchange {
+  static byte[] EXCHANGE_DO_GET = "do-get".getBytes(StandardCharsets.UTF_8);
+  static byte[] EXCHANGE_DO_PUT = "do-put".getBytes(StandardCharsets.UTF_8);
+  static byte[] EXCHANGE_ECHO = "echo".getBytes(StandardCharsets.UTF_8);
+  static byte[] EXCHANGE_METADATA_ONLY = 
"only-metadata".getBytes(StandardCharsets.UTF_8);
+  static byte[] EXCHANGE_TRANSFORM = 
"transform".getBytes(StandardCharsets.UTF_8);
+
+  private BufferAllocator allocator;
+  private FlightServer server;
+  private FlightClient client;
+
+  @Before
+  public void setUp() throws Exception {
+    allocator = new RootAllocator(Integer.MAX_VALUE);
+    final Location serverLocation = 
Location.forGrpcInsecure(FlightTestUtil.LOCALHOST, 0);
+    server = FlightServer.builder(allocator, serverLocation, new 
Producer(allocator)).build();
+    server.start();
+    final Location clientLocation = 
Location.forGrpcInsecure(FlightTestUtil.LOCALHOST, server.getPort());
+    client = FlightClient.builder(allocator, clientLocation).build();
+  }
+
+  @After
+  public void tearDown() throws Exception {
+    AutoCloseables.close(client, server, allocator);
+  }
+
+  /** Test a pure-metadata flow. */
+  @Test
+  public void testDoExchangeOnlyMetadata() throws Exception {
+    // Send a particular descriptor to the server and check for a particular 
response pattern.
+    try (final FlightClient.ExchangeReaderWriter stream =
+             
client.doExchange(FlightDescriptor.command(EXCHANGE_METADATA_ONLY))) {
+      final FlightStream reader = stream.getReader();
+
+      // Server starts by sending a message without data (hence no 
VectorSchemaRoot should be present)
+      assertTrue(reader.next());
+      assertFalse(reader.hasRoot());
+      assertEquals(42, reader.getLatestMetadata().getInt(0));
+
+      // Write a metadata message to the server (without sending any data)
+      ArrowBuf buf = allocator.buffer(4);
+      buf.writeInt(84);
+      stream.getWriter().putMetadata(buf);
+
+      // Check that the server echoed the metadata back to us
+      assertTrue(reader.next());
+      assertFalse(reader.hasRoot());
+      assertEquals(84, reader.getLatestMetadata().getInt(0));
+
+      // Close our write channel and ensure the server also closes theirs
+      stream.getWriter().completed();
+      assertFalse(reader.next());
+    }
+  }
+
+  /** Emulate a DoGet with a DoExchange. */
+  @Test
+  public void testDoExchangeDoGet() throws Exception {
+    try (final FlightClient.ExchangeReaderWriter stream =
+             client.doExchange(FlightDescriptor.command(EXCHANGE_DO_GET))) {
+      final FlightStream reader = stream.getReader();
+      VectorSchemaRoot root = reader.getRoot();
+      IntVector iv = (IntVector) root.getVector("a");
+      int value = 0;
+      while (reader.next()) {
+        for (int i = 0; i < root.getRowCount(); i++) {
+          assertFalse(String.format("Row %d should not be null", value), 
iv.isNull(i));
+          assertEquals(value, iv.get(i));
+          value++;
+        }
+      }
+      assertEquals(10, value);
+    }
+  }
+
+  /** Emulate a DoPut with a DoExchange. */
+  @Test
+  public void testDoExchangeDoPut() throws Exception {
+    final Schema schema = new 
Schema(Collections.singletonList(Field.nullable("a", new ArrowType.Int(32, 
true))));
+    try (final FlightClient.ExchangeReaderWriter stream =
+             client.doExchange(FlightDescriptor.command(EXCHANGE_DO_PUT));
+         final VectorSchemaRoot root = VectorSchemaRoot.create(schema, 
allocator)) {
+      IntVector iv = (IntVector) root.getVector("a");
+      iv.allocateNew();
+
+      stream.getWriter().start(root);
+      int counter = 0;
+      for (int i = 0; i < 10; i++) {
+        ValueVectorDataPopulator.setVector(iv, IntStream.range(0, 
i).boxed().toArray(Integer[]::new));
+        root.setRowCount(i);
+        counter += i;
+        stream.getWriter().putNext();
+
+        assertTrue(stream.getReader().next());
+        assertFalse(stream.getReader().hasRoot());
+        // For each write, the server sends back a metadata message containing 
the index of the last written batch
+        final ArrowBuf metadata = stream.getReader().getLatestMetadata();
+        assertEquals(counter, metadata.getInt(0));
+      }
+      stream.getWriter().completed();
+
+      while (stream.getReader().next()) {
+        // Drain the stream. Otherwise closing the stream sends a CANCEL which 
seriously screws with the server.
+        // CANCEL -> runs onCancel handler -> closes the FlightStream early
+      }
+    }
+  }
+
+  /** Test a DoExchange that echoes the client message. */
+  @Test
+  public void testDoExchangeEcho() throws Exception {
+    final Schema schema = new 
Schema(Collections.singletonList(Field.nullable("a", new ArrowType.Int(32, 
true))));
+    try (final FlightClient.ExchangeReaderWriter stream = 
client.doExchange(FlightDescriptor.command(EXCHANGE_ECHO));
+         final VectorSchemaRoot root = VectorSchemaRoot.create(schema, 
allocator)) {
+      final FlightStream reader = stream.getReader();
+
+      // First try writing metadata without starting the Arrow data stream
+      ArrowBuf buf = allocator.buffer(4);
+      buf.writeInt(42);
+      stream.getWriter().putMetadata(buf);
+      buf = allocator.buffer(4);
+      buf.writeInt(84);
+      stream.getWriter().putMetadata(buf);
+
+      // Ensure that the server echoes the metadata back, also without 
starting its data stream
+      assertTrue(reader.next());
+      assertFalse(reader.hasRoot());
+      assertEquals(42, reader.getLatestMetadata().getInt(0));
+      assertTrue(reader.next());
+      assertFalse(reader.hasRoot());
+      assertEquals(84, reader.getLatestMetadata().getInt(0));
+
+      // Write data and check that it gets echoed back.
+      IntVector iv = (IntVector) root.getVector("a");
+      iv.allocateNew();
+      stream.getWriter().start(root);
+      for (int i = 0; i < 10; i++) {
+        iv.setSafe(0, i);
+        root.setRowCount(1);
+        stream.getWriter().putNext();
+
+        assertTrue(reader.next());
+        assertNull(reader.getLatestMetadata());
+        assertEquals(root.getSchema(), reader.getSchema());
+        assertEquals(i, ((IntVector) reader.getRoot().getVector("a")).get(0));
+      }
+
+      // Ensure the stream is drained. Else, we race with the server for 
shutdown: we'll go and shut down the

Review comment:
       The comment here didn't match the code, so I clarified what's going on: 
we're calling gRPC `onCompleted`, and the server will then call its own 
`onCompleted` in turn.




----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
[email protected]


Reply via email to