This is an automated email from the ASF dual-hosted git repository.

lidavidm pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/arrow.git


The following commit(s) were added to refs/heads/main by this push:
     new 8b5b764063 GH-34778: [Java] Only apply ServerInterceptorAdapter logic to Flight service requests (#34815)
8b5b764063 is described below

commit 8b5b764063ffc6cb379f3da42c459b75c0520c8a
Author: Diego Fernández Giraldo <[email protected]>
AuthorDate: Fri Mar 31 10:51:04 2023 -0600

    GH-34778: [Java] Only apply ServerInterceptorAdapter logic to Flight service requests (#34815)
    
    ### Rationale for this change
    
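    The interceptor applies to every call on the server, including calls to
    other gRPC services registered alongside Flight. When it receives a call
    for a method that belongs to a different service, it should pass the call
    through unchanged instead of throwing an error because the method is not
    a known Flight method.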
    
    ### What changes are included in this PR?
    
    - Ensure added services work correctly
    - Add test to make sure registered services work correctly
    - Add documentation explaining how to add services
    
    ### Are these changes tested?
    
    Yes
    
    ### Are there any user-facing changes?
    
    Yes, documentation now explains how to add services
    
    * Closes: #34778
    
    Authored-by: Diego Fernandez <[email protected]>
    Signed-off-by: David Li <[email protected]>
---
 docs/source/java/flight.rst                        | 27 ++++++++++++++++++
 java/flight/flight-core/pom.xml                    |  5 ++++
 .../java/org/apache/arrow/flight/FlightServer.java |  1 +
 .../flight/grpc/ServerInterceptorAdapter.java      |  5 ++++
 .../org/apache/arrow/flight/TestServerOptions.java | 32 ++++++++++++++++++++++
 5 files changed, 70 insertions(+)

diff --git a/docs/source/java/flight.rst b/docs/source/java/flight.rst
index f62046ecd2..e009998be4 100644
--- a/docs/source/java/flight.rst
+++ b/docs/source/java/flight.rst
@@ -201,6 +201,33 @@ request/response. On the server, they can inspect incoming headers and
 fail the request; hence, they can be used to implement custom
 authentication methods.
 
+Adding Services
+===============
+
+Servers can add other gRPC services. For example, to add the `Health Check service <https://github.com/grpc/grpc/blob/master/doc/health-checking.md>`_:
+
+.. code-block:: Java
+
+    final HealthStatusManager statusManager = new HealthStatusManager();
+    final Consumer<NettyServerBuilder> consumer = (builder) -> {
+      builder.addService(statusManager.getHealthService());
+    };
+    final Location location = forGrpcInsecure(LOCALHOST, 5555);
+    try (
+        BufferAllocator a = new RootAllocator(Long.MAX_VALUE);
+        Producer producer = new Producer(a);
+        FlightServer s = FlightServer.builder(a, location, producer)
+            .transportHint("grpc.builderConsumer", consumer).build().start();
+    ) {
+      Channel channel = 
NettyChannelBuilder.forAddress(location.toSocketAddress()).usePlaintext().build();
+      HealthCheckResponse response = HealthGrpc
+              .newBlockingStub(channel)
+              .check(HealthCheckRequest.getDefaultInstance());
+
+      System.out.println(response.getStatus());
+    }
+
+
 :ref:`Flight best practices <flight-best-practices>`
 ====================================================
 
diff --git a/java/flight/flight-core/pom.xml b/java/flight/flight-core/pom.xml
index ed51481590..74f73f8441 100644
--- a/java/flight/flight-core/pom.xml
+++ b/java/flight/flight-core/pom.xml
@@ -98,6 +98,11 @@
       <groupId>io.grpc</groupId>
       <artifactId>grpc-api</artifactId>
     </dependency>
+    <dependency>
+      <groupId>io.grpc</groupId>
+      <artifactId>grpc-services</artifactId>
+      <scope>test</scope>
+    </dependency>
 
     <dependency>
       <groupId>com.fasterxml.jackson.core</groupId>
diff --git a/java/flight/flight-core/src/main/java/org/apache/arrow/flight/FlightServer.java b/java/flight/flight-core/src/main/java/org/apache/arrow/flight/FlightServer.java
index d59480bfb0..7f15798f6a 100644
--- a/java/flight/flight-core/src/main/java/org/apache/arrow/flight/FlightServer.java
+++ b/java/flight/flight-core/src/main/java/org/apache/arrow/flight/FlightServer.java
@@ -262,6 +262,7 @@ public class FlightServer implements AutoCloseable {
             new 
ThreadFactoryBuilder().setNameFormat("flight-server-default-executor-%d").build());
         grpcExecutor = exec;
       }
+
       final FlightBindingService flightService = new 
FlightBindingService(allocator, producer, authHandler, exec);
       builder
           .executor(exec)
diff --git a/java/flight/flight-core/src/main/java/org/apache/arrow/flight/grpc/ServerInterceptorAdapter.java b/java/flight/flight-core/src/main/java/org/apache/arrow/flight/grpc/ServerInterceptorAdapter.java
index ddf43ff846..9b038b9d49 100644
--- a/java/flight/flight-core/src/main/java/org/apache/arrow/flight/grpc/ServerInterceptorAdapter.java
+++ b/java/flight/flight-core/src/main/java/org/apache/arrow/flight/grpc/ServerInterceptorAdapter.java
@@ -25,6 +25,7 @@ import java.util.Map;
 
 import org.apache.arrow.flight.CallInfo;
 import org.apache.arrow.flight.CallStatus;
+import org.apache.arrow.flight.FlightConstants;
 import org.apache.arrow.flight.FlightMethod;
 import org.apache.arrow.flight.FlightProducer.CallContext;
 import org.apache.arrow.flight.FlightRuntimeException;
@@ -82,6 +83,10 @@ public class ServerInterceptorAdapter implements ServerInterceptor {
   @Override
   public <ReqT, RespT> Listener<ReqT> interceptCall(ServerCall<ReqT, RespT> 
call, Metadata headers,
       ServerCallHandler<ReqT, RespT> next) {
+    if 
(!FlightConstants.SERVICE.equals(call.getMethodDescriptor().getServiceName())) {
+      return Contexts.interceptCall(Context.current(), call, headers, next);
+    }
+    
     final CallInfo info = new 
CallInfo(FlightMethod.fromProtocol(call.getMethodDescriptor().getFullMethodName()));
     final List<FlightServerMiddleware> middleware = new ArrayList<>();
     // Use LinkedHashMap to preserve insertion order
diff --git a/java/flight/flight-core/src/test/java/org/apache/arrow/flight/TestServerOptions.java b/java/flight/flight-core/src/test/java/org/apache/arrow/flight/TestServerOptions.java
index 9a2c9b53e8..f66b9e7fd2 100644
--- a/java/flight/flight-core/src/test/java/org/apache/arrow/flight/TestServerOptions.java
+++ b/java/flight/flight-core/src/test/java/org/apache/arrow/flight/TestServerOptions.java
@@ -41,9 +41,15 @@ import org.junit.jupiter.api.Assertions;
 import org.junit.jupiter.api.Assumptions;
 import org.junit.jupiter.api.Test;
 
+import io.grpc.Channel;
 import io.grpc.MethodDescriptor;
 import io.grpc.ServerServiceDefinition;
+import io.grpc.health.v1.HealthCheckRequest;
+import io.grpc.health.v1.HealthCheckResponse;
+import io.grpc.health.v1.HealthGrpc;
+import io.grpc.netty.NettyChannelBuilder;
 import io.grpc.netty.NettyServerBuilder;
+import io.grpc.protobuf.services.HealthStatusManager;
 
 public class TestServerOptions {
 
@@ -164,4 +170,30 @@ public class TestServerOptions {
       executorService.shutdown();
     }
   }
+
+  /*
+   * This is an extension of builderConsumer test.
+   * Test that Flight interceptors don't break other registered services
+   */
+  @Test
+  public void addHealthCheckService() throws Exception {
+    final HealthStatusManager statusManager = new HealthStatusManager();
+    final Consumer<NettyServerBuilder> consumer = (builder) -> {
+      builder.addService(statusManager.getHealthService());
+    };
+    final Location location = forGrpcInsecure(LOCALHOST, 5555);
+    try (
+        BufferAllocator a = new RootAllocator(Long.MAX_VALUE);
+        Producer producer = new Producer(a);
+        FlightServer s = FlightServer.builder(a, location, producer)
+            .transportHint("grpc.builderConsumer", consumer).build().start();
+    ) {
+      Channel channel = 
NettyChannelBuilder.forAddress(location.toSocketAddress()).usePlaintext().build();
+      HealthCheckResponse response = HealthGrpc
+              .newBlockingStub(channel)
+              .check(HealthCheckRequest.getDefaultInstance());
+
+      assertEquals(response.getStatus(), 
HealthCheckResponse.ServingStatus.SERVING);
+    }
+  }
 }

Reply via email to