This is an automated email from the ASF dual-hosted git repository.

elserj pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/calcite-avatica.git


The following commit(s) were added to refs/heads/master by this push:
     new f983742  [CALCITE-4196] Consume all data from client before replying with HTTP/401
f983742 is described below

commit f9837420cfabf88874eeb2c0a5b9642ebe2c2461
Author: Josh Elser <els...@apache.org>
AuthorDate: Thu Aug 27 13:56:23 2020 -0400

    [CALCITE-4196] Consume all data from client before replying with HTTP/401
    
    SPNEGO's handshake involves sending an HTTP/401 to "challenge" the
    client to reply with authentication data. If the client is sending
    a significant amount of data in the original request, the client
    will still be writing this data when the server replies. This causes
    the client to receive a TCP Reset when it continues to write data, and
    ultimately manifests in a "Broken Pipe" runtime exception.
    
    The fix is to simply consume all data the client wrote prior to
    responding with the HTTP/401.
    
    Closes #127
    
    Signed-off-by: Kevin Risden <kris...@apache.org>
---
 .../org/apache/calcite/avatica/AvaticaUtils.java   | 22 ++++++
 .../calcite/avatica/test/AvaticaUtilsTest.java     | 33 ++++++++
 .../avatica/server/AbstractAvaticaHandler.java     |  8 ++
 .../avatica/server/AvaticaProtobufHandler.java     | 89 ++++++++++++----------
 .../avatica/server/AvaticaSpnegoAuthenticator.java | 10 +++
 .../apache/calcite/avatica/AvaticaSpnegoTest.java  |  1 -
 .../avatica/server/AbstractAvaticaHandlerTest.java |  7 ++
 .../server/AvaticaSpnegoAuthenticatorTest.java     | 21 ++++-
 8 files changed, 147 insertions(+), 44 deletions(-)

diff --git a/core/src/main/java/org/apache/calcite/avatica/AvaticaUtils.java b/core/src/main/java/org/apache/calcite/avatica/AvaticaUtils.java
index 45e8236..18b22dd 100644
--- a/core/src/main/java/org/apache/calcite/avatica/AvaticaUtils.java
+++ b/core/src/main/java/org/apache/calcite/avatica/AvaticaUtils.java
@@ -57,6 +57,8 @@ public class AvaticaUtils {
     }
   };
 
