This is an automated email from the ASF dual-hosted git repository.
lidavidm pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/arrow.git
The following commit(s) were added to refs/heads/main by this push:
new 57f643c2ce GH-38022: [Java][FlightRPC] Expose app_metadata on FlightInfo and FlightEndpoint (#38331)
57f643c2ce is described below
commit 57f643c2cecca729109daae18c7a64f3a37e76e4
Author: Diego Fernández Giraldo <[email protected]>
AuthorDate: Thu Oct 26 08:34:21 2023 -0600
GH-38022: [Java][FlightRPC] Expose app_metadata on FlightInfo and FlightEndpoint (#38331)
Making necessary changes in Java to expose the newly added app_metadata.
* Closes: #38022
Authored-by: Diego Fernandez <[email protected]>
Signed-off-by: David Li <[email protected]>
---
dev/archery/archery/integration/runner.py | 2 +-
.../org/apache/arrow/flight/FlightEndpoint.java | 81 +++++++++++++-
.../java/org/apache/arrow/flight/FlightInfo.java | 123 ++++++++++++++++++++-
.../apache/arrow/flight/TestBasicOperation.java | 9 +-
.../org/apache/arrow/flight/TestFlightService.java | 5 +-
.../AppMetadataFlightInfoEndpointScenario.java | 76 +++++++++++++
.../arrow/flight/integration/tests/Scenarios.java | 1 +
.../flight/integration/tests/IntegrationTest.java | 5 +
8 files changed, 292 insertions(+), 10 deletions(-)
diff --git a/dev/archery/archery/integration/runner.py
b/dev/archery/archery/integration/runner.py
index 841633f94c..bab00e6d70 100644
--- a/dev/archery/archery/integration/runner.py
+++ b/dev/archery/archery/integration/runner.py
@@ -615,7 +615,7 @@ def run_all_tests(with_cpp=True, with_java=True,
with_js=True,
Scenario(
"app_metadata_flight_info_endpoint",
description="Ensure support FlightInfo and Endpoint app_metadata",
- skip_testers={"JS", "C#", "Rust", "Java"}
+ skip_testers={"JS", "C#", "Rust"}
),
Scenario(
"flight_sql",
diff --git
a/java/flight/flight-core/src/main/java/org/apache/arrow/flight/FlightEndpoint.java
b/java/flight/flight-core/src/main/java/org/apache/arrow/flight/FlightEndpoint.java
index ad78cfbd21..1967fe1d91 100644
---
a/java/flight/flight-core/src/main/java/org/apache/arrow/flight/FlightEndpoint.java
+++
b/java/flight/flight-core/src/main/java/org/apache/arrow/flight/FlightEndpoint.java
@@ -23,6 +23,7 @@ import java.nio.ByteBuffer;
import java.time.Instant;
import java.util.ArrayList;
import java.util.Arrays;
+import java.util.Base64;
import java.util.Collections;
import java.util.List;
import java.util.Objects;
@@ -30,6 +31,7 @@ import java.util.Optional;
import org.apache.arrow.flight.impl.Flight;
+import com.google.protobuf.ByteString;
import com.google.protobuf.Timestamp;
/**
@@ -39,6 +41,7 @@ public class FlightEndpoint {
private final List<Location> locations;
private final Ticket ticket;
private final Instant expirationTime;
+ private final byte[] appMetadata;
/**
* Constructs a new endpoint with no expiration time.
@@ -54,13 +57,22 @@ public class FlightEndpoint {
* Constructs a new endpoint with an expiration time.
*
* @param ticket A ticket that describes the key of a data stream.
+ * @param expirationTime (optional) When this endpoint expires.
* @param locations The possible locations the stream can be retrieved from.
*/
public FlightEndpoint(Ticket ticket, Instant expirationTime, Location...
locations) {
+ this(ticket, expirationTime, null, Collections.unmodifiableList(new
ArrayList<>(Arrays.asList(locations))));
+ }
+
+ /**
+ * Private constructor with all parameters. Should only be called by Builder.
+ */
+ private FlightEndpoint(Ticket ticket, Instant expirationTime, byte[]
appMetadata, List<Location> locations) {
Objects.requireNonNull(ticket);
- this.locations = Collections.unmodifiableList(new
ArrayList<>(Arrays.asList(locations)));
+ this.locations = locations;
this.expirationTime = expirationTime;
this.ticket = ticket;
+ this.appMetadata = appMetadata;
}
/**
@@ -77,6 +89,7 @@ public class FlightEndpoint {
} else {
this.expirationTime = null;
}
+ this.appMetadata = (flt.getAppMetadata().size() == 0 ? null :
flt.getAppMetadata().toByteArray());
this.ticket = new Ticket(flt.getTicket());
}
@@ -92,6 +105,10 @@ public class FlightEndpoint {
return Optional.ofNullable(expirationTime);
}
+ public byte[] getAppMetadata() {
+ return appMetadata;
+ }
+
/**
* Converts to the protocol buffer representation.
*/
@@ -111,6 +128,10 @@ public class FlightEndpoint {
.build());
}
+ if (appMetadata != null) {
+ b.setAppMetadata(ByteString.copyFrom(appMetadata));
+ }
+
return b.build();
}
@@ -148,12 +169,13 @@ public class FlightEndpoint {
FlightEndpoint that = (FlightEndpoint) o;
return locations.equals(that.locations) &&
ticket.equals(that.ticket) &&
- Objects.equals(expirationTime, that.expirationTime);
+ Objects.equals(expirationTime, that.expirationTime) &&
+ Arrays.equals(appMetadata, that.appMetadata);
}
@Override
public int hashCode() {
- return Objects.hash(locations, ticket, expirationTime);
+ return Objects.hash(locations, ticket, expirationTime,
Arrays.hashCode(appMetadata));
}
@Override
@@ -162,6 +184,59 @@ public class FlightEndpoint {
"locations=" + locations +
", ticket=" + ticket +
", expirationTime=" + (expirationTime == null ? "(none)" :
expirationTime.toString()) +
+ ", appMetadata=" + (appMetadata == null ? "(none)" :
Base64.getEncoder().encodeToString(appMetadata)) +
'}';
}
+
+ /**
+ * Create a builder for FlightEndpoint.
+ *
+ * @param ticket A ticket that describes the key of a data stream.
+ * @param locations The possible locations the stream can be retrieved from.
+ */
+ public static Builder builder(Ticket ticket, Location... locations) {
+ return new Builder(ticket, locations);
+ }
+
+ /**
+ * Builder for FlightEndpoint.
+ */
+ public static final class Builder {
+ private final Ticket ticket;
+ private final List<Location> locations;
+ private Instant expirationTime = null;
+ private byte[] appMetadata = null;
+
+ private Builder(Ticket ticket, Location... locations) {
+ this.ticket = ticket;
+ this.locations = Collections.unmodifiableList(new
ArrayList<>(Arrays.asList(locations)));
+ }
+
+ /**
+ * Set expiration time for the endpoint. Default is null, which means don't expire.
+ *
+ * @param expirationTime (optional) When this endpoint expires.
+ */
+ public Builder setExpirationTime(Instant expirationTime) {
+ this.expirationTime = expirationTime;
+ return this;
+ }
+
+ /**
+ * Set the app metadata to send along with the flight. Default is null.
+ *
+ * @param appMetadata Metadata to send along with the flight
+ */
+ public Builder setAppMetadata(byte[] appMetadata) {
+ this.appMetadata = appMetadata;
+ return this;
+ }
+
+ /**
+ * Build FlightEndpoint object.
+ */
+ public FlightEndpoint build() {
+ return new FlightEndpoint(ticket, expirationTime, appMetadata,
locations);
+ }
+ }
}
diff --git
a/java/flight/flight-core/src/main/java/org/apache/arrow/flight/FlightInfo.java
b/java/flight/flight-core/src/main/java/org/apache/arrow/flight/FlightInfo.java
index d871f89465..b5279a304c 100644
---
a/java/flight/flight-core/src/main/java/org/apache/arrow/flight/FlightInfo.java
+++
b/java/flight/flight-core/src/main/java/org/apache/arrow/flight/FlightInfo.java
@@ -23,6 +23,8 @@ import java.net.URISyntaxException;
import java.nio.ByteBuffer;
import java.nio.channels.Channels;
import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.Base64;
import java.util.Collections;
import java.util.List;
import java.util.Objects;
@@ -51,6 +53,7 @@ public class FlightInfo {
private final long records;
private final boolean ordered;
private final IpcOption option;
+ private final byte[] appMetadata;
/**
* Constructs a new instance.
@@ -94,6 +97,23 @@ public class FlightInfo {
*/
public FlightInfo(Schema schema, FlightDescriptor descriptor,
List<FlightEndpoint> endpoints, long bytes,
long records, boolean ordered, IpcOption option) {
+ this(schema, descriptor, endpoints, bytes, records, ordered, option, null);
+ }
+
+ /**
+ * Constructs a new instance.
+ *
+ * @param schema The schema of the Flight
+ * @param descriptor An identifier for the Flight.
+ * @param endpoints A list of endpoints that have the flight available.
+ * @param bytes The number of bytes in the flight
+ * @param records The number of records in the flight.
+ * @param ordered Whether the endpoints in this flight are ordered.
+ * @param option IPC write options.
+ * @param appMetadata Metadata to send along with the flight
+ */
+ public FlightInfo(Schema schema, FlightDescriptor descriptor,
List<FlightEndpoint> endpoints, long bytes,
+ long records, boolean ordered, IpcOption option, byte[]
appMetadata) {
Objects.requireNonNull(descriptor);
Objects.requireNonNull(endpoints);
if (schema != null) {
@@ -106,6 +126,7 @@ public class FlightInfo {
this.records = records;
this.ordered = ordered;
this.option = option;
+ this.appMetadata = appMetadata;
}
/**
@@ -131,6 +152,7 @@ public class FlightInfo {
bytes = pbFlightInfo.getTotalBytes();
records = pbFlightInfo.getTotalRecords();
ordered = pbFlightInfo.getOrdered();
+ appMetadata = (pbFlightInfo.getAppMetadata().size() == 0 ? null :
pbFlightInfo.getAppMetadata().toByteArray());
option = IpcOption.DEFAULT;
}
@@ -167,6 +189,10 @@ public class FlightInfo {
return ordered;
}
+ public byte[] getAppMetadata() {
+ return appMetadata;
+ }
+
/**
* Converts to the protocol buffer representation.
*/
@@ -189,6 +215,9 @@ public class FlightInfo {
throw new RuntimeException(e);
}
}
+ if (appMetadata != null) {
+ builder.setAppMetadata(ByteString.copyFrom(appMetadata));
+ }
return builder.build();
}
@@ -229,12 +258,13 @@ public class FlightInfo {
schema.equals(that.schema) &&
descriptor.equals(that.descriptor) &&
endpoints.equals(that.endpoints) &&
- ordered == that.ordered;
+ ordered == that.ordered &&
+ Arrays.equals(appMetadata, that.appMetadata);
}
@Override
public int hashCode() {
- return Objects.hash(schema, descriptor, endpoints, bytes, records,
ordered);
+ return Objects.hash(schema, descriptor, endpoints, bytes, records,
ordered, Arrays.hashCode(appMetadata));
}
@Override
@@ -246,6 +276,95 @@ public class FlightInfo {
", bytes=" + bytes +
", records=" + records +
", ordered=" + ordered +
+ ", appMetadata=" + (appMetadata == null ? "(none)" :
Base64.getEncoder().encodeToString(appMetadata)) +
'}';
}
+
+ /**
+ * Create a builder for FlightInfo.
+ *
+ * @param schema The schema of the Flight
+ * @param descriptor An identifier for the Flight.
+ * @param endpoints A list of endpoints that have the flight available.
+ */
+ public static Builder builder(Schema schema, FlightDescriptor descriptor,
List<FlightEndpoint> endpoints) {
+ return new Builder(schema, descriptor, endpoints);
+ }
+
+ /**
+ * Builder for FlightInfo.
+ */
+ public static final class Builder {
+ private final Schema schema;
+ private final FlightDescriptor descriptor;
+ private final List<FlightEndpoint> endpoints;
+ private long bytes = -1;
+ private long records = -1;
+ private boolean ordered = false;
+ private IpcOption option = IpcOption.DEFAULT;
+ private byte[] appMetadata = null;
+
+ private Builder(Schema schema, FlightDescriptor descriptor,
List<FlightEndpoint> endpoints) {
+ this.schema = schema;
+ this.descriptor = descriptor;
+ this.endpoints = endpoints;
+ }
+
+ /**
+ * Set the number of bytes for the flight. Default to -1 for unknown.
+ *
+ * @param bytes The number of bytes in the flight
+ */
+ public Builder setBytes(long bytes) {
+ this.bytes = bytes;
+ return this;
+ }
+
+ /**
+ * Set the number of records for the flight. Default to -1 for unknown.
+ *
+ * @param records The number of records in the flight.
+ */
+ public Builder setRecords(long records) {
+ this.records = records;
+ return this;
+ }
+
+ /**
+ * Set whether the flight endpoints are ordered. Default is false.
+ *
+ * @param ordered Whether the endpoints in this flight are ordered.
+ */
+ public Builder setOrdered(boolean ordered) {
+ this.ordered = ordered;
+ return this;
+ }
+
+ /**
+ * Set IPC write options. Default is IpcOption.DEFAULT
+ *
+ * @param option IPC write options.
+ */
+ public Builder setOption(IpcOption option) {
+ this.option = option;
+ return this;
+ }
+
+ /**
+ * Set the app metadata to send along with the flight. Default is null.
+ *
+ * @param appMetadata Metadata to send along with the flight
+ */
+ public Builder setAppMetadata(byte[] appMetadata) {
+ this.appMetadata = appMetadata;
+ return this;
+ }
+
+ /**
+ * Build FlightInfo object.
+ */
+ public FlightInfo build() {
+ return new FlightInfo(schema, descriptor, endpoints, bytes, records,
ordered, option, appMetadata);
+ }
+ }
}
diff --git
a/java/flight/flight-core/src/test/java/org/apache/arrow/flight/TestBasicOperation.java
b/java/flight/flight-core/src/test/java/org/apache/arrow/flight/TestBasicOperation.java
index 238221f051..41b3a4693e 100644
---
a/java/flight/flight-core/src/test/java/org/apache/arrow/flight/TestBasicOperation.java
+++
b/java/flight/flight-core/src/test/java/org/apache/arrow/flight/TestBasicOperation.java
@@ -113,10 +113,13 @@ public class TestBasicOperation {
Field.nullable("a", new ArrowType.Int(32, true)),
Field.nullable("b", new ArrowType.FixedSizeBinary(32))
), metadata);
- final FlightInfo info1 = new FlightInfo(schema, FlightDescriptor.path(),
Collections.emptyList(), -1, -1);
+ final FlightInfo info1 = FlightInfo.builder(schema,
FlightDescriptor.path(), Collections.emptyList())
+ .setAppMetadata("foo".getBytes()).build();
final FlightInfo info2 = new FlightInfo(schema,
FlightDescriptor.command(new byte[2]),
- Collections.singletonList(new FlightEndpoint(
- new Ticket(new byte[10]),
Location.forGrpcDomainSocket("/tmp/test.sock"))), 200, 500);
+ Collections.singletonList(
+ FlightEndpoint.builder(new Ticket(new byte[10]),
Location.forGrpcDomainSocket("/tmp/test.sock"))
+ .setAppMetadata("bar".getBytes()).build()
+ ), 200, 500);
final FlightInfo info3 = new FlightInfo(schema, FlightDescriptor.path("a",
"b"),
Arrays.asList(new FlightEndpoint(
new Ticket(new byte[10]),
Location.forGrpcDomainSocket("/tmp/test.sock")),
diff --git
a/java/flight/flight-core/src/test/java/org/apache/arrow/flight/TestFlightService.java
b/java/flight/flight-core/src/test/java/org/apache/arrow/flight/TestFlightService.java
index 691048fb03..0e4669f29c 100644
---
a/java/flight/flight-core/src/test/java/org/apache/arrow/flight/TestFlightService.java
+++
b/java/flight/flight-core/src/test/java/org/apache/arrow/flight/TestFlightService.java
@@ -28,6 +28,7 @@ import org.apache.arrow.flight.impl.Flight;
import org.apache.arrow.memory.BufferAllocator;
import org.apache.arrow.memory.RootAllocator;
import org.apache.arrow.util.AutoCloseables;
+import org.apache.arrow.vector.ipc.message.IpcOption;
import org.apache.arrow.vector.types.pojo.Schema;
import org.junit.jupiter.api.AfterEach;
import org.junit.jupiter.api.Assertions;
@@ -137,7 +138,8 @@ public class TestFlightService {
@Override
public FlightInfo getFlightInfo(CallContext context,
FlightDescriptor descriptor) {
- return new FlightInfo(null, descriptor, Collections.emptyList(), 0, 0);
+ return new FlightInfo(null, descriptor, Collections.emptyList(),
+ 0, 0, false, IpcOption.DEFAULT, "foo".getBytes());
}
};
@@ -147,6 +149,7 @@ public class TestFlightService {
FlightInfo flightInfo = client.getInfo(FlightDescriptor.path("test"));
Assertions.assertEquals(Optional.empty(),
flightInfo.getSchemaOptional());
Assertions.assertEquals(new Schema(Collections.emptyList()),
flightInfo.getSchema());
+ Assertions.assertArrayEquals(flightInfo.getAppMetadata(),
"foo".getBytes());
Exception e = Assertions.assertThrows(
FlightRuntimeException.class,
diff --git
a/java/flight/flight-integration-tests/src/main/java/org/apache/arrow/flight/integration/tests/AppMetadataFlightInfoEndpointScenario.java
b/java/flight/flight-integration-tests/src/main/java/org/apache/arrow/flight/integration/tests/AppMetadataFlightInfoEndpointScenario.java
new file mode 100644
index 0000000000..3220bb5a2d
--- /dev/null
+++
b/java/flight/flight-integration-tests/src/main/java/org/apache/arrow/flight/integration/tests/AppMetadataFlightInfoEndpointScenario.java
@@ -0,0 +1,76 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.arrow.flight.integration.tests;
+
+import java.nio.charset.StandardCharsets;
+import java.util.Collections;
+import java.util.List;
+
+import org.apache.arrow.flight.FlightClient;
+import org.apache.arrow.flight.FlightDescriptor;
+import org.apache.arrow.flight.FlightEndpoint;
+import org.apache.arrow.flight.FlightInfo;
+import org.apache.arrow.flight.FlightProducer;
+import org.apache.arrow.flight.FlightServer;
+import org.apache.arrow.flight.Location;
+import org.apache.arrow.flight.NoOpFlightProducer;
+import org.apache.arrow.flight.Ticket;
+import org.apache.arrow.memory.BufferAllocator;
+import org.apache.arrow.vector.types.Types;
+import org.apache.arrow.vector.types.pojo.Field;
+import org.apache.arrow.vector.types.pojo.Schema;
+
+/** Test app_metadata in FlightInfo and FlightEndpoint. */
+final class AppMetadataFlightInfoEndpointScenario implements Scenario {
+ @Override
+ public FlightProducer producer(BufferAllocator allocator, Location location)
throws Exception {
+ return new AppMetadataFlightInfoEndpointProducer();
+ }
+
+ @Override
+ public void buildServer(FlightServer.Builder builder) throws Exception {
+ }
+
+ @Override
+ public void client(BufferAllocator allocator, Location location,
FlightClient client) throws Exception {
+ byte[] cmd = "foobar".getBytes(StandardCharsets.UTF_8);
+ FlightInfo info = client.getInfo(FlightDescriptor.command(cmd));
+ IntegrationAssertions.assertEquals(info.getAppMetadata(), cmd);
+ IntegrationAssertions.assertEquals(info.getEndpoints().size(), 1);
+
IntegrationAssertions.assertEquals(info.getEndpoints().get(0).getAppMetadata(),
cmd);
+ }
+
+ /** producer for app_metadata test. */
+ static class AppMetadataFlightInfoEndpointProducer extends
NoOpFlightProducer {
+ @Override
+ public FlightInfo getFlightInfo(CallContext context, FlightDescriptor
descriptor) {
+ byte[] cmd = descriptor.getCommand();
+
+ Schema schema = new Schema(
+ Collections.singletonList(Field.notNullable("number",
Types.MinorType.UINT4.getType())));
+
+ List<FlightEndpoint> endpoints = Collections.singletonList(
+ FlightEndpoint.builder(
+ new
Ticket("".getBytes(StandardCharsets.UTF_8))).setAppMetadata(cmd).build());
+
+ return FlightInfo.builder(schema, descriptor,
endpoints).setAppMetadata(cmd).build();
+ }
+ }
+}
+
+
diff --git
a/java/flight/flight-integration-tests/src/main/java/org/apache/arrow/flight/integration/tests/Scenarios.java
b/java/flight/flight-integration-tests/src/main/java/org/apache/arrow/flight/integration/tests/Scenarios.java
index 26629c650e..c61fd94a4d 100644
---
a/java/flight/flight-integration-tests/src/main/java/org/apache/arrow/flight/integration/tests/Scenarios.java
+++
b/java/flight/flight-integration-tests/src/main/java/org/apache/arrow/flight/integration/tests/Scenarios.java
@@ -49,6 +49,7 @@ final class Scenarios {
scenarios.put("poll_flight_info", PollFlightInfoScenario::new);
scenarios.put("flight_sql", FlightSqlScenario::new);
scenarios.put("flight_sql:extension", FlightSqlExtensionScenario::new);
+ scenarios.put("app_metadata_flight_info_endpoint",
AppMetadataFlightInfoEndpointScenario::new);
}
private static Scenarios getInstance() {
diff --git
a/java/flight/flight-integration-tests/src/test/java/org/apache/arrow/flight/integration/tests/IntegrationTest.java
b/java/flight/flight-integration-tests/src/test/java/org/apache/arrow/flight/integration/tests/IntegrationTest.java
index cf65e16fac..477a56055c 100644
---
a/java/flight/flight-integration-tests/src/test/java/org/apache/arrow/flight/integration/tests/IntegrationTest.java
+++
b/java/flight/flight-integration-tests/src/test/java/org/apache/arrow/flight/integration/tests/IntegrationTest.java
@@ -78,6 +78,11 @@ class IntegrationTest {
testScenario("flight_sql:extension");
}
+ @Test
+ void appMetadataFlightInfoEndpoint() throws Exception {
+ testScenario("app_metadata_flight_info_endpoint");
+ }
+
void testScenario(String scenarioName) throws Exception {
try (final BufferAllocator allocator = new RootAllocator()) {
final FlightServer.Builder builder = FlightServer.builder()