This is an automated email from the ASF dual-hosted git repository.

lidavidm pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/arrow.git


The following commit(s) were added to refs/heads/main by this push:
     new 1a323fc91b GH-44444: [Java][CI] Add Java implementation of Flight 
do_exchange integration test (#44445)
1a323fc91b is described below

commit 1a323fc91bbbce2f231262a1619d5f15e9cb7762
Author: Adam Reeve <[email protected]>
AuthorDate: Thu Oct 17 19:54:18 2024 +1300

    GH-44444: [Java][CI] Add Java implementation of Flight do_exchange 
integration test (#44445)
    
    ### Rationale for this change
    
    See #44444, this helps ensure compatibility of the Java `do_exchange` 
implementation with C++ and C#.
    
    ### What changes are included in this PR?
    
    Adds a Java implementation of the `do_exchange:echo` Flight integration 
test, and enables it in Archery.
    
    ### Are these changes tested?
    
    Yes
    
    ### Are there any user-facing changes?
    
    No
    * GitHub Issue: #44444
    
    Authored-by: Adam Reeve <[email protected]>
    Signed-off-by: David Li <[email protected]>
---
 dev/archery/archery/integration/runner.py          |  2 +-
 .../integration/tests/DoExchangeEchoScenario.java  | 95 ++++++++++++++++++++++
 .../integration/tests/DoExchangeProducer.java      | 82 +++++++++++++++++++
 .../integration/tests/IntegrationAssertions.java   |  6 ++
 .../arrow/flight/integration/tests/Scenarios.java  |  1 +
 .../flight/integration/tests/IntegrationTest.java  |  5 ++
 6 files changed, 190 insertions(+), 1 deletion(-)

diff --git a/dev/archery/archery/integration/runner.py 
b/dev/archery/archery/integration/runner.py
index 781b41090d..5cba350253 100644
--- a/dev/archery/archery/integration/runner.py
+++ b/dev/archery/archery/integration/runner.py
@@ -673,7 +673,7 @@ def run_all_tests(with_cpp=True, with_java=True, 
with_js=True,
             "do_exchange:echo",
             description=("Test the do_exchange method by "
                          "echoing data back to the client."),
-            skip_testers={"Go", "Java", "JS", "Rust"},
+            skip_testers={"Go", "JS", "Rust"},
         ),
         Scenario(
             "location:reuse_connection",
diff --git 
a/java/flight/flight-integration-tests/src/main/java/org/apache/arrow/flight/integration/tests/DoExchangeEchoScenario.java
 
b/java/flight/flight-integration-tests/src/main/java/org/apache/arrow/flight/integration/tests/DoExchangeEchoScenario.java
new file mode 100644
index 0000000000..3e7fa19a81
--- /dev/null
+++ 
b/java/flight/flight-integration-tests/src/main/java/org/apache/arrow/flight/integration/tests/DoExchangeEchoScenario.java
@@ -0,0 +1,95 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.arrow.flight.integration.tests;
+
+import java.nio.charset.StandardCharsets;
+import java.util.Collections;
+import org.apache.arrow.flight.FlightClient;
+import org.apache.arrow.flight.FlightDescriptor;
+import org.apache.arrow.flight.FlightProducer;
+import org.apache.arrow.flight.FlightServer;
+import org.apache.arrow.flight.FlightStream;
+import org.apache.arrow.flight.Location;
+import org.apache.arrow.memory.ArrowBuf;
+import org.apache.arrow.memory.BufferAllocator;
+import org.apache.arrow.vector.IntVector;
+import org.apache.arrow.vector.VectorSchemaRoot;
+import org.apache.arrow.vector.types.pojo.ArrowType;
+import org.apache.arrow.vector.types.pojo.Field;
+import org.apache.arrow.vector.types.pojo.Schema;
+import org.apache.arrow.vector.util.Validator;
+
+/**
+ * Integration-test scenario registered as {@code do_exchange:echo}: the client streams record
+ * batches (some carrying application metadata) over DoExchange and verifies that each one is
+ * echoed back unchanged.
+ */
+final class DoExchangeEchoScenario implements Scenario {
+  /** UTF-8 bytes of {@code "echo"}; the client sends them as the descriptor command. */
+  public static final byte[] COMMAND = "echo".getBytes(StandardCharsets.UTF_8);
+
+  /** Returns a {@link DoExchangeProducer} backed by {@code allocator}; {@code location} is unused. */
+  @Override
+  public FlightProducer producer(BufferAllocator allocator, Location location) 
throws Exception {
+    return new DoExchangeProducer(allocator);
+  }
+
+  /** Intentionally empty: this scenario applies no extra configuration to the server builder. */
+  @Override
+  public void buildServer(FlightServer.Builder builder) {}
+
+  /**
+   * Writes four batches of ten rows to a DoExchange call and, after each write, reads one message
+   * back and compares its metadata, schema and data with what was sent. Finally completes the
+   * writer and checks that the reader reports end of stream.
+   *
+   * <p>{@code location} is unused here; the call is made through {@code client}.
+   */
+  @Override
+  public void client(BufferAllocator allocator, Location location, 
FlightClient client)
+      throws Exception {
+    // Schema with a single non-nullable, signed 32-bit integer column named "x".
+    final Schema schema =
+        new Schema(Collections.singletonList(Field.notNullable("x", new 
ArrowType.Int(32, true))));
+    // Both the exchange stream and the local root are closed when this block exits.
+    try (final FlightClient.ExchangeReaderWriter stream =
+            client.doExchange(FlightDescriptor.command(COMMAND));
+        final VectorSchemaRoot root = VectorSchemaRoot.create(schema, 
allocator)) {
+      final FlightStream reader = stream.getReader();
+
+      // Write data and check that it gets echoed back.
+      IntVector iv = (IntVector) root.getVector("x");
+      iv.allocateNew();
+      stream.getWriter().start(root);
+      int rowCount = 10;
+      for (int batchIdx = 0; batchIdx < 4; batchIdx++) {
+        // Row values depend on the batch index, so consecutive batches differ.
+        for (int rowIdx = 0; rowIdx < rowCount; rowIdx++) {
+          iv.setSafe(rowIdx, batchIdx + rowIdx);
+        }
+        root.setRowCount(rowCount);
+        // Only even-numbered batches carry metadata: the batch index as a decimal UTF-8 string.
+        boolean writeMetadata = batchIdx % 2 == 0;
+        final byte[] rawMetadata = 
Integer.toString(batchIdx).getBytes(StandardCharsets.UTF_8);
+        if (writeMetadata) {
+          // NOTE(review): the buffer is not released here; presumably putNext takes ownership of
+          // it -- confirm against the writer's contract.
+          final ArrowBuf metadata = allocator.buffer(rawMetadata.length);
+          metadata.writeBytes(rawMetadata);
+          stream.getWriter().putNext(metadata);
+        } else {
+          stream.getWriter().putNext();
+        }
+
+        // Read exactly one echoed message before writing the next batch.
+        IntegrationAssertions.assertTrue("Unexpected end of reader", 
reader.next());
+        if (writeMetadata) {
+          IntegrationAssertions.assertNotNull(reader.getLatestMetadata());
+          // Copy rawMetadata.length bytes out of the echoed buffer for comparison.
+          final byte[] readMetadata = new byte[rawMetadata.length];
+          reader.getLatestMetadata().readBytes(readMetadata);
+          IntegrationAssertions.assertEquals(rawMetadata, readMetadata);
+        } else {
+          IntegrationAssertions.assertNull(reader.getLatestMetadata());
+        }
+        IntegrationAssertions.assertEquals(root.getSchema(), 
reader.getSchema());
+        Validator.compareVectorSchemaRoot(reader.getRoot(), root);
+      }
+
+      // Signal that the client is done writing; the reader is then expected to end as well.
+      stream.getWriter().completed();
+      IntegrationAssertions.assertFalse("Expected to reach end of reader", 
reader.next());
+    }
+  }
+}
diff --git 
a/java/flight/flight-integration-tests/src/main/java/org/apache/arrow/flight/integration/tests/DoExchangeProducer.java
 
b/java/flight/flight-integration-tests/src/main/java/org/apache/arrow/flight/integration/tests/DoExchangeProducer.java
new file mode 100644
index 0000000000..2e28ab1233
--- /dev/null
+++ 
b/java/flight/flight-integration-tests/src/main/java/org/apache/arrow/flight/integration/tests/DoExchangeProducer.java
@@ -0,0 +1,82 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.arrow.flight.integration.tests;
+
+import java.util.Arrays;
+import org.apache.arrow.flight.CallStatus;
+import org.apache.arrow.flight.FlightDescriptor;
+import org.apache.arrow.flight.FlightStream;
+import org.apache.arrow.flight.NoOpFlightProducer;
+import org.apache.arrow.memory.BufferAllocator;
+import org.apache.arrow.vector.VectorLoader;
+import org.apache.arrow.vector.VectorSchemaRoot;
+import org.apache.arrow.vector.VectorUnloader;
+import org.apache.arrow.vector.ipc.message.ArrowRecordBatch;
+
+/** The server used for testing the Flight do_exchange method. */
+final class DoExchangeProducer extends NoOpFlightProducer {
+  private final BufferAllocator allocator;
+
+  DoExchangeProducer(BufferAllocator allocator) {
+    this.allocator = allocator;
+  }
+
+  @Override
+  public void doExchange(CallContext context, FlightStream reader, 
ServerStreamListener writer) {
+    FlightDescriptor descriptor = reader.getDescriptor();
+    if (descriptor.isCommand()) {
+      if (Arrays.equals(DoExchangeEchoScenario.COMMAND, 
descriptor.getCommand())) {
+        doEcho(reader, writer);
+      }
+    }
+    throw CallStatus.UNIMPLEMENTED
+        .withDescription("Unsupported descriptor: " + descriptor.toString())
+        .toRuntimeException();
+  }
+
+  private void doEcho(FlightStream reader, ServerStreamListener writer) {
+    VectorSchemaRoot root = null;
+    VectorLoader loader = null;
+    while (reader.next()) {
+      if (reader.hasRoot()) {
+        if (root == null) {
+          root = VectorSchemaRoot.create(reader.getSchema(), allocator);
+          loader = new VectorLoader(root);
+          writer.start(root);
+        }
+        VectorUnloader unloader = new VectorUnloader(reader.getRoot());
+        try (final ArrowRecordBatch arb = unloader.getRecordBatch()) {
+          loader.load(arb);
+        }
+        if (reader.getLatestMetadata() != null) {
+          reader.getLatestMetadata().getReferenceManager().retain();
+          writer.putNext(reader.getLatestMetadata());
+        } else {
+          writer.putNext();
+        }
+      } else {
+        // Pure metadata
+        reader.getLatestMetadata().getReferenceManager().retain();
+        writer.putMetadata(reader.getLatestMetadata());
+      }
+    }
+    if (root != null) {
+      root.close();
+    }
+    writer.completed();
+  }
+}
diff --git 
a/java/flight/flight-integration-tests/src/main/java/org/apache/arrow/flight/integration/tests/IntegrationAssertions.java
 
b/java/flight/flight-integration-tests/src/main/java/org/apache/arrow/flight/integration/tests/IntegrationAssertions.java
index 92d4c73f2b..ada565c635 100644
--- 
a/java/flight/flight-integration-tests/src/main/java/org/apache/arrow/flight/integration/tests/IntegrationAssertions.java
+++ 
b/java/flight/flight-integration-tests/src/main/java/org/apache/arrow/flight/integration/tests/IntegrationAssertions.java
@@ -78,6 +78,12 @@ final class IntegrationAssertions {
     }
   }
 
+  /**
+   * Fails unless {@code actual} is {@code null}.
+   *
+   * @param actual the value expected to be null
+   * @throws AssertionError if {@code actual} is non-null; the message includes the value
+   */
+  static void assertNull(Object actual) {
+    if (actual == null) {
+      return;
+    }
+    throw new AssertionError("Expected: null\n\nbut got: " + actual);
+  }
+
   static void assertNotNull(Object actual) {
     if (actual == null) {
       throw new AssertionError("Expected: (not null)\n\nbut got: null\n");
diff --git 
a/java/flight/flight-integration-tests/src/main/java/org/apache/arrow/flight/integration/tests/Scenarios.java
 
b/java/flight/flight-integration-tests/src/main/java/org/apache/arrow/flight/integration/tests/Scenarios.java
index 451edb6bd5..7903ae994c 100644
--- 
a/java/flight/flight-integration-tests/src/main/java/org/apache/arrow/flight/integration/tests/Scenarios.java
+++ 
b/java/flight/flight-integration-tests/src/main/java/org/apache/arrow/flight/integration/tests/Scenarios.java
@@ -51,6 +51,7 @@ final class Scenarios {
     scenarios.put("flight_sql:ingestion", FlightSqlIngestionScenario::new);
     scenarios.put("app_metadata_flight_info_endpoint", 
AppMetadataFlightInfoEndpointScenario::new);
     scenarios.put("session_options", SessionOptionsScenario::new);
+    scenarios.put("do_exchange:echo", DoExchangeEchoScenario::new);
   }
 
   private static Scenarios getInstance() {
diff --git 
a/java/flight/flight-integration-tests/src/test/java/org/apache/arrow/flight/integration/tests/IntegrationTest.java
 
b/java/flight/flight-integration-tests/src/test/java/org/apache/arrow/flight/integration/tests/IntegrationTest.java
index 8419432c66..16265b8b37 100644
--- 
a/java/flight/flight-integration-tests/src/test/java/org/apache/arrow/flight/integration/tests/IntegrationTest.java
+++ 
b/java/flight/flight-integration-tests/src/test/java/org/apache/arrow/flight/integration/tests/IntegrationTest.java
@@ -99,6 +99,11 @@ class IntegrationTest {
     testScenario("session_options");
   }
 
+  /** Runs the {@code do_exchange:echo} scenario through {@code testScenario}. */
+  @Test
+  void doExchangeEcho() throws Exception {
+    testScenario("do_exchange:echo");
+  }
+
   void testScenario(String scenarioName) throws Exception {
     TestBufferAllocationListener listener = new TestBufferAllocationListener();
     try (final BufferAllocator allocator = new RootAllocator(listener, 
Long.MAX_VALUE)) {

Reply via email to