emkornfield commented on a change in pull request #7012: URL: https://github.com/apache/arrow/pull/7012#discussion_r419200568
########## File path: java/flight/flight-core/src/test/java/org/apache/arrow/flight/TestDoExchange.java ########## @@ -0,0 +1,395 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.arrow.flight; + +import static org.junit.Assert.assertEquals; +import static org.junit.Assert.assertFalse; +import static org.junit.Assert.assertNotNull; +import static org.junit.Assert.assertNull; +import static org.junit.Assert.assertTrue; + +import java.nio.charset.StandardCharsets; +import java.util.Arrays; +import java.util.Collections; +import java.util.stream.IntStream; + +import org.apache.arrow.memory.BufferAllocator; +import org.apache.arrow.memory.RootAllocator; +import org.apache.arrow.util.AutoCloseables; +import org.apache.arrow.vector.FieldVector; +import org.apache.arrow.vector.IntVector; +import org.apache.arrow.vector.VectorLoader; +import org.apache.arrow.vector.VectorSchemaRoot; +import org.apache.arrow.vector.VectorUnloader; +import org.apache.arrow.vector.ipc.message.ArrowRecordBatch; +import org.apache.arrow.vector.testing.ValueVectorDataPopulator; +import org.apache.arrow.vector.types.pojo.ArrowType; +import org.apache.arrow.vector.types.pojo.Field; +import 
org.apache.arrow.vector.types.pojo.Schema; +import org.junit.After; +import org.junit.Before; +import org.junit.Test; + +import io.netty.buffer.ArrowBuf; + +public class TestDoExchange { + static byte[] EXCHANGE_DO_GET = "do-get".getBytes(StandardCharsets.UTF_8); + static byte[] EXCHANGE_DO_PUT = "do-put".getBytes(StandardCharsets.UTF_8); + static byte[] EXCHANGE_ECHO = "echo".getBytes(StandardCharsets.UTF_8); + static byte[] EXCHANGE_METADATA_ONLY = "only-metadata".getBytes(StandardCharsets.UTF_8); + static byte[] EXCHANGE_TRANSFORM = "transform".getBytes(StandardCharsets.UTF_8); + + private BufferAllocator allocator; + private FlightServer server; + private FlightClient client; + + @Before + public void setUp() throws Exception { + allocator = new RootAllocator(Integer.MAX_VALUE); + final Location serverLocation = Location.forGrpcInsecure(FlightTestUtil.LOCALHOST, 0); + server = FlightServer.builder(allocator, serverLocation, new Producer(allocator)).build(); + server.start(); + final Location clientLocation = Location.forGrpcInsecure(FlightTestUtil.LOCALHOST, server.getPort()); + client = FlightClient.builder(allocator, clientLocation).build(); + } + + @After + public void tearDown() throws Exception { + AutoCloseables.close(client, server, allocator); + } + + /** Test a pure-metadata flow. */ + @Test + public void testDoExchangeOnlyMetadata() throws Exception { + // Send a particular descriptor to the server and check for a particular response pattern. 
+ try (final FlightClient.ExchangeReaderWriter stream = + client.doExchange(FlightDescriptor.command(EXCHANGE_METADATA_ONLY))) { + final FlightStream reader = stream.getReader(); + + // Server starts by sending a message without data (hence no VectorSchemaRoot should be present) + assertTrue(reader.next()); + assertFalse(reader.hasRoot()); + assertEquals(42, reader.getLatestMetadata().getInt(0)); + + // Write a metadata message to the server (without sending any data) + ArrowBuf buf = allocator.buffer(4); + buf.writeInt(84); + stream.getWriter().putMetadata(buf); + + // Check that the server echoed the metadata back to us + assertTrue(reader.next()); + assertFalse(reader.hasRoot()); + assertEquals(84, reader.getLatestMetadata().getInt(0)); + + // Close our write channel and ensure the server also closes theirs + stream.getWriter().completed(); + assertFalse(reader.next()); + } + } + + /** Emulate a DoGet with a DoExchange. */ + @Test + public void testDoExchangeDoGet() throws Exception { + try (final FlightClient.ExchangeReaderWriter stream = + client.doExchange(FlightDescriptor.command(EXCHANGE_DO_GET))) { + final FlightStream reader = stream.getReader(); + VectorSchemaRoot root = reader.getRoot(); + IntVector iv = (IntVector) root.getVector("a"); + int value = 0; + while (reader.next()) { + for (int i = 0; i < root.getRowCount(); i++) { + assertFalse(String.format("Row %d should not be null", value), iv.isNull(i)); + assertEquals(value, iv.get(i)); + value++; + } + } + assertEquals(10, value); + } + } + + /** Emulate a DoPut with a DoExchange. 
*/ + @Test + public void testDoExchangeDoPut() throws Exception { + final Schema schema = new Schema(Collections.singletonList(Field.nullable("a", new ArrowType.Int(32, true)))); + try (final FlightClient.ExchangeReaderWriter stream = + client.doExchange(FlightDescriptor.command(EXCHANGE_DO_PUT)); + final VectorSchemaRoot root = VectorSchemaRoot.create(schema, allocator)) { + IntVector iv = (IntVector) root.getVector("a"); + iv.allocateNew(); + + stream.getWriter().start(root); + int counter = 0; + for (int i = 0; i < 10; i++) { + ValueVectorDataPopulator.setVector(iv, IntStream.range(0, i).boxed().toArray(Integer[]::new)); + root.setRowCount(i); + counter += i; + stream.getWriter().putNext(); + + assertTrue(stream.getReader().next()); + assertFalse(stream.getReader().hasRoot()); + // For each write, the server sends back a metadata message containing the index of the last written batch + final ArrowBuf metadata = stream.getReader().getLatestMetadata(); + assertEquals(counter, metadata.getInt(0)); + } + stream.getWriter().completed(); + + while (stream.getReader().next()) { + // Drain the stream. Otherwise closing the stream sends a CANCEL which seriously screws with the server. + // CANCEL -> runs onCancel handler -> closes the FlightStream early + } + } + } + + /** Test a DoExchange that echoes the client message. 
*/ + @Test + public void testDoExchangeEcho() throws Exception { + final Schema schema = new Schema(Collections.singletonList(Field.nullable("a", new ArrowType.Int(32, true)))); + try (final FlightClient.ExchangeReaderWriter stream = client.doExchange(FlightDescriptor.command(EXCHANGE_ECHO)); + final VectorSchemaRoot root = VectorSchemaRoot.create(schema, allocator)) { + final FlightStream reader = stream.getReader(); + + // First try writing metadata without starting the Arrow data stream + ArrowBuf buf = allocator.buffer(4); + buf.writeInt(42); + stream.getWriter().putMetadata(buf); + buf = allocator.buffer(4); + buf.writeInt(84); + stream.getWriter().putMetadata(buf); + + // Ensure that the server echoes the metadata back, also without starting its data stream + assertTrue(reader.next()); + assertFalse(reader.hasRoot()); + assertEquals(42, reader.getLatestMetadata().getInt(0)); + assertTrue(reader.next()); + assertFalse(reader.hasRoot()); + assertEquals(84, reader.getLatestMetadata().getInt(0)); + + // Write data and check that it gets echoed back. + IntVector iv = (IntVector) root.getVector("a"); + iv.allocateNew(); + stream.getWriter().start(root); + for (int i = 0; i < 10; i++) { + iv.setSafe(0, i); + root.setRowCount(1); + stream.getWriter().putNext(); + + assertTrue(reader.next()); + assertNull(reader.getLatestMetadata()); + assertEquals(root.getSchema(), reader.getSchema()); + assertEquals(i, ((IntVector) reader.getRoot().getVector("a")).get(0)); + } + + // Ensure the stream is drained. Else, we race with the server for shutdown: we'll go and shut down the Review comment: might make sense to make a helper drainStream(reader)? ---------------------------------------------------------------- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: [email protected]