+  private static final int SKIP_BUFFER_SIZE = 4096;
+
   private AvaticaUtils() {}
 
   static {
@@ -286,6 +288,26 @@ public class AvaticaUtils {
     return buffer.toArray();
   }
 
+  /**
+   * Reads and discards all data available on the InputStream.
+   */
+  public static void skipFully(InputStream inputStream) throws IOException {
+    byte[] temp = null;
+    while (true) {
+      long bytesSkipped = inputStream.skip(Long.MAX_VALUE);
+      if (bytesSkipped == 0) {
+        if (temp == null) {
+          temp = new byte[SKIP_BUFFER_SIZE];
+        }
+        int bytesRead = inputStream.read(temp, 0, SKIP_BUFFER_SIZE);
+        if (bytesRead < 0) {
+          // EOF
+          return;
+        }
+      }
+    }
+  }
+
   /** Invokes {@code Statement#setLargeMaxRows}, falling back on
    * {@link Statement#setMaxRows(int)} if the method does not exist (before
    * JDK 1.8) or throws {@link UnsupportedOperationException}. */
diff --git a/core/src/test/java/org/apache/calcite/avatica/test/AvaticaUtilsTest.java b/core/src/test/java/org/apache/calcite/avatica/test/AvaticaUtilsTest.java
index a1561aa..75adbab 100644
--- a/core/src/test/java/org/apache/calcite/avatica/test/AvaticaUtilsTest.java
+++ b/core/src/test/java/org/apache/calcite/avatica/test/AvaticaUtilsTest.java
@@ -23,8 +23,12 @@ import org.apache.calcite.avatica.util.ByteString;
 
 import org.junit.Test;
 
+import java.io.ByteArrayInputStream;
+import java.io.IOException;
+import java.io.InputStream;
 import java.math.BigDecimal;
 import java.math.BigInteger;
+import java.nio.charset.StandardCharsets;
 import java.util.Arrays;
 import java.util.HashMap;
 import java.util.LinkedHashSet;
@@ -32,9 +36,13 @@ import java.util.Locale;
 import java.util.Properties;
 import java.util.Set;
 
+import static org.apache.calcite.avatica.AvaticaUtils.skipFully;
+
 import static org.hamcrest.CoreMatchers.is;
 import static org.hamcrest.CoreMatchers.nullValue;
 import static org.junit.Assert.assertArrayEquals;
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertNotEquals;
 import static org.junit.Assert.assertThat;
 import static org.junit.Assert.fail;
 
@@ -270,6 +278,31 @@ public class AvaticaUtilsTest {
     assertThat(s, is(s2));
   }
 
+  @Test public void testSkipFully() throws IOException {
+    InputStream in = of("");
+    assertEquals(0, in.available());
+    skipFully(in);
+    assertEquals(0, in.available());
+
+    in = of("asdf");
+    assertEquals(4, in.available());
+    skipFully(in);
+    assertEquals(0, in.available());
+
+    in = of("asdfasdf");
+    for (int i = 0; i < 4; i++) {
+      assertNotEquals(-1, in.read());
+    }
+    assertEquals(4, in.available());
+    skipFully(in);
+    assertEquals(0, in.available());
+  }
+
+  /** Returns an InputStream of UTF-8 encoded bytes from the provided string */
+  InputStream of(String str) {
+    return new ByteArrayInputStream(str.getBytes(StandardCharsets.UTF_8));
+  }
+
   /** Dummy implementation of {@link ConnectionProperty}. */
   private static class ConnectionPropertyImpl implements ConnectionProperty {
     private final String name;
diff --git a/server/src/main/java/org/apache/calcite/avatica/server/AbstractAvaticaHandler.java b/server/src/main/java/org/apache/calcite/avatica/server/AbstractAvaticaHandler.java
index 687c847..fa5372c 100644
--- a/server/src/main/java/org/apache/calcite/avatica/server/AbstractAvaticaHandler.java
+++ b/server/src/main/java/org/apache/calcite/avatica/server/AbstractAvaticaHandler.java
@@ -17,6 +17,7 @@
 package org.apache.calcite.avatica.server;
 
 import org.apache.calcite.avatica.AvaticaSeverity;
+import org.apache.calcite.avatica.AvaticaUtils;
 import org.apache.calcite.avatica.remote.AuthenticationType;
 import org.apache.calcite.avatica.remote.Service.ErrorResponse;
 
@@ -26,6 +27,7 @@ import org.eclipse.jetty.server.handler.AbstractHandler;
 import java.io.IOException;
 import java.net.HttpURLConnection;
 import java.util.Collections;
+import javax.servlet.ServletInputStream;
 import javax.servlet.http.HttpServletRequest;
 import javax.servlet.http.HttpServletResponse;
 
@@ -57,8 +59,14 @@ public abstract class AbstractAvaticaHandler extends AbstractHandler
     // Make sure that we drop any unauthenticated users out first.
     if (null != serverConfig) {
       if (AuthenticationType.SPNEGO == serverConfig.getAuthenticationType()) {
+        // This is largely a failsafe. We should never normally get here, because
+        // AvaticaSpnegoAuthenticator should already have replied with the HTTP/401.
         String remoteUser = request.getRemoteUser();
         if (null == remoteUser) {
+          ServletInputStream input = request.getInputStream();
+          if (request.getContentLengthLong() < 0) {
+            AvaticaUtils.skipFully(input);
+          }
           response.setStatus(HttpURLConnection.HTTP_UNAUTHORIZED);
           response.getOutputStream().write(UNAUTHORIZED_ERROR.serialize().toByteArray());
           baseRequest.setHandled(true);
diff --git a/server/src/main/java/org/apache/calcite/avatica/server/AvaticaProtobufHandler.java b/server/src/main/java/org/apache/calcite/avatica/server/AvaticaProtobufHandler.java
index 1bf6af9..16e27ff 100644
--- a/server/src/main/java/org/apache/calcite/avatica/server/AvaticaProtobufHandler.java
+++ b/server/src/main/java/org/apache/calcite/avatica/server/AvaticaProtobufHandler.java
@@ -35,6 +35,7 @@ import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
 import java.io.IOException;
+import java.nio.charset.StandardCharsets;
 import java.util.Objects;
 import java.util.concurrent.Callable;
 import javax.servlet.ServletException;
@@ -46,7 +47,7 @@ import javax.servlet.http.HttpServletResponse;
  * Jetty handler that executes Avatica JSON request-responses.
  */
 public class AvaticaProtobufHandler extends AbstractAvaticaHandler {
-  private static final Logger LOG = LoggerFactory.getLogger(AvaticaJsonHandler.class);
+  private static final Logger LOG = LoggerFactory.getLogger(AvaticaProtobufHandler.class);
 
   private final Service service;
   private final ProtobufHandler pbHandler;
@@ -90,6 +91,14 @@ public class AvaticaProtobufHandler extends AbstractAvaticaHandler {
       HttpServletRequest request, HttpServletResponse response)
       throws IOException, ServletException {
     try (final Context ctx = this.requestTimer.start()) {
+      if (!request.getMethod().equals("POST")) {
+        response.setStatus(HttpServletResponse.SC_BAD_REQUEST);
+        response.getOutputStream().write(
+            "This server expects only POST calls.".getBytes(StandardCharsets.UTF_8));
+        baseRequest.setHandled(true);
+        return;
+      }
+
       // Check if the user is OK to proceed.
       if (!isUserPermitted(serverConfig, baseRequest, request, response)) {
         LOG.debug("HTTP request from {} is unauthenticated and authentication is required",
@@ -97,50 +106,48 @@ public class AvaticaProtobufHandler extends AbstractAvaticaHandler {
         return;
       }
 
+      final byte[] requestBytes;
+      // Avoid a new buffer creation for every HTTP request
+      final UnsynchronizedBuffer buffer = threadLocalBuffer.get();
+      try (ServletInputStream inputStream = request.getInputStream()) {
+        requestBytes = AvaticaUtils.readFullyToBytes(inputStream, buffer);
+      } finally {
+        buffer.reset();
+      }
+
       response.setContentType("application/octet-stream;charset=utf-8");
       response.setStatus(HttpServletResponse.SC_OK);
-      if (request.getMethod().equals("POST")) {
-        final byte[] requestBytes;
-        // Avoid a new buffer creation for every HTTP request
-        final UnsynchronizedBuffer buffer = threadLocalBuffer.get();
-        try (ServletInputStream inputStream = request.getInputStream()) {
-          requestBytes = AvaticaUtils.readFullyToBytes(inputStream, buffer);
-        } finally {
-          buffer.reset();
+      HandlerResponse<byte[]> handlerResponse;
+      try {
+        if (null != serverConfig && serverConfig.supportsImpersonation()) {
+          // If we can't extract a user, need to throw 401 in that case.
+          String remoteUser = serverConfig.getRemoteUserExtractor().extract(request);
+          // Invoke the ProtobufHandler inside as doAs for the remote user.
+          // The doAsRemoteUser call may disallow a user, need to throw 403 in that case.
+          handlerResponse = serverConfig.doAsRemoteUser(remoteUser,
+            request.getRemoteAddr(), new Callable<HandlerResponse<byte[]>>() {
+              @Override public HandlerResponse<byte[]> call() {
+                return pbHandler.apply(requestBytes);
+              }
+            });
+        } else {
+          handlerResponse = pbHandler.apply(requestBytes);
         }
-
-        HandlerResponse<byte[]> handlerResponse;
-        try {
-          if (null != serverConfig && serverConfig.supportsImpersonation()) {
-            // If we can't extract a user, need to throw 401 in that case.
-            String remoteUser = serverConfig.getRemoteUserExtractor().extract(request);
-            // Invoke the ProtobufHandler inside as doAs for the remote user.
-            // The doAsRemoteUser call may disallow a user, need to throw 403 in that case.
-            handlerResponse = serverConfig.doAsRemoteUser(remoteUser,
-              request.getRemoteAddr(), new Callable<HandlerResponse<byte[]>>() {
-                @Override public HandlerResponse<byte[]> call() {
-                  return pbHandler.apply(requestBytes);
-                }
-              });
-          } else {
-            handlerResponse = pbHandler.apply(requestBytes);
-          }
-        } catch (RemoteUserExtractionException e) {
-          LOG.debug("Failed to extract remote user from request", e);
-          handlerResponse = pbHandler.unauthenticatedErrorResponse(e);
-        } catch (RemoteUserDisallowedException e) {
-          LOG.debug("Remote user is not authorized", e);
-          handlerResponse = pbHandler.unauthorizedErrorResponse(e);
-        } catch (Exception e) {
-          LOG.debug("Error invoking request from {}", baseRequest.getRemoteAddr(), e);
-          // Catch at the highest level of exceptions
-          handlerResponse = pbHandler.convertToErrorResponse(e);
-        }
-
-        baseRequest.setHandled(true);
-        response.setStatus(handlerResponse.getStatusCode());
-        response.getOutputStream().write(handlerResponse.getResponse());
+      } catch (RemoteUserExtractionException e) {
+        LOG.debug("Failed to extract remote user from request", e);
+        handlerResponse = pbHandler.unauthenticatedErrorResponse(e);
+      } catch (RemoteUserDisallowedException e) {
+        LOG.debug("Remote user is not authorized", e);
+        handlerResponse = pbHandler.unauthorizedErrorResponse(e);
+      } catch (Exception e) {
+        LOG.debug("Error invoking request from {}", baseRequest.getRemoteAddr(), e);
+        // Catch at the highest level of exceptions
+        handlerResponse = pbHandler.convertToErrorResponse(e);
       }
+
+      baseRequest.setHandled(true);
+      response.setStatus(handlerResponse.getStatusCode());
+      response.getOutputStream().write(handlerResponse.getResponse());
     }
   }
 
diff --git a/server/src/main/java/org/apache/calcite/avatica/server/AvaticaSpnegoAuthenticator.java b/server/src/main/java/org/apache/calcite/avatica/server/AvaticaSpnegoAuthenticator.java
index d801d4f..fcd9206 100644
--- a/server/src/main/java/org/apache/calcite/avatica/server/AvaticaSpnegoAuthenticator.java
+++ b/server/src/main/java/org/apache/calcite/avatica/server/AvaticaSpnegoAuthenticator.java
@@ -16,6 +16,8 @@
  */
 package org.apache.calcite.avatica.server;
 
+import org.apache.calcite.avatica.AvaticaUtils;
+
 import org.eclipse.jetty.http.HttpHeader;
 import org.eclipse.jetty.security.ServerAuthException;
 import org.eclipse.jetty.security.authentication.DeferredAuthentication;
@@ -73,6 +75,14 @@ public class AvaticaSpnegoAuthenticator extends
         res.sendError(HttpServletResponse.SC_UNAUTHORIZED);
         return Authentication.SEND_CONTINUE;
       }
+    } else if (computedAuth == Authentication.SEND_CONTINUE) {
+      // CALCITE-4196 When we need to reply back to the client with the HTTP/401 challenge
+      // we must make sure that we consume all of the data that the client has written. Otherwise,
+      // the client will continue to write the data on a socket which we've already closed. This
+      // would ultimately result in the client receiving a TCP Reset and seeing a "Broken pipe"
+      // exception in their client application.
+      HttpServletRequest req = (HttpServletRequest) request;
+      AvaticaUtils.skipFully(req.getInputStream());
     }
     return computedAuth;
   }
diff --git a/server/src/test/java/org/apache/calcite/avatica/AvaticaSpnegoTest.java b/server/src/test/java/org/apache/calcite/avatica/AvaticaSpnegoTest.java
index c33c6c2..0d1972f 100644
--- a/server/src/test/java/org/apache/calcite/avatica/AvaticaSpnegoTest.java
+++ b/server/src/test/java/org/apache/calcite/avatica/AvaticaSpnegoTest.java
@@ -251,7 +251,6 @@ public class AvaticaSpnegoTest extends HttpBaseTest {
       assertEquals(3, results.getInt(1));
     }
   }
-
 }
 
 // End AvaticaSpnegoTest.java
diff --git a/server/src/test/java/org/apache/calcite/avatica/server/AbstractAvaticaHandlerTest.java b/server/src/test/java/org/apache/calcite/avatica/server/AbstractAvaticaHandlerTest.java
index 93c634e..f8e79ae 100644
--- a/server/src/test/java/org/apache/calcite/avatica/server/AbstractAvaticaHandlerTest.java
+++ b/server/src/test/java/org/apache/calcite/avatica/server/AbstractAvaticaHandlerTest.java
@@ -26,12 +26,15 @@ import org.junit.Test;
 
 import java.net.HttpURLConnection;
 import java.nio.charset.StandardCharsets;
+import javax.servlet.ServletInputStream;
 import javax.servlet.ServletOutputStream;
 import javax.servlet.http.HttpServletRequest;
 import javax.servlet.http.HttpServletResponse;
 
 import static org.junit.Assert.assertFalse;
 import static org.junit.Assert.assertTrue;
+import static org.mockito.Mockito.any;
+import static org.mockito.Mockito.anyInt;
 import static org.mockito.Mockito.mock;
 import static org.mockito.Mockito.verify;
 import static org.mockito.Mockito.when;
@@ -59,9 +62,13 @@ public class AbstractAvaticaHandlerTest {
 
   @Test public void disallowUnauthenticatedUsers() throws Exception {
     ServletOutputStream os = mock(ServletOutputStream.class);
+    ServletInputStream is = mock(ServletInputStream.class);
+
+    when(is.read(any(byte[].class), anyInt(), anyInt())).thenReturn(-1);
 
     when(config.getAuthenticationType()).thenReturn(AuthenticationType.SPNEGO);
     when(request.getRemoteUser()).thenReturn(null);
+    when(request.getInputStream()).thenReturn(is);
     when(response.getOutputStream()).thenReturn(os);
 
     assertFalse(handler.isUserPermitted(config, baseRequest, request, response));
diff --git a/server/src/test/java/org/apache/calcite/avatica/server/AvaticaSpnegoAuthenticatorTest.java b/server/src/test/java/org/apache/calcite/avatica/server/AvaticaSpnegoAuthenticatorTest.java
index acf814b..3005c68 100644
--- a/server/src/test/java/org/apache/calcite/avatica/server/AvaticaSpnegoAuthenticatorTest.java
+++ b/server/src/test/java/org/apache/calcite/avatica/server/AvaticaSpnegoAuthenticatorTest.java
@@ -24,10 +24,14 @@ import org.junit.Test;
 import java.io.IOException;
 import java.util.Arrays;
 import java.util.List;
+import javax.servlet.ServletInputStream;
 import javax.servlet.http.HttpServletRequest;
 import javax.servlet.http.HttpServletResponse;
 
 import static org.junit.Assert.assertEquals;
+import static org.mockito.Mockito.any;
+import static org.mockito.Mockito.anyInt;
+import static org.mockito.Mockito.anyLong;
 import static org.mockito.Mockito.mock;
 import static org.mockito.Mockito.verify;
 import static org.mockito.Mockito.verifyZeroInteractions;
@@ -40,17 +44,21 @@ public class AvaticaSpnegoAuthenticatorTest {
 
   private HttpServletRequest request;
   private HttpServletResponse response;
+  private ServletInputStream requestInput;
   private AvaticaSpnegoAuthenticator authenticator;
 
-  @Before public void setup() {
+  @Before public void setup() throws IOException {
     request = mock(HttpServletRequest.class);
+    requestInput = mock(ServletInputStream.class);
+    when(request.getInputStream()).thenReturn(requestInput);
     response = mock(HttpServletResponse.class);
     authenticator = new AvaticaSpnegoAuthenticator();
   }
 
   @Test public void testAuthenticatedDoesNothingExtra() throws IOException {
+    // SEND_CONTINUE not listed here for explicit testing below.
     List<Authentication> authsNotRequiringUpdate = Arrays.asList(Authentication.NOT_CHECKED,
-        Authentication.SEND_CONTINUE, Authentication.SEND_FAILURE, Authentication.SEND_SUCCESS);
+        Authentication.SEND_FAILURE, Authentication.SEND_SUCCESS);
     for (Authentication auth : authsNotRequiringUpdate) {
       assertEquals(auth, authenticator.sendChallengeIfNecessary(auth, request, response));
       verifyZeroInteractions(request);
@@ -67,6 +75,15 @@ public class AvaticaSpnegoAuthenticatorTest {
         HttpHeader.NEGOTIATE.asString());
     verify(response).sendError(HttpServletResponse.SC_UNAUTHORIZED);
   }
+
+  @Test public void testConsumeClientBufferOnChallenge() throws IOException {
+    when(requestInput.read(any(byte[].class), anyInt(), anyInt())).thenReturn(-1);
+    assertEquals(Authentication.SEND_CONTINUE,
+        authenticator.sendChallengeIfNecessary(Authentication.SEND_CONTINUE, request, response));
+    verify(request).getInputStream();
+    verify(requestInput).skip(anyLong());
+    verify(requestInput).read(any(byte[].class), anyInt(), anyInt());
+  }
 }
 
 // End AvaticaSpnegoAuthenticatorTest.java

Reply via email to