This is an automated email from the ASF dual-hosted git repository.
lidavidm pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/arrow.git
The following commit(s) were added to refs/heads/main by this push:
new 1a323fc91b GH-44444: [Java][CI] Add Java implementation of Flight
do_exchange integration test (#44445)
1a323fc91b is described below
commit 1a323fc91bbbce2f231262a1619d5f15e9cb7762
Author: Adam Reeve <[email protected]>
AuthorDate: Thu Oct 17 19:54:18 2024 +1300
GH-44444: [Java][CI] Add Java implementation of Flight do_exchange
integration test (#44445)
### Rationale for this change
See #44444. This helps ensure compatibility of the Java `do_exchange`
implementation with C++ and C#.
### What changes are included in this PR?
Adds a Java implementation of the `do_exchange:echo` Flight integration
test, and enables it in Archery.
### Are these changes tested?
Yes
### Are there any user-facing changes?
No
* GitHub Issue: #44444
Authored-by: Adam Reeve <[email protected]>
Signed-off-by: David Li <[email protected]>
---
dev/archery/archery/integration/runner.py | 2 +-
.../integration/tests/DoExchangeEchoScenario.java | 95 ++++++++++++++++++++++
.../integration/tests/DoExchangeProducer.java | 82 +++++++++++++++++++
.../integration/tests/IntegrationAssertions.java | 6 ++
.../arrow/flight/integration/tests/Scenarios.java | 1 +
.../flight/integration/tests/IntegrationTest.java | 5 ++
6 files changed, 190 insertions(+), 1 deletion(-)
diff --git a/dev/archery/archery/integration/runner.py
b/dev/archery/archery/integration/runner.py
index 781b41090d..5cba350253 100644
--- a/dev/archery/archery/integration/runner.py
+++ b/dev/archery/archery/integration/runner.py
@@ -673,7 +673,7 @@ def run_all_tests(with_cpp=True, with_java=True,
with_js=True,
"do_exchange:echo",
description=("Test the do_exchange method by "
"echoing data back to the client."),
- skip_testers={"Go", "Java", "JS", "Rust"},
+ skip_testers={"Go", "JS", "Rust"},
),
Scenario(
"location:reuse_connection",
diff --git
a/java/flight/flight-integration-tests/src/main/java/org/apache/arrow/flight/integration/tests/DoExchangeEchoScenario.java
b/java/flight/flight-integration-tests/src/main/java/org/apache/arrow/flight/integration/tests/DoExchangeEchoScenario.java
new file mode 100644
index 0000000000..3e7fa19a81
--- /dev/null
+++
b/java/flight/flight-integration-tests/src/main/java/org/apache/arrow/flight/integration/tests/DoExchangeEchoScenario.java
@@ -0,0 +1,95 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.arrow.flight.integration.tests;
+
+import java.nio.charset.StandardCharsets;
+import java.util.Collections;
+import org.apache.arrow.flight.FlightClient;
+import org.apache.arrow.flight.FlightDescriptor;
+import org.apache.arrow.flight.FlightProducer;
+import org.apache.arrow.flight.FlightServer;
+import org.apache.arrow.flight.FlightStream;
+import org.apache.arrow.flight.Location;
+import org.apache.arrow.memory.ArrowBuf;
+import org.apache.arrow.memory.BufferAllocator;
+import org.apache.arrow.vector.IntVector;
+import org.apache.arrow.vector.VectorSchemaRoot;
+import org.apache.arrow.vector.types.pojo.ArrowType;
+import org.apache.arrow.vector.types.pojo.Field;
+import org.apache.arrow.vector.types.pojo.Schema;
+import org.apache.arrow.vector.util.Validator;
+
+/**
+ * Flight integration scenario for DoExchange: the client streams record batches to the server
+ * and checks that every batch, and any application metadata sent with it, is echoed back.
+ */
+final class DoExchangeEchoScenario implements Scenario {
+  /** Descriptor command the client sends to request the echo exchange. */
+  public static final byte[] COMMAND = "echo".getBytes(StandardCharsets.UTF_8);
+
+  @Override
+  public FlightProducer producer(BufferAllocator allocator, Location location) throws Exception {
+    return new DoExchangeProducer(allocator);
+  }
+
+  @Override
+  public void buildServer(FlightServer.Builder builder) {
+    // Nothing to configure for this scenario.
+  }
+
+  @Override
+  public void client(BufferAllocator allocator, Location location, FlightClient client)
+      throws Exception {
+    final Field column = Field.notNullable("x", new ArrowType.Int(32, true));
+    final Schema schema = new Schema(Collections.singletonList(column));
+    try (final FlightClient.ExchangeReaderWriter exchange =
+            client.doExchange(FlightDescriptor.command(COMMAND));
+        final VectorSchemaRoot outgoing = VectorSchemaRoot.create(schema, allocator)) {
+      final FlightStream echoed = exchange.getReader();
+
+      final IntVector values = (IntVector) outgoing.getVector("x");
+      values.allocateNew();
+      exchange.getWriter().start(outgoing);
+
+      final int rowsPerBatch = 10;
+      int batchIdx = 0;
+      while (batchIdx < 4) {
+        // Row r of batch b holds the value b + r.
+        for (int row = 0; row < rowsPerBatch; row++) {
+          values.setSafe(row, batchIdx + row);
+        }
+        outgoing.setRowCount(rowsPerBatch);
+
+        // Only even-numbered batches carry metadata: the batch index as UTF-8 text.
+        final boolean withMetadata = batchIdx % 2 == 0;
+        final byte[] expectedMetadata =
+            Integer.toString(batchIdx).getBytes(StandardCharsets.UTF_8);
+        sendBatch(exchange, allocator, withMetadata ? expectedMetadata : null);
+
+        // Each batch is read back before the next one is written.
+        IntegrationAssertions.assertTrue("Unexpected end of reader", echoed.next());
+        if (!withMetadata) {
+          IntegrationAssertions.assertNull(echoed.getLatestMetadata());
+        } else {
+          IntegrationAssertions.assertNotNull(echoed.getLatestMetadata());
+          final byte[] actualMetadata = new byte[expectedMetadata.length];
+          echoed.getLatestMetadata().readBytes(actualMetadata);
+          IntegrationAssertions.assertEquals(expectedMetadata, actualMetadata);
+        }
+        IntegrationAssertions.assertEquals(outgoing.getSchema(), echoed.getSchema());
+        Validator.compareVectorSchemaRoot(echoed.getRoot(), outgoing);
+
+        batchIdx++;
+      }
+
+      exchange.getWriter().completed();
+      IntegrationAssertions.assertFalse("Expected to reach end of reader", echoed.next());
+    }
+  }
+
+  /**
+   * Writes the batch currently held by the started root, attaching {@code metadata} as
+   * application metadata when it is non-null.
+   */
+  private static void sendBatch(
+      FlightClient.ExchangeReaderWriter exchange, BufferAllocator allocator, byte[] metadata) {
+    if (metadata == null) {
+      exchange.getWriter().putNext();
+      return;
+    }
+    final ArrowBuf payload = allocator.buffer(metadata.length);
+    payload.writeBytes(metadata);
+    exchange.getWriter().putNext(payload);
+  }
+}
diff --git
a/java/flight/flight-integration-tests/src/main/java/org/apache/arrow/flight/integration/tests/DoExchangeProducer.java
b/java/flight/flight-integration-tests/src/main/java/org/apache/arrow/flight/integration/tests/DoExchangeProducer.java
new file mode 100644
index 0000000000..2e28ab1233
--- /dev/null
+++
b/java/flight/flight-integration-tests/src/main/java/org/apache/arrow/flight/integration/tests/DoExchangeProducer.java
@@ -0,0 +1,82 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.arrow.flight.integration.tests;
+
+import java.util.Arrays;
+import org.apache.arrow.flight.CallStatus;
+import org.apache.arrow.flight.FlightDescriptor;
+import org.apache.arrow.flight.FlightStream;
+import org.apache.arrow.flight.NoOpFlightProducer;
+import org.apache.arrow.memory.BufferAllocator;
+import org.apache.arrow.vector.VectorLoader;
+import org.apache.arrow.vector.VectorSchemaRoot;
+import org.apache.arrow.vector.VectorUnloader;
+import org.apache.arrow.vector.ipc.message.ArrowRecordBatch;
+
+/** The server used for testing the Flight do_exchange method. */
+final class DoExchangeProducer extends NoOpFlightProducer {
+  private final BufferAllocator allocator;
+
+  DoExchangeProducer(BufferAllocator allocator) {
+    this.allocator = allocator;
+  }
+
+  /**
+   * Dispatches a DoExchange call based on the command in the stream's descriptor.
+   *
+   * <p>The only supported command is {@link DoExchangeEchoScenario#COMMAND}; any other
+   * descriptor is rejected with an UNIMPLEMENTED status.
+   *
+   * @param context the call context (unused)
+   * @param reader the stream of data and metadata sent by the client
+   * @param writer the listener used to send data and metadata back to the client
+   */
+  @Override
+  public void doExchange(CallContext context, FlightStream reader, ServerStreamListener writer) {
+    FlightDescriptor descriptor = reader.getDescriptor();
+    if (descriptor.isCommand()
+        && Arrays.equals(DoExchangeEchoScenario.COMMAND, descriptor.getCommand())) {
+      doEcho(reader, writer);
+      // The exchange has been handled; without this return a successful echo would fall
+      // through to the UNIMPLEMENTED error below.
+      return;
+    }
+    throw CallStatus.UNIMPLEMENTED
+        .withDescription("Unsupported descriptor: " + descriptor)
+        .toRuntimeException();
+  }
+
+  /**
+   * Echoes every message read from {@code reader} back through {@code writer}, then completes
+   * the stream.
+   *
+   * <p>Once the reader has a root, each message is copied into a server-side root and sent
+   * with {@code putNext}, together with the latest metadata if there is any. Messages that
+   * arrive before the reader has a root are forwarded with {@code putMetadata}.
+   */
+  private void doEcho(FlightStream reader, ServerStreamListener writer) {
+    VectorSchemaRoot root = null;
+    try {
+      VectorLoader loader = null;
+      while (reader.next()) {
+        if (reader.hasRoot()) {
+          if (root == null) {
+            // Create the outgoing root lazily, once the reader's schema is available.
+            root = VectorSchemaRoot.create(reader.getSchema(), allocator);
+            loader = new VectorLoader(root);
+            writer.start(root);
+          }
+          VectorUnloader unloader = new VectorUnloader(reader.getRoot());
+          try (final ArrowRecordBatch arb = unloader.getRecordBatch()) {
+            loader.load(arb);
+          }
+          if (reader.getLatestMetadata() != null) {
+            // Take an extra reference before handing the reader's buffer to the writer.
+            reader.getLatestMetadata().getReferenceManager().retain();
+            writer.putNext(reader.getLatestMetadata());
+          } else {
+            writer.putNext();
+          }
+        } else {
+          // Pure metadata
+          reader.getLatestMetadata().getReferenceManager().retain();
+          writer.putMetadata(reader.getLatestMetadata());
+        }
+      }
+    } finally {
+      // Release the echo root even if reading or writing fails part-way through.
+      if (root != null) {
+        root.close();
+      }
+    }
+    writer.completed();
+  }
+}
diff --git
a/java/flight/flight-integration-tests/src/main/java/org/apache/arrow/flight/integration/tests/IntegrationAssertions.java
b/java/flight/flight-integration-tests/src/main/java/org/apache/arrow/flight/integration/tests/IntegrationAssertions.java
index 92d4c73f2b..ada565c635 100644
---
a/java/flight/flight-integration-tests/src/main/java/org/apache/arrow/flight/integration/tests/IntegrationAssertions.java
+++
b/java/flight/flight-integration-tests/src/main/java/org/apache/arrow/flight/integration/tests/IntegrationAssertions.java
@@ -78,6 +78,12 @@ final class IntegrationAssertions {
}
}
+  /** Fails with an {@link AssertionError} unless {@code actual} is {@code null}. */
+  static void assertNull(Object actual) {
+    if (actual == null) {
+      return;
+    }
+    throw new AssertionError("Expected: null\n\nbut got: " + actual);
+  }
+
static void assertNotNull(Object actual) {
if (actual == null) {
throw new AssertionError("Expected: (not null)\n\nbut got: null\n");
diff --git
a/java/flight/flight-integration-tests/src/main/java/org/apache/arrow/flight/integration/tests/Scenarios.java
b/java/flight/flight-integration-tests/src/main/java/org/apache/arrow/flight/integration/tests/Scenarios.java
index 451edb6bd5..7903ae994c 100644
---
a/java/flight/flight-integration-tests/src/main/java/org/apache/arrow/flight/integration/tests/Scenarios.java
+++
b/java/flight/flight-integration-tests/src/main/java/org/apache/arrow/flight/integration/tests/Scenarios.java
@@ -51,6 +51,7 @@ final class Scenarios {
scenarios.put("flight_sql:ingestion", FlightSqlIngestionScenario::new);
scenarios.put("app_metadata_flight_info_endpoint",
AppMetadataFlightInfoEndpointScenario::new);
scenarios.put("session_options", SessionOptionsScenario::new);
+ scenarios.put("do_exchange:echo", DoExchangeEchoScenario::new);
}
private static Scenarios getInstance() {
diff --git
a/java/flight/flight-integration-tests/src/test/java/org/apache/arrow/flight/integration/tests/IntegrationTest.java
b/java/flight/flight-integration-tests/src/test/java/org/apache/arrow/flight/integration/tests/IntegrationTest.java
index 8419432c66..16265b8b37 100644
---
a/java/flight/flight-integration-tests/src/test/java/org/apache/arrow/flight/integration/tests/IntegrationTest.java
+++
b/java/flight/flight-integration-tests/src/test/java/org/apache/arrow/flight/integration/tests/IntegrationTest.java
@@ -99,6 +99,11 @@ class IntegrationTest {
testScenario("session_options");
}
+ @Test
+ void doExchangeEcho() throws Exception {
+ testScenario("do_exchange:echo");
+ }
+
void testScenario(String scenarioName) throws Exception {
TestBufferAllocationListener listener = new TestBufferAllocationListener();
try (final BufferAllocator allocator = new RootAllocator(listener,
Long.MAX_VALUE)) {