This is an automated email from the ASF dual-hosted git repository.
lidavidm pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/arrow.git
The following commit(s) were added to refs/heads/main by this push:
new 71a4ef4b10 GH-38096: [Java] FlightStream with metadata can cause error
when closing (#38110)
71a4ef4b10 is described below
commit 71a4ef4b1014c22e8bc7627bb9c66efa20e1b453
Author: Bryan Cutler <[email protected]>
AuthorDate: Thu Oct 12 07:35:55 2023 -0700
GH-38096: [Java] FlightStream with metadata can cause error when closing
(#38110)
### Rationale for this change
The Java FlightStream can raise an error if metadata is transferred and
ends up being closed twice.
```
java.lang.IllegalStateException: RefCnt has gone negative
at
org.apache.arrow.util.Preconditions.checkState(Preconditions.java:458)
at
org.apache.arrow.memory.BufferLedger.release(BufferLedger.java:130)
at
org.apache.arrow.memory.BufferLedger.release(BufferLedger.java:104)
at org.apache.arrow.memory.ArrowBuf.close(ArrowBuf.java:1044)
at
org.apache.arrow.util.AutoCloseables.close(AutoCloseables.java:97)
at org.apache.arrow.flight.FlightStream.close(FlightStream.java:208)
```
### What changes are included in this PR?
When FlightStream is closed, remove any reference of previous metadata to
prevent reference count going negative if closed again. Also added
`ExchangeReaderWriter.getResult()` for convenience and clear up ambiguity on
error handling.
### Are these changes tested?
Unit tests added for closing with metadata and
### Are there any user-facing changes?
Added `ExchangeReaderWriter.getResult()`
* Closes: #38096
Authored-by: Bryan Cutler <[email protected]>
Signed-off-by: David Li <[email protected]>
---
.../java/org/apache/arrow/flight/FlightClient.java | 10 +++
.../java/org/apache/arrow/flight/FlightStream.java | 2 +
.../org/apache/arrow/flight/TestDoExchange.java | 91 ++++++++++++++++++++++
3 files changed, 103 insertions(+)
diff --git
a/java/flight/flight-core/src/main/java/org/apache/arrow/flight/FlightClient.java
b/java/flight/flight-core/src/main/java/org/apache/arrow/flight/FlightClient.java
index 422ed01c39..91e3b4d052 100644
---
a/java/flight/flight-core/src/main/java/org/apache/arrow/flight/FlightClient.java
+++
b/java/flight/flight-core/src/main/java/org/apache/arrow/flight/FlightClient.java
@@ -429,6 +429,16 @@ public class FlightClient implements AutoCloseable {
return writer;
}
+ /**
+ * Make sure stream is drained. You must call this to be notified of any
errors that may have
+ * happened after the exchange is complete. This should be called after
`getWriter().completed()`
+ * and instead of `getWriter().getResult()`.
+ */
+ public void getResult() {
+ // After exchange is complete, make sure stream is drained to propagate
errors through reader
+ while (reader.next()) { };
+ }
+
/** Shut down the streams in this call. */
@Override
public void close() throws Exception {
diff --git
a/java/flight/flight-core/src/main/java/org/apache/arrow/flight/FlightStream.java
b/java/flight/flight-core/src/main/java/org/apache/arrow/flight/FlightStream.java
index 03ce13c978..ad4ffcbebd 100644
---
a/java/flight/flight-core/src/main/java/org/apache/arrow/flight/FlightStream.java
+++
b/java/flight/flight-core/src/main/java/org/apache/arrow/flight/FlightStream.java
@@ -207,6 +207,8 @@ public class FlightStream implements AutoCloseable {
} else {
AutoCloseables.close(closeables);
}
+ // Remove any metadata after closing to prevent negative refcnt
+ applicationMetadata = null;
} finally {
// The value of this CompletableFuture is meaningless, only whether
it's completed (or has an exception)
// No-op if already complete
diff --git
a/java/flight/flight-core/src/test/java/org/apache/arrow/flight/TestDoExchange.java
b/java/flight/flight-core/src/test/java/org/apache/arrow/flight/TestDoExchange.java
index c2f8e75596..f9db9bfd23 100644
---
a/java/flight/flight-core/src/test/java/org/apache/arrow/flight/TestDoExchange.java
+++
b/java/flight/flight-core/src/test/java/org/apache/arrow/flight/TestDoExchange.java
@@ -55,6 +55,7 @@ public class TestDoExchange {
static byte[] EXCHANGE_METADATA_ONLY =
"only-metadata".getBytes(StandardCharsets.UTF_8);
static byte[] EXCHANGE_TRANSFORM =
"transform".getBytes(StandardCharsets.UTF_8);
static byte[] EXCHANGE_CANCEL = "cancel".getBytes(StandardCharsets.UTF_8);
+ static byte[] EXCHANGE_ERROR = "error".getBytes(StandardCharsets.UTF_8);
private BufferAllocator allocator;
private FlightServer server;
@@ -365,6 +366,37 @@ public class TestDoExchange {
}
}
+ /** Test a DoExchange error handling. */
+ @Test
+ public void testDoExchangeError() throws Exception {
+ final Schema schema = new
Schema(Collections.singletonList(Field.nullable("a", new ArrowType.Int(32,
true))));
+ try (final FlightClient.ExchangeReaderWriter stream =
client.doExchange(FlightDescriptor.command(EXCHANGE_ERROR));
+ final VectorSchemaRoot root = VectorSchemaRoot.create(schema,
allocator)) {
+ final FlightStream reader = stream.getReader();
+
+ // Write data and check that it gets echoed back.
+ IntVector iv = (IntVector) root.getVector("a");
+ iv.allocateNew();
+ stream.getWriter().start(root);
+ for (int i = 0; i < 10; i++) {
+ iv.setSafe(0, i);
+ root.setRowCount(1);
+ stream.getWriter().putNext();
+
+ assertTrue(reader.next());
+ assertEquals(root.getSchema(), reader.getSchema());
+ assertEquals(i, ((IntVector) reader.getRoot().getVector("a")).get(0));
+ }
+
+ // Complete the stream so that the server knows not to expect any more
messages from us.
+ stream.getWriter().completed();
+
+ // Must call reader.next() to get any errors after exchange, will return
false if no error
+ final FlightRuntimeException fre =
assertThrows(FlightRuntimeException.class, stream::getResult);
+ assertEquals("error completing exchange", fre.status().description());
+ }
+ }
+
/** Have the client close the stream without reading; ensure memory is not
leaked. */
@Test
public void testClientClose() throws Exception {
@@ -381,6 +413,38 @@ public class TestDoExchange {
client = null;
}
+ /** Test closing with Metadata can't lead to error. */
+ @Test
+ public void testCloseWithMetadata() throws Exception {
+ // Send a particular descriptor to the server and check for a particular
response pattern.
+ try (final FlightClient.ExchangeReaderWriter stream =
+
client.doExchange(FlightDescriptor.command(EXCHANGE_METADATA_ONLY))) {
+ final FlightStream reader = stream.getReader();
+
+ // Server starts by sending a message without data (hence no
VectorSchemaRoot should be present)
+ assertTrue(reader.next());
+ assertFalse(reader.hasRoot());
+ assertEquals(42, reader.getLatestMetadata().getInt(0));
+
+ // Write a metadata message to the server (without sending any data)
+ ArrowBuf buf = allocator.buffer(4);
+ buf.writeInt(84);
+ stream.getWriter().putMetadata(buf);
+
+ // Check that the server echoed the metadata back to us
+ assertTrue(reader.next());
+ assertFalse(reader.hasRoot());
+ assertEquals(84, reader.getLatestMetadata().getInt(0));
+
+ // Close our write channel and ensure the server also closes theirs
+ stream.getWriter().completed();
+ stream.getResult();
+
+ // Not necessary to close reader here, but check closing twice doesn't
lead to negative refcnt from metadata
+ stream.getReader().close();
+ }
+ }
+
static class Producer extends NoOpFlightProducer {
static final Schema SCHEMA = new Schema(
Collections.singletonList(Field.nullable("a", new ArrowType.Int(32,
true))));
@@ -404,6 +468,8 @@ public class TestDoExchange {
transform(context, reader, writer);
} else if (Arrays.equals(reader.getDescriptor().getCommand(),
EXCHANGE_CANCEL)) {
cancel(context, reader, writer);
+ } else if (Arrays.equals(reader.getDescriptor().getCommand(),
EXCHANGE_ERROR)) {
+ error(context, reader, writer);
} else {
writer.error(CallStatus.UNIMPLEMENTED.withDescription("Command not
implemented").toRuntimeException());
}
@@ -534,5 +600,30 @@ public class TestDoExchange {
private void cancel(CallContext context, FlightStream reader,
ServerStreamListener writer) {
writer.error(CallStatus.CANCELLED.withDescription("expected").toRuntimeException());
}
+
+ private void error(CallContext context, FlightStream reader,
ServerStreamListener writer) {
+ VectorSchemaRoot root = null;
+ VectorLoader loader = null;
+ while (reader.next()) {
+
+ if (root == null) {
+ root = VectorSchemaRoot.create(reader.getSchema(), allocator);
+ loader = new VectorLoader(root);
+ writer.start(root);
+ }
+ VectorUnloader unloader = new VectorUnloader(reader.getRoot());
+ try (final ArrowRecordBatch arb = unloader.getRecordBatch()) {
+ loader.load(arb);
+ }
+
+ writer.putNext();
+ }
+ if (root != null) {
+ root.close();
+ }
+
+ // An error occurs before completing the writer
+ writer.error(CallStatus.INTERNAL.withDescription("error completing
exchange").toRuntimeException());
+ }
}
}