This is an automated email from the ASF dual-hosted git repository.

lidavidm pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/arrow.git


The following commit(s) were added to refs/heads/main by this push:
     new 57f643c2ce GH-38022: [Java][FlightRPC] Expose app_metadata on 
FlightInfo and FlightEndpoint (#38331)
57f643c2ce is described below

commit 57f643c2cecca729109daae18c7a64f3a37e76e4
Author: Diego Fernández Giraldo <[email protected]>
AuthorDate: Thu Oct 26 08:34:21 2023 -0600

    GH-38022: [Java][FlightRPC] Expose app_metadata on FlightInfo and 
FlightEndpoint (#38331)
    
    Making necessary changes in Java to expose the newly added app_metadata.
    * Closes: #38022
    
    Authored-by: Diego Fernandez <[email protected]>
    Signed-off-by: David Li <[email protected]>
---
 dev/archery/archery/integration/runner.py          |   2 +-
 .../org/apache/arrow/flight/FlightEndpoint.java    |  81 +++++++++++++-
 .../java/org/apache/arrow/flight/FlightInfo.java   | 123 ++++++++++++++++++++-
 .../apache/arrow/flight/TestBasicOperation.java    |   9 +-
 .../org/apache/arrow/flight/TestFlightService.java |   5 +-
 .../AppMetadataFlightInfoEndpointScenario.java     |  76 +++++++++++++
 .../arrow/flight/integration/tests/Scenarios.java  |   1 +
 .../flight/integration/tests/IntegrationTest.java  |   5 +
 8 files changed, 292 insertions(+), 10 deletions(-)

diff --git a/dev/archery/archery/integration/runner.py 
b/dev/archery/archery/integration/runner.py
index 841633f94c..bab00e6d70 100644
--- a/dev/archery/archery/integration/runner.py
+++ b/dev/archery/archery/integration/runner.py
@@ -615,7 +615,7 @@ def run_all_tests(with_cpp=True, with_java=True, 
with_js=True,
         Scenario(
             "app_metadata_flight_info_endpoint",
             description="Ensure support FlightInfo and Endpoint app_metadata",
-            skip_testers={"JS", "C#", "Rust", "Java"}
+            skip_testers={"JS", "C#", "Rust"}
         ),
         Scenario(
             "flight_sql",
diff --git 
a/java/flight/flight-core/src/main/java/org/apache/arrow/flight/FlightEndpoint.java
 
b/java/flight/flight-core/src/main/java/org/apache/arrow/flight/FlightEndpoint.java
index ad78cfbd21..1967fe1d91 100644
--- 
a/java/flight/flight-core/src/main/java/org/apache/arrow/flight/FlightEndpoint.java
+++ 
b/java/flight/flight-core/src/main/java/org/apache/arrow/flight/FlightEndpoint.java
@@ -23,6 +23,7 @@ import java.nio.ByteBuffer;
 import java.time.Instant;
 import java.util.ArrayList;
 import java.util.Arrays;
+import java.util.Base64;
 import java.util.Collections;
 import java.util.List;
 import java.util.Objects;
@@ -30,6 +31,7 @@ import java.util.Optional;
 
 import org.apache.arrow.flight.impl.Flight;
 
+import com.google.protobuf.ByteString;
 import com.google.protobuf.Timestamp;
 
 /**
@@ -39,6 +41,7 @@ public class FlightEndpoint {
   private final List<Location> locations;
   private final Ticket ticket;
   private final Instant expirationTime;
+  private final byte[] appMetadata;
 
   /**
    * Constructs a new endpoint with no expiration time.
@@ -54,13 +57,22 @@ public class FlightEndpoint {
    * Constructs a new endpoint with an expiration time.
    *
    * @param ticket A ticket that describe the key of a data stream.
+   * @param expirationTime (optional) When this endpoint expires.
    * @param locations  The possible locations the stream can be retrieved from.
    */
   public FlightEndpoint(Ticket ticket, Instant expirationTime, Location... 
locations) {
+    this(ticket, expirationTime, null, Collections.unmodifiableList(new 
ArrayList<>(Arrays.asList(locations))));
+  }
+
+  /**
+   * Private constructor with all parameters. Should only be called by Builder.
+   */
+  private FlightEndpoint(Ticket ticket, Instant expirationTime, byte[] 
appMetadata, List<Location> locations) {
     Objects.requireNonNull(ticket);
-    this.locations = Collections.unmodifiableList(new 
ArrayList<>(Arrays.asList(locations)));
+    this.locations = locations;
     this.expirationTime = expirationTime;
     this.ticket = ticket;
+    this.appMetadata = appMetadata;
   }
 
   /**
@@ -77,6 +89,7 @@ public class FlightEndpoint {
     } else {
       this.expirationTime = null;
     }
+    this.appMetadata = (flt.getAppMetadata().size() == 0 ? null : 
flt.getAppMetadata().toByteArray());
     this.ticket = new Ticket(flt.getTicket());
   }
 
@@ -92,6 +105,10 @@ public class FlightEndpoint {
     return Optional.ofNullable(expirationTime);
   }
 
+  public byte[] getAppMetadata() {
+    return appMetadata;
+  }
+
   /**
    * Converts to the protocol buffer representation.
    */
@@ -111,6 +128,10 @@ public class FlightEndpoint {
               .build());
     }
 
+    if (appMetadata != null) {
+      b.setAppMetadata(ByteString.copyFrom(appMetadata));
+    }
+
     return b.build();
   }
 
@@ -148,12 +169,13 @@ public class FlightEndpoint {
     FlightEndpoint that = (FlightEndpoint) o;
     return locations.equals(that.locations) &&
         ticket.equals(that.ticket) &&
-        Objects.equals(expirationTime, that.expirationTime);
+        Objects.equals(expirationTime, that.expirationTime) &&
+        Arrays.equals(appMetadata, that.appMetadata);
   }
 
   @Override
   public int hashCode() {
-    return Objects.hash(locations, ticket, expirationTime);
+    return Objects.hash(locations, ticket, expirationTime, 
Arrays.hashCode(appMetadata));
   }
 
   @Override
@@ -162,6 +184,59 @@ public class FlightEndpoint {
         "locations=" + locations +
         ", ticket=" + ticket +
         ", expirationTime=" + (expirationTime == null ? "(none)" : 
expirationTime.toString()) +
+        ", appMetadata=" + (appMetadata == null ? "(none)" : 
Base64.getEncoder().encodeToString(appMetadata)) +
         '}';
   }
+
+  /**
+   * Create a builder for FlightEndpoint.
+   *
+   * @param ticket A ticket that describe the key of a data stream.
+   * @param locations  The possible locations the stream can be retrieved from.
+   */
+  public static Builder builder(Ticket ticket, Location... locations) {
+    return new Builder(ticket, locations);
+  }
+
+  /**
+   * Builder for FlightEndpoint.
+   */
+  public static final class Builder {
+    private final Ticket ticket;
+    private final List<Location> locations;
+    private Instant expirationTime = null;
+    private byte[] appMetadata = null;
+
+    private Builder(Ticket ticket, Location... locations) {
+      this.ticket = ticket;
+      this.locations = Collections.unmodifiableList(new 
ArrayList<>(Arrays.asList(locations)));
+    }
+
+    /**
+     * Set expiration time for the endpoint. Default is null, which means 
don't expire.
+     *
+     * @param expirationTime (optional) When this endpoint expires.
+     */
+    public Builder setExpirationTime(Instant expirationTime) {
+      this.expirationTime = expirationTime;
+      return this;
+    }
+
+    /**
+     * Set the app metadata to send along with the flight. Default is null;
+     *
+     * @param appMetadata Metadata to send along with the flight
+     */
+    public Builder setAppMetadata(byte[] appMetadata) {
+      this.appMetadata = appMetadata;
+      return this;
+    }
+
+    /**
+     * Build FlightEndpoint object.
+     */
+    public FlightEndpoint build() {
+      return new FlightEndpoint(ticket, expirationTime, appMetadata, 
locations);
+    }
+  }
 }
diff --git 
a/java/flight/flight-core/src/main/java/org/apache/arrow/flight/FlightInfo.java 
b/java/flight/flight-core/src/main/java/org/apache/arrow/flight/FlightInfo.java
index d871f89465..b5279a304c 100644
--- 
a/java/flight/flight-core/src/main/java/org/apache/arrow/flight/FlightInfo.java
+++ 
b/java/flight/flight-core/src/main/java/org/apache/arrow/flight/FlightInfo.java
@@ -23,6 +23,8 @@ import java.net.URISyntaxException;
 import java.nio.ByteBuffer;
 import java.nio.channels.Channels;
 import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.Base64;
 import java.util.Collections;
 import java.util.List;
 import java.util.Objects;
@@ -51,6 +53,7 @@ public class FlightInfo {
   private final long records;
   private final boolean ordered;
   private final IpcOption option;
+  private final byte[] appMetadata;
 
   /**
    * Constructs a new instance.
@@ -94,6 +97,23 @@ public class FlightInfo {
    */
   public FlightInfo(Schema schema, FlightDescriptor descriptor, 
List<FlightEndpoint> endpoints, long bytes,
                     long records, boolean ordered, IpcOption option) {
+    this(schema, descriptor, endpoints, bytes, records, ordered, option, null);
+  }
+
+  /**
+   * Constructs a new instance.
+   *
+   * @param schema The schema of the Flight
+   * @param descriptor An identifier for the Flight.
+   * @param endpoints A list of endpoints that have the flight available.
+   * @param bytes The number of bytes in the flight
+   * @param records The number of records in the flight.
+   * @param ordered Whether the endpoints in this flight are ordered.
+   * @param option IPC write options.
+   * @param appMetadata Metadata to send along with the flight
+   */
+  public FlightInfo(Schema schema, FlightDescriptor descriptor, 
List<FlightEndpoint> endpoints, long bytes,
+                    long records, boolean ordered, IpcOption option, byte[] 
appMetadata) {
     Objects.requireNonNull(descriptor);
     Objects.requireNonNull(endpoints);
     if (schema != null) {
@@ -106,6 +126,7 @@ public class FlightInfo {
     this.records = records;
     this.ordered = ordered;
     this.option = option;
+    this.appMetadata = appMetadata;
   }
 
   /**
@@ -131,6 +152,7 @@ public class FlightInfo {
     bytes = pbFlightInfo.getTotalBytes();
     records = pbFlightInfo.getTotalRecords();
     ordered = pbFlightInfo.getOrdered();
+    appMetadata = (pbFlightInfo.getAppMetadata().size() == 0 ? null : 
pbFlightInfo.getAppMetadata().toByteArray());
     option = IpcOption.DEFAULT;
   }
 
@@ -167,6 +189,10 @@ public class FlightInfo {
     return ordered;
   }
 
+  public byte[] getAppMetadata() {
+    return appMetadata;
+  }
+
   /**
    * Converts to the protocol buffer representation.
    */
@@ -189,6 +215,9 @@ public class FlightInfo {
         throw new RuntimeException(e);
       }
     }
+    if (appMetadata != null) {
+      builder.setAppMetadata(ByteString.copyFrom(appMetadata));
+    }
     return builder.build();
   }
 
@@ -229,12 +258,13 @@ public class FlightInfo {
         schema.equals(that.schema) &&
         descriptor.equals(that.descriptor) &&
         endpoints.equals(that.endpoints) &&
-        ordered == that.ordered;
+        ordered == that.ordered &&
+        Arrays.equals(appMetadata, that.appMetadata);
   }
 
   @Override
   public int hashCode() {
-    return Objects.hash(schema, descriptor, endpoints, bytes, records, 
ordered);
+    return Objects.hash(schema, descriptor, endpoints, bytes, records, 
ordered, Arrays.hashCode(appMetadata));
   }
 
   @Override
@@ -246,6 +276,95 @@ public class FlightInfo {
         ", bytes=" + bytes +
         ", records=" + records +
         ", ordered=" + ordered +
+        ", appMetadata=" + (appMetadata == null ? "(none)" : 
Base64.getEncoder().encodeToString(appMetadata)) +
         '}';
   }
+
+  /**
+   * Create a builder for FlightInfo.
+   *
+   * @param schema     The schema of the Flight
+   * @param descriptor An identifier for the Flight.
+   * @param endpoints  A list of endpoints that have the flight available.
+   */
+  public static Builder builder(Schema schema, FlightDescriptor descriptor, 
List<FlightEndpoint> endpoints) {
+    return new Builder(schema, descriptor, endpoints);
+  }
+
+  /**
+   * Builder for FlightInfo.
+   */
+  public static final class Builder {
+    private final Schema schema;
+    private final FlightDescriptor descriptor;
+    private final List<FlightEndpoint> endpoints;
+    private long bytes = -1;
+    private long records = -1;
+    private boolean ordered = false;
+    private IpcOption option = IpcOption.DEFAULT;
+    private byte[] appMetadata = null;
+
+    private Builder(Schema schema, FlightDescriptor descriptor, 
List<FlightEndpoint> endpoints) {
+      this.schema = schema;
+      this.descriptor = descriptor;
+      this.endpoints = endpoints;
+    }
+
+    /**
+     * Set the number of bytes for the flight. Default to -1 for unknown.
+     *
+     * @param bytes The number of bytes in the flight
+     */
+    public Builder setBytes(long bytes) {
+      this.bytes = bytes;
+      return this;
+    }
+
+    /**
+     * Set the number of records for the flight. Default to -1 for unknown.
+     *
+     * @param records The number of records in the flight.
+     */
+    public Builder setRecords(long records) {
+      this.records = records;
+      return this;
+    }
+
+    /**
+     * Set whether the flight endpoints are ordered. Default is false.
+     *
+     * @param ordered Whether the endpoints in this flight are ordered.
+     */
+    public Builder setOrdered(boolean ordered) {
+      this.ordered = ordered;
+      return this;
+    }
+
+    /**
+     * Set IPC write options. Default is IpcOption.DEFAULT
+     *
+     * @param option IPC write options.
+     */
+    public Builder setOption(IpcOption option) {
+      this.option = option;
+      return this;
+    }
+
+    /**
+     * Set the app metadata to send along with the flight. Default is null.
+     *
+     * @param appMetadata Metadata to send along with the flight
+     */
+    public Builder setAppMetadata(byte[] appMetadata) {
+      this.appMetadata = appMetadata;
+      return this;
+    }
+
+    /**
+     * Build FlightInfo object.
+     */
+    public FlightInfo build() {
+      return new FlightInfo(schema, descriptor, endpoints, bytes, records, 
ordered, option, appMetadata);
+    }
+  }
 }
diff --git 
a/java/flight/flight-core/src/test/java/org/apache/arrow/flight/TestBasicOperation.java
 
b/java/flight/flight-core/src/test/java/org/apache/arrow/flight/TestBasicOperation.java
index 238221f051..41b3a4693e 100644
--- 
a/java/flight/flight-core/src/test/java/org/apache/arrow/flight/TestBasicOperation.java
+++ 
b/java/flight/flight-core/src/test/java/org/apache/arrow/flight/TestBasicOperation.java
@@ -113,10 +113,13 @@ public class TestBasicOperation {
         Field.nullable("a", new ArrowType.Int(32, true)),
         Field.nullable("b", new ArrowType.FixedSizeBinary(32))
     ), metadata);
-    final FlightInfo info1 = new FlightInfo(schema, FlightDescriptor.path(), 
Collections.emptyList(), -1, -1);
+    final FlightInfo info1 = FlightInfo.builder(schema, 
FlightDescriptor.path(), Collections.emptyList())
+            .setAppMetadata("foo".getBytes()).build();
     final FlightInfo info2 = new FlightInfo(schema, 
FlightDescriptor.command(new byte[2]),
-        Collections.singletonList(new FlightEndpoint(
-            new Ticket(new byte[10]), 
Location.forGrpcDomainSocket("/tmp/test.sock"))), 200, 500);
+        Collections.singletonList(
+                FlightEndpoint.builder(new Ticket(new byte[10]), 
Location.forGrpcDomainSocket("/tmp/test.sock"))
+                        .setAppMetadata("bar".getBytes()).build()
+        ), 200, 500);
     final FlightInfo info3 = new FlightInfo(schema, FlightDescriptor.path("a", 
"b"),
         Arrays.asList(new FlightEndpoint(
                 new Ticket(new byte[10]), 
Location.forGrpcDomainSocket("/tmp/test.sock")),
diff --git 
a/java/flight/flight-core/src/test/java/org/apache/arrow/flight/TestFlightService.java
 
b/java/flight/flight-core/src/test/java/org/apache/arrow/flight/TestFlightService.java
index 691048fb03..0e4669f29c 100644
--- 
a/java/flight/flight-core/src/test/java/org/apache/arrow/flight/TestFlightService.java
+++ 
b/java/flight/flight-core/src/test/java/org/apache/arrow/flight/TestFlightService.java
@@ -28,6 +28,7 @@ import org.apache.arrow.flight.impl.Flight;
 import org.apache.arrow.memory.BufferAllocator;
 import org.apache.arrow.memory.RootAllocator;
 import org.apache.arrow.util.AutoCloseables;
+import org.apache.arrow.vector.ipc.message.IpcOption;
 import org.apache.arrow.vector.types.pojo.Schema;
 import org.junit.jupiter.api.AfterEach;
 import org.junit.jupiter.api.Assertions;
@@ -137,7 +138,8 @@ public class TestFlightService {
       @Override
       public FlightInfo getFlightInfo(CallContext context,
               FlightDescriptor descriptor) {
-        return new FlightInfo(null, descriptor, Collections.emptyList(), 0, 0);
+        return new FlightInfo(null, descriptor, Collections.emptyList(),
+                0, 0, false, IpcOption.DEFAULT, "foo".getBytes());
       }
     };
 
@@ -147,6 +149,7 @@ public class TestFlightService {
       FlightInfo flightInfo = client.getInfo(FlightDescriptor.path("test"));
       Assertions.assertEquals(Optional.empty(), 
flightInfo.getSchemaOptional());
       Assertions.assertEquals(new Schema(Collections.emptyList()), 
flightInfo.getSchema());
+      Assertions.assertArrayEquals(flightInfo.getAppMetadata(), 
"foo".getBytes());
 
       Exception e = Assertions.assertThrows(
           FlightRuntimeException.class,
diff --git 
a/java/flight/flight-integration-tests/src/main/java/org/apache/arrow/flight/integration/tests/AppMetadataFlightInfoEndpointScenario.java
 
b/java/flight/flight-integration-tests/src/main/java/org/apache/arrow/flight/integration/tests/AppMetadataFlightInfoEndpointScenario.java
new file mode 100644
index 0000000000..3220bb5a2d
--- /dev/null
+++ 
b/java/flight/flight-integration-tests/src/main/java/org/apache/arrow/flight/integration/tests/AppMetadataFlightInfoEndpointScenario.java
@@ -0,0 +1,76 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.arrow.flight.integration.tests;
+
+import java.nio.charset.StandardCharsets;
+import java.util.Collections;
+import java.util.List;
+
+import org.apache.arrow.flight.FlightClient;
+import org.apache.arrow.flight.FlightDescriptor;
+import org.apache.arrow.flight.FlightEndpoint;
+import org.apache.arrow.flight.FlightInfo;
+import org.apache.arrow.flight.FlightProducer;
+import org.apache.arrow.flight.FlightServer;
+import org.apache.arrow.flight.Location;
+import org.apache.arrow.flight.NoOpFlightProducer;
+import org.apache.arrow.flight.Ticket;
+import org.apache.arrow.memory.BufferAllocator;
+import org.apache.arrow.vector.types.Types;
+import org.apache.arrow.vector.types.pojo.Field;
+import org.apache.arrow.vector.types.pojo.Schema;
+
+/** Test app_metadata in FlightInfo and FlightEndpoint. */
+final class AppMetadataFlightInfoEndpointScenario implements Scenario {
+  @Override
+  public FlightProducer producer(BufferAllocator allocator, Location location) 
throws Exception {
+    return new AppMetadataFlightInfoEndpointProducer();
+  }
+
+  @Override
+  public void buildServer(FlightServer.Builder builder) throws Exception {
+  }
+
+  @Override
+  public void client(BufferAllocator allocator, Location location, 
FlightClient client) throws Exception {
+    byte[] cmd = "foobar".getBytes(StandardCharsets.UTF_8);
+    FlightInfo info = client.getInfo(FlightDescriptor.command(cmd));
+    IntegrationAssertions.assertEquals(info.getAppMetadata(), cmd);
+    IntegrationAssertions.assertEquals(info.getEndpoints().size(), 1);
+    
IntegrationAssertions.assertEquals(info.getEndpoints().get(0).getAppMetadata(), 
cmd);
+  }
+
+  /** producer for app_metadata test. */
+  static class AppMetadataFlightInfoEndpointProducer extends 
NoOpFlightProducer {
+    @Override
+    public FlightInfo getFlightInfo(CallContext context, FlightDescriptor 
descriptor) {
+      byte[] cmd = descriptor.getCommand();
+      
+      Schema schema = new Schema(
+              Collections.singletonList(Field.notNullable("number", 
Types.MinorType.UINT4.getType())));
+
+      List<FlightEndpoint> endpoints = Collections.singletonList(
+              FlightEndpoint.builder(
+                      new 
Ticket("".getBytes(StandardCharsets.UTF_8))).setAppMetadata(cmd).build());
+
+      return FlightInfo.builder(schema, descriptor, 
endpoints).setAppMetadata(cmd).build();
+    }
+  }
+}
+
+
diff --git 
a/java/flight/flight-integration-tests/src/main/java/org/apache/arrow/flight/integration/tests/Scenarios.java
 
b/java/flight/flight-integration-tests/src/main/java/org/apache/arrow/flight/integration/tests/Scenarios.java
index 26629c650e..c61fd94a4d 100644
--- 
a/java/flight/flight-integration-tests/src/main/java/org/apache/arrow/flight/integration/tests/Scenarios.java
+++ 
b/java/flight/flight-integration-tests/src/main/java/org/apache/arrow/flight/integration/tests/Scenarios.java
@@ -49,6 +49,7 @@ final class Scenarios {
     scenarios.put("poll_flight_info", PollFlightInfoScenario::new);
     scenarios.put("flight_sql", FlightSqlScenario::new);
     scenarios.put("flight_sql:extension", FlightSqlExtensionScenario::new);
+    scenarios.put("app_metadata_flight_info_endpoint", 
AppMetadataFlightInfoEndpointScenario::new);
   }
 
   private static Scenarios getInstance() {
diff --git 
a/java/flight/flight-integration-tests/src/test/java/org/apache/arrow/flight/integration/tests/IntegrationTest.java
 
b/java/flight/flight-integration-tests/src/test/java/org/apache/arrow/flight/integration/tests/IntegrationTest.java
index cf65e16fac..477a56055c 100644
--- 
a/java/flight/flight-integration-tests/src/test/java/org/apache/arrow/flight/integration/tests/IntegrationTest.java
+++ 
b/java/flight/flight-integration-tests/src/test/java/org/apache/arrow/flight/integration/tests/IntegrationTest.java
@@ -78,6 +78,11 @@ class IntegrationTest {
     testScenario("flight_sql:extension");
   }
 
+  @Test
+  void appMetadataFlightInfoEndpoint() throws Exception {
+    testScenario("app_metadata_flight_info_endpoint");
+  }
+
   void testScenario(String scenarioName) throws Exception {
     try (final BufferAllocator allocator = new RootAllocator()) {
       final FlightServer.Builder builder = FlightServer.builder()

Reply via email to