This is an automated email from the ASF dual-hosted git repository.
lidavidm pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/arrow.git
The following commit(s) were added to refs/heads/main by this push:
new 8b5b764063 GH-34778: [Java] Only apply ServerInterceptorAdapter logic
to Flight service requests (#34815)
8b5b764063 is described below
commit 8b5b764063ffc6cb379f3da42c459b75c0520c8a
Author: Diego Fernández Giraldo <[email protected]>
AuthorDate: Fri Mar 31 10:51:04 2023 -0600
GH-34778: [Java] Only apply ServerInterceptorAdapter logic to Flight
service requests (#34815)
### Rationale for this change
The interceptor applies to all methods. When receiving a method for a
different service, we don't want to throw an error because the method doesn't
exist.
### What changes are included in this PR?
- Ensure added services work correctly
- Add test to make sure registered services work correctly
- Add documentation explaining how to add services
### Are these changes tested?
Yes
### Are there any user-facing changes?
Yes, documentation now explains how to add services
* Closes: #34778
Authored-by: Diego Fernandez <[email protected]>
Signed-off-by: David Li <[email protected]>
---
docs/source/java/flight.rst | 27 ++++++++++++++++++
java/flight/flight-core/pom.xml | 5 ++++
.../java/org/apache/arrow/flight/FlightServer.java | 1 +
.../flight/grpc/ServerInterceptorAdapter.java | 5 ++++
.../org/apache/arrow/flight/TestServerOptions.java | 32 ++++++++++++++++++++++
5 files changed, 70 insertions(+)
diff --git a/docs/source/java/flight.rst b/docs/source/java/flight.rst
index f62046ecd2..e009998be4 100644
--- a/docs/source/java/flight.rst
+++ b/docs/source/java/flight.rst
@@ -201,6 +201,33 @@ request/response. On the server, they can inspect incoming
headers and
fail the request; hence, they can be used to implement custom
authentication methods.
+Adding Services
+===============
+
+Servers can add other gRPC services. For example, to add the `Health Check
service <https://github.com/grpc/grpc/blob/master/doc/health-checking.md>`_:
+
+.. code-block:: Java
+
+ final HealthStatusManager statusManager = new HealthStatusManager();
+ final Consumer<NettyServerBuilder> consumer = (builder) -> {
+ builder.addService(statusManager.getHealthService());
+ };
+ final Location location = forGrpcInsecure(LOCALHOST, 5555);
+ try (
+ BufferAllocator a = new RootAllocator(Long.MAX_VALUE);
+ Producer producer = new Producer(a);
+ FlightServer s = FlightServer.builder(a, location, producer)
+ .transportHint("grpc.builderConsumer", consumer).build().start();
+ ) {
+ Channel channel =
NettyChannelBuilder.forAddress(location.toSocketAddress()).usePlaintext().build();
+ HealthCheckResponse response = HealthGrpc
+ .newBlockingStub(channel)
+ .check(HealthCheckRequest.getDefaultInstance());
+
+ System.out.println(response.getStatus());
+ }
+
+
:ref:`Flight best practices <flight-best-practices>`
====================================================
diff --git a/java/flight/flight-core/pom.xml b/java/flight/flight-core/pom.xml
index ed51481590..74f73f8441 100644
--- a/java/flight/flight-core/pom.xml
+++ b/java/flight/flight-core/pom.xml
@@ -98,6 +98,11 @@
<groupId>io.grpc</groupId>
<artifactId>grpc-api</artifactId>
</dependency>
+ <dependency>
+ <groupId>io.grpc</groupId>
+ <artifactId>grpc-services</artifactId>
+ <scope>test</scope>
+ </dependency>
<dependency>
<groupId>com.fasterxml.jackson.core</groupId>
diff --git
a/java/flight/flight-core/src/main/java/org/apache/arrow/flight/FlightServer.java
b/java/flight/flight-core/src/main/java/org/apache/arrow/flight/FlightServer.java
index d59480bfb0..7f15798f6a 100644
---
a/java/flight/flight-core/src/main/java/org/apache/arrow/flight/FlightServer.java
+++
b/java/flight/flight-core/src/main/java/org/apache/arrow/flight/FlightServer.java
@@ -262,6 +262,7 @@ public class FlightServer implements AutoCloseable {
new
ThreadFactoryBuilder().setNameFormat("flight-server-default-executor-%d").build());
grpcExecutor = exec;
}
+
final FlightBindingService flightService = new
FlightBindingService(allocator, producer, authHandler, exec);
builder
.executor(exec)
diff --git
a/java/flight/flight-core/src/main/java/org/apache/arrow/flight/grpc/ServerInterceptorAdapter.java
b/java/flight/flight-core/src/main/java/org/apache/arrow/flight/grpc/ServerInterceptorAdapter.java
index ddf43ff846..9b038b9d49 100644
---
a/java/flight/flight-core/src/main/java/org/apache/arrow/flight/grpc/ServerInterceptorAdapter.java
+++
b/java/flight/flight-core/src/main/java/org/apache/arrow/flight/grpc/ServerInterceptorAdapter.java
@@ -25,6 +25,7 @@ import java.util.Map;
import org.apache.arrow.flight.CallInfo;
import org.apache.arrow.flight.CallStatus;
+import org.apache.arrow.flight.FlightConstants;
import org.apache.arrow.flight.FlightMethod;
import org.apache.arrow.flight.FlightProducer.CallContext;
import org.apache.arrow.flight.FlightRuntimeException;
@@ -82,6 +83,10 @@ public class ServerInterceptorAdapter implements
ServerInterceptor {
@Override
public <ReqT, RespT> Listener<ReqT> interceptCall(ServerCall<ReqT, RespT>
call, Metadata headers,
ServerCallHandler<ReqT, RespT> next) {
+ if
(!FlightConstants.SERVICE.equals(call.getMethodDescriptor().getServiceName())) {
+ return Contexts.interceptCall(Context.current(), call, headers, next);
+ }
+
final CallInfo info = new
CallInfo(FlightMethod.fromProtocol(call.getMethodDescriptor().getFullMethodName()));
final List<FlightServerMiddleware> middleware = new ArrayList<>();
// Use LinkedHashMap to preserve insertion order
diff --git
a/java/flight/flight-core/src/test/java/org/apache/arrow/flight/TestServerOptions.java
b/java/flight/flight-core/src/test/java/org/apache/arrow/flight/TestServerOptions.java
index 9a2c9b53e8..f66b9e7fd2 100644
---
a/java/flight/flight-core/src/test/java/org/apache/arrow/flight/TestServerOptions.java
+++
b/java/flight/flight-core/src/test/java/org/apache/arrow/flight/TestServerOptions.java
@@ -41,9 +41,15 @@ import org.junit.jupiter.api.Assertions;
import org.junit.jupiter.api.Assumptions;
import org.junit.jupiter.api.Test;
+import io.grpc.Channel;
import io.grpc.MethodDescriptor;
import io.grpc.ServerServiceDefinition;
+import io.grpc.health.v1.HealthCheckRequest;
+import io.grpc.health.v1.HealthCheckResponse;
+import io.grpc.health.v1.HealthGrpc;
+import io.grpc.netty.NettyChannelBuilder;
import io.grpc.netty.NettyServerBuilder;
+import io.grpc.protobuf.services.HealthStatusManager;
public class TestServerOptions {
@@ -164,4 +170,30 @@ public class TestServerOptions {
executorService.shutdown();
}
}
+
+ /*
+ * This is an extension of builderConsumer test.
+ * Test that Flight interceptors don't break other registered services
+ */
+ @Test
+ public void addHealthCheckService() throws Exception {
+ final HealthStatusManager statusManager = new HealthStatusManager();
+ final Consumer<NettyServerBuilder> consumer = (builder) -> {
+ builder.addService(statusManager.getHealthService());
+ };
+ final Location location = forGrpcInsecure(LOCALHOST, 5555);
+ try (
+ BufferAllocator a = new RootAllocator(Long.MAX_VALUE);
+ Producer producer = new Producer(a);
+ FlightServer s = FlightServer.builder(a, location, producer)
+ .transportHint("grpc.builderConsumer", consumer).build().start();
+ ) {
+ Channel channel =
NettyChannelBuilder.forAddress(location.toSocketAddress()).usePlaintext().build();
+ HealthCheckResponse response = HealthGrpc
+ .newBlockingStub(channel)
+ .check(HealthCheckRequest.getDefaultInstance());
+
+ assertEquals(response.getStatus(),
HealthCheckResponse.ServingStatus.SERVING);
+ }
+ }
}