This is an automated email from the ASF dual-hosted git repository.
lidavidm pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/arrow.git
The following commit(s) were added to refs/heads/main by this push:
new 6c14172201 GH-40745: [Java][FlightRPC] Support configuring
backpressure threshold (#41051)
6c14172201 is described below
commit 6c1417220151fe91e6ea9c5e6af7916863f40bfe
Author: James Duong <[email protected]>
AuthorDate: Mon Apr 8 08:05:16 2024 +0900
GH-40745: [Java][FlightRPC] Support configuring backpressure threshold
(#41051)
### Rationale for this change
gRPC uses a default backpressure threshold that is too low for services
that send large amounts of data such as Arrow Flight. This causes excessive
blocking and reduces throughput.
### What changes are included in this PR?
* Update to grpc-java 1.63.0
* Add an option to FlightServer.Builder to set the number of bytes that may be
queued before writes block due to backpressure. The default is 10MB instead of
gRPC's default of 64K.
* Add a ServerInterceptor that automatically sets the backpressure threshold
on each ServerCall.
### Are these changes tested?
Tested through existing unit tests.
### Are there any user-facing changes?
The FlightServer.Builder class has an extra configuration option to let
users change the backpressure threshold themselves.
* GitHub Issue: #40745
Authored-by: James Duong <[email protected]>
Signed-off-by: David Li <[email protected]>
---
.../java/org/apache/arrow/flight/FlightServer.java | 15 ++++++++
.../ServerBackpressureThresholdInterceptor.java | 43 ++++++++++++++++++++++
java/pom.xml | 2 +-
3 files changed, 59 insertions(+), 1 deletion(-)
diff --git
a/java/flight/flight-core/src/main/java/org/apache/arrow/flight/FlightServer.java
b/java/flight/flight-core/src/main/java/org/apache/arrow/flight/FlightServer.java
index d873f7d282..dc545c1318 100644
---
a/java/flight/flight-core/src/main/java/org/apache/arrow/flight/FlightServer.java
+++
b/java/flight/flight-core/src/main/java/org/apache/arrow/flight/FlightServer.java
@@ -42,6 +42,7 @@ import org.apache.arrow.flight.auth.ServerAuthInterceptor;
import org.apache.arrow.flight.auth2.Auth2Constants;
import org.apache.arrow.flight.auth2.CallHeaderAuthenticator;
import org.apache.arrow.flight.auth2.ServerCallHeaderAuthMiddleware;
+import org.apache.arrow.flight.grpc.ServerBackpressureThresholdInterceptor;
import org.apache.arrow.flight.grpc.ServerInterceptorAdapter;
import org.apache.arrow.flight.grpc.ServerInterceptorAdapter.KeyFactory;
import org.apache.arrow.memory.BufferAllocator;
@@ -79,6 +80,9 @@ public class FlightServer implements AutoCloseable {
/** The maximum size of an individual gRPC message. This effectively
disables the limit. */
static final int MAX_GRPC_MESSAGE_SIZE = Integer.MAX_VALUE;
+ /** The default number of bytes that can be queued on an output stream
before blocking. */
+ public static final int DEFAULT_BACKPRESSURE_THRESHOLD = 10 * 1024 * 1024;
// 10MB
+
/** Create a new instance from a gRPC server. For internal use only. */
private FlightServer(Location location, Server server, ExecutorService
grpcExecutor) {
this.location = location;
@@ -179,6 +183,7 @@ public class FlightServer implements AutoCloseable {
private CallHeaderAuthenticator headerAuthenticator =
CallHeaderAuthenticator.NO_OP;
private ExecutorService executor = null;
private int maxInboundMessageSize = MAX_GRPC_MESSAGE_SIZE;
+ private int backpressureThreshold = DEFAULT_BACKPRESSURE_THRESHOLD;
private InputStream certChain;
private InputStream key;
private InputStream mTlsCACert;
@@ -300,6 +305,7 @@ public class FlightServer implements AutoCloseable {
.addService(
ServerInterceptors.intercept(
flightService,
+ new
ServerBackpressureThresholdInterceptor(backpressureThreshold),
new ServerAuthInterceptor(authHandler)));
// Allow hooking into the gRPC builder. This is not guaranteed to be
available on all Arrow versions or
@@ -336,6 +342,15 @@ public class FlightServer implements AutoCloseable {
return this;
}
+ /**
+ * Set the number of bytes that may be queued on a server output stream
before writes are blocked.
+ */
+ public Builder backpressureThreshold(int backpressureThreshold) {
+ Preconditions.checkArgument(backpressureThreshold > 0);
+ this.backpressureThreshold = backpressureThreshold;
+ return this;
+ }
+
/**
* A small utility function to ensure that InputStream attributes.
* are closed if they are not null
diff --git
a/java/flight/flight-core/src/main/java/org/apache/arrow/flight/grpc/ServerBackpressureThresholdInterceptor.java
b/java/flight/flight-core/src/main/java/org/apache/arrow/flight/grpc/ServerBackpressureThresholdInterceptor.java
new file mode 100644
index 0000000000..bd42fbc8ad
--- /dev/null
+++
b/java/flight/flight-core/src/main/java/org/apache/arrow/flight/grpc/ServerBackpressureThresholdInterceptor.java
@@ -0,0 +1,43 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.arrow.flight.grpc;
+
+import io.grpc.Metadata;
+import io.grpc.ServerCall;
+import io.grpc.ServerCallHandler;
+import io.grpc.ServerInterceptor;
+
+/**
+ * An interceptor for specifying the number of bytes that can be queued before
a call with an output stream
+ * gets blocked by backpressure.
+ */
+public class ServerBackpressureThresholdInterceptor implements
ServerInterceptor {
+
+ private final int numBytes;
+
+ public ServerBackpressureThresholdInterceptor(int numBytes) {
+ this.numBytes = numBytes;
+ }
+
+ @Override
+ public <ReqT, RespT> ServerCall.Listener<ReqT>
interceptCall(ServerCall<ReqT, RespT> call, Metadata headers,
+ ServerCallHandler<ReqT, RespT> next) {
+ call.setOnReadyThreshold(numBytes);
+ return next.startCall(call, headers);
+ }
+}
diff --git a/java/pom.xml b/java/pom.xml
index 95b27922ea..9892061677 100644
--- a/java/pom.xml
+++ b/java/pom.xml
@@ -34,7 +34,7 @@
<dep.slf4j.version>2.0.11</dep.slf4j.version>
<dep.guava-bom.version>33.0.0-jre</dep.guava-bom.version>
<dep.netty-bom.version>4.1.108.Final</dep.netty-bom.version>
- <dep.grpc-bom.version>1.62.2</dep.grpc-bom.version>
+ <dep.grpc-bom.version>1.63.0</dep.grpc-bom.version>
<dep.protobuf-bom.version>3.23.1</dep.protobuf-bom.version>
<dep.jackson-bom.version>2.17.0</dep.jackson-bom.version>
<dep.hadoop.version>3.4.0</dep.hadoop.version>