This is an automated email from the ASF dual-hosted git repository.

lidavidm pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/arrow.git


The following commit(s) were added to refs/heads/main by this push:
     new 6c14172201 GH-40745: [Java][FlightRPC] Support configuring backpressure threshold (#41051)
6c14172201 is described below

commit 6c1417220151fe91e6ea9c5e6af7916863f40bfe
Author: James Duong <[email protected]>
AuthorDate: Mon Apr 8 08:05:16 2024 +0900

    GH-40745: [Java][FlightRPC] Support configuring backpressure threshold (#41051)
    
    ### Rationale for this change
    
    gRPC uses a default backpressure threshold that is too low for services that send large amounts of data, such as Arrow Flight. This causes excessive blocking and reduces throughput.
    
    ### What changes are included in this PR?
    
    * Update to grpc-java 1.63.0
    * Add to FlightServer.Builder an option to set the number of bytes queued before blocking due to backpressure. Set the default to 10MB instead of gRPC's default of 64K.
    * Add a ServerInterceptor that automatically sets the backpressure threshold on ServerCalls.
    
    ### Are these changes tested?
    
    Tested through existing unit tests.
    
    ### Are there any user-facing changes?
    
    The FlightServer.Builder class has an extra configuration option to let users change the backpressure threshold themselves.
    
    * GitHub Issue: #40745
    
    Authored-by: James Duong <[email protected]>
    Signed-off-by: David Li <[email protected]>
---
 .../java/org/apache/arrow/flight/FlightServer.java | 15 ++++++++
 .../ServerBackpressureThresholdInterceptor.java    | 43 ++++++++++++++++++++++
 java/pom.xml                                       |  2 +-
 3 files changed, 59 insertions(+), 1 deletion(-)

diff --git a/java/flight/flight-core/src/main/java/org/apache/arrow/flight/FlightServer.java b/java/flight/flight-core/src/main/java/org/apache/arrow/flight/FlightServer.java
index d873f7d282..dc545c1318 100644
--- a/java/flight/flight-core/src/main/java/org/apache/arrow/flight/FlightServer.java
+++ b/java/flight/flight-core/src/main/java/org/apache/arrow/flight/FlightServer.java
@@ -42,6 +42,7 @@ import org.apache.arrow.flight.auth.ServerAuthInterceptor;
 import org.apache.arrow.flight.auth2.Auth2Constants;
 import org.apache.arrow.flight.auth2.CallHeaderAuthenticator;
 import org.apache.arrow.flight.auth2.ServerCallHeaderAuthMiddleware;
+import org.apache.arrow.flight.grpc.ServerBackpressureThresholdInterceptor;
 import org.apache.arrow.flight.grpc.ServerInterceptorAdapter;
 import org.apache.arrow.flight.grpc.ServerInterceptorAdapter.KeyFactory;
 import org.apache.arrow.memory.BufferAllocator;
@@ -79,6 +80,9 @@ public class FlightServer implements AutoCloseable {
   /** The maximum size of an individual gRPC message. This effectively 
disables the limit. */
   static final int MAX_GRPC_MESSAGE_SIZE = Integer.MAX_VALUE;
 
+  /** The default number of bytes that can be queued on an output stream 
before blocking. */
+  public static final int DEFAULT_BACKPRESSURE_THRESHOLD = 10 * 1024 * 1024; 
// 10MB
+
   /** Create a new instance from a gRPC server. For internal use only. */
   private FlightServer(Location location, Server server, ExecutorService 
grpcExecutor) {
     this.location = location;
@@ -179,6 +183,7 @@ public class FlightServer implements AutoCloseable {
     private CallHeaderAuthenticator headerAuthenticator = 
CallHeaderAuthenticator.NO_OP;
     private ExecutorService executor = null;
     private int maxInboundMessageSize = MAX_GRPC_MESSAGE_SIZE;
+    private int backpressureThreshold = DEFAULT_BACKPRESSURE_THRESHOLD;
     private InputStream certChain;
     private InputStream key;
     private InputStream mTlsCACert;
@@ -300,6 +305,7 @@ public class FlightServer implements AutoCloseable {
           .addService(
               ServerInterceptors.intercept(
                   flightService,
+                  new 
ServerBackpressureThresholdInterceptor(backpressureThreshold),
                   new ServerAuthInterceptor(authHandler)));
 
       // Allow hooking into the gRPC builder. This is not guaranteed to be 
available on all Arrow versions or
@@ -336,6 +342,15 @@ public class FlightServer implements AutoCloseable {
       return this;
     }
 
+    /**
+     * Set the number of bytes that may be queued on a server output stream 
before writes are blocked.
+     */
+    public Builder backpressureThreshold(int backpressureThreshold) {
+      Preconditions.checkArgument(backpressureThreshold > 0);
+      this.backpressureThreshold = backpressureThreshold;
+      return this;
+    }
+
     /**
      * A small utility function to ensure that InputStream attributes.
      * are closed if they are not null
diff --git a/java/flight/flight-core/src/main/java/org/apache/arrow/flight/grpc/ServerBackpressureThresholdInterceptor.java b/java/flight/flight-core/src/main/java/org/apache/arrow/flight/grpc/ServerBackpressureThresholdInterceptor.java
new file mode 100644
index 0000000000..bd42fbc8ad
--- /dev/null
+++ b/java/flight/flight-core/src/main/java/org/apache/arrow/flight/grpc/ServerBackpressureThresholdInterceptor.java
@@ -0,0 +1,43 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.arrow.flight.grpc;
+
+import io.grpc.Metadata;
+import io.grpc.ServerCall;
+import io.grpc.ServerCallHandler;
+import io.grpc.ServerInterceptor;
+
+/**
+ * An interceptor for specifying the number of bytes that can be queued before 
a call with an output stream
+ * gets blocked by backpressure.
+ */
+public class ServerBackpressureThresholdInterceptor implements 
ServerInterceptor {
+
+  private final int numBytes;
+
+  public ServerBackpressureThresholdInterceptor(int numBytes) {
+    this.numBytes = numBytes;
+  }
+
+  @Override
+  public <ReqT, RespT> ServerCall.Listener<ReqT> 
interceptCall(ServerCall<ReqT, RespT> call, Metadata headers,
+       ServerCallHandler<ReqT, RespT> next) {
+    call.setOnReadyThreshold(numBytes);
+    return next.startCall(call, headers);
+  }
+}
diff --git a/java/pom.xml b/java/pom.xml
index 95b27922ea..9892061677 100644
--- a/java/pom.xml
+++ b/java/pom.xml
@@ -34,7 +34,7 @@
     <dep.slf4j.version>2.0.11</dep.slf4j.version>
     <dep.guava-bom.version>33.0.0-jre</dep.guava-bom.version>
     <dep.netty-bom.version>4.1.108.Final</dep.netty-bom.version>
-    <dep.grpc-bom.version>1.62.2</dep.grpc-bom.version>
+    <dep.grpc-bom.version>1.63.0</dep.grpc-bom.version>
     <dep.protobuf-bom.version>3.23.1</dep.protobuf-bom.version>
     <dep.jackson-bom.version>2.17.0</dep.jackson-bom.version>
     <dep.hadoop.version>3.4.0</dep.hadoop.version>

Reply via email to