This is an automated email from the ASF dual-hosted git repository.

lidavidm pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/arrow.git


The following commit(s) were added to refs/heads/main by this push:
     new 93c5ddb957 GH-43667: [Java] Keeping Flight default header size 
consistent between server and client  (#43697)
93c5ddb957 is described below

commit 93c5ddb957bb93421a8f84dbd7c5a5b7be2d6d45
Author: PANKAJ9768 <[email protected]>
AuthorDate: Tue Aug 27 05:59:09 2024 +0530

    GH-43667: [Java] Keeping Flight default header size consistent between 
server and client  (#43697)
    
    
    
    ### Rationale for this change
    
    ### What changes are included in this PR?
    The Flight client can send headers larger than the server can accept. This PR
    keeps the default header size limits consistent between server and client.
    
    ### Are these changes tested?
    
    ### Are there any user-facing changes?
    
    * GitHub Issue: #43667
    
    Authored-by: pankaj kesari <[email protected]>
    Signed-off-by: David Li <[email protected]>
---
 .../java/org/apache/arrow/flight/FlightServer.java |  7 +++
 .../org/apache/arrow/flight/TestFlightService.java | 73 ++++++++++++++++++++++
 2 files changed, 80 insertions(+)

diff --git 
a/java/flight/flight-core/src/main/java/org/apache/arrow/flight/FlightServer.java
 
b/java/flight/flight-core/src/main/java/org/apache/arrow/flight/FlightServer.java
index 05dbe42c49..ac761457f5 100644
--- 
a/java/flight/flight-core/src/main/java/org/apache/arrow/flight/FlightServer.java
+++ 
b/java/flight/flight-core/src/main/java/org/apache/arrow/flight/FlightServer.java
@@ -188,6 +188,7 @@ public class FlightServer implements AutoCloseable {
     private CallHeaderAuthenticator headerAuthenticator = 
CallHeaderAuthenticator.NO_OP;
     private ExecutorService executor = null;
     private int maxInboundMessageSize = MAX_GRPC_MESSAGE_SIZE;
+    private int maxHeaderListSize = MAX_GRPC_MESSAGE_SIZE;
     private int backpressureThreshold = DEFAULT_BACKPRESSURE_THRESHOLD;
     private InputStream certChain;
     private InputStream key;
@@ -324,6 +325,7 @@ public class FlightServer implements AutoCloseable {
       builder
           .executor(exec)
           .maxInboundMessageSize(maxInboundMessageSize)
+          .maxInboundMetadataSize(maxHeaderListSize)
           .addService(
               ServerInterceptors.intercept(
                   flightService,
@@ -366,6 +368,11 @@ public class FlightServer implements AutoCloseable {
       return new FlightServer(location, builder.build(), grpcExecutor);
     }
 
+    public Builder setMaxHeaderListSize(int maxHeaderListSize) {
+      this.maxHeaderListSize = maxHeaderListSize;
+      return this;
+    }
+
     /**
      * Set the maximum size of a message. Defaults to "unlimited", depending 
on the underlying
      * transport.
diff --git 
a/java/flight/flight-core/src/test/java/org/apache/arrow/flight/TestFlightService.java
 
b/java/flight/flight-core/src/test/java/org/apache/arrow/flight/TestFlightService.java
index 5ebeb44c1d..fc3f83e4ea 100644
--- 
a/java/flight/flight-core/src/test/java/org/apache/arrow/flight/TestFlightService.java
+++ 
b/java/flight/flight-core/src/test/java/org/apache/arrow/flight/TestFlightService.java
@@ -27,6 +27,7 @@ import io.grpc.stub.ServerCallStreamObserver;
 import java.nio.charset.StandardCharsets;
 import java.util.Collections;
 import java.util.Optional;
+import java.util.Random;
 import org.apache.arrow.flight.impl.Flight;
 import org.apache.arrow.memory.BufferAllocator;
 import org.apache.arrow.memory.RootAllocator;
@@ -152,4 +153,76 @@ public class TestFlightService {
       assertEquals("No schema is present in FlightInfo", e.getMessage());
     }
   }
+
+  /**
+   * Test for GH-41584 where the Flight defaults for header size were not in 
sync between client and server.
+   */
+  @Test
+  public void testHeaderSizeExchangeInService() throws Exception {
+    final FlightProducer producer =
+        new NoOpFlightProducer() {
+          @Override
+          public FlightInfo getFlightInfo(CallContext context, 
FlightDescriptor descriptor) {
+            String longHeader =
+                
context.getMiddleware(FlightConstants.HEADER_KEY).headers().get("long-header");
+            return new FlightInfo(
+                null,
+                descriptor,
+                Collections.emptyList(),
+                0,
+                0,
+                false,
+                IpcOption.DEFAULT,
+                longHeader.getBytes(StandardCharsets.UTF_8));
+          }
+        };
+
+    String headerVal = generateRandom(1024 * 10);
+    FlightCallHeaders callHeaders = new FlightCallHeaders();
+    callHeaders.insert("long-header", headerVal);
+    // server with the same default header limit as the client
+    try (final FlightServer s =
+            FlightServer.builder(allocator, forGrpcInsecure(LOCALHOST, 0), 
producer)
+                .build()
+                .start();
+        final FlightClient client = FlightClient.builder(allocator, 
s.getLocation()).build()) {
+      FlightInfo flightInfo =
+          client.getInfo(FlightDescriptor.path("test"), new 
HeaderCallOption(callHeaders));
+      assertEquals(Optional.empty(), flightInfo.getSchemaOptional());
+      assertEquals(new Schema(Collections.emptyList()), 
flightInfo.getSchema());
+      assertArrayEquals(flightInfo.getAppMetadata(), 
headerVal.getBytes(StandardCharsets.UTF_8));
+    }
+    // server with 15kb header limit
+    try (final FlightServer s =
+            FlightServer.builder(allocator, forGrpcInsecure(LOCALHOST, 0), 
producer)
+                .setMaxHeaderListSize(1024 * 15)
+                .build()
+                .start();
+        final FlightClient client = FlightClient.builder(allocator, 
s.getLocation()).build()) {
+      FlightInfo flightInfo =
+          client.getInfo(FlightDescriptor.path("test"), new 
HeaderCallOption(callHeaders));
+      assertEquals(Optional.empty(), flightInfo.getSchemaOptional());
+      assertEquals(new Schema(Collections.emptyList()), 
flightInfo.getSchema());
+      assertArrayEquals(flightInfo.getAppMetadata(), 
headerVal.getBytes(StandardCharsets.UTF_8));
+
+      callHeaders.insert("another-header", headerVal + headerVal);
+      FlightRuntimeException e =
+          assertThrows(
+              FlightRuntimeException.class,
+              () ->
+                  client.getInfo(FlightDescriptor.path("test"), new 
HeaderCallOption(callHeaders)));
+      assertEquals("http2 exception", e.getMessage());
+    }
+  }
+
+  private static String generateRandom(int size) {
+    String aToZ = "ABCDEFGHIJKLMNOPQRSTUVWXYZ1234567890";
+    Random random = new Random();
+    StringBuilder res = new StringBuilder();
+    for (int i = 0; i < size; i++) {
+      int randIndex = random.nextInt(aToZ.length());
+      res.append(aToZ.charAt(randIndex));
+    }
+    return res.toString();
+  }
 }

Reply via email to