This is an automated email from the ASF dual-hosted git repository. elserj pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/calcite-avatica.git
The following commit(s) were added to refs/heads/master by this push: new f983742 [CALCITE-4196] Consume all data from client before replying with HTTP/401 f983742 is described below commit f9837420cfabf88874eeb2c0a5b9642ebe2c2461 Author: Josh Elser <els...@apache.org> AuthorDate: Thu Aug 27 13:56:23 2020 -0400 [CALCITE-4196] Consume all data from client before replying with HTTP/401 SPNEGO's handshake involves sending an HTTP/401 to "challenge" the client to reply with authentication data. If the client is sending a significant amount of data in the original request, the client will still be writing this data when the server replies. This causes the client to receive a TCP Reset when it continues to write data, and ultimately manifests in a "Broken Pipe" runtime exception. The fix is to simply consume all data the client wrote prior to responding with the HTTP/401. Closes #127 Signed-off-by: Kevin Risden <kris...@apache.org> --- .../org/apache/calcite/avatica/AvaticaUtils.java | 22 ++++++ .../calcite/avatica/test/AvaticaUtilsTest.java | 33 ++++++++ .../avatica/server/AbstractAvaticaHandler.java | 8 ++ .../avatica/server/AvaticaProtobufHandler.java | 89 ++++++++++++---------- .../avatica/server/AvaticaSpnegoAuthenticator.java | 10 +++ .../apache/calcite/avatica/AvaticaSpnegoTest.java | 1 - .../avatica/server/AbstractAvaticaHandlerTest.java | 7 ++ .../server/AvaticaSpnegoAuthenticatorTest.java | 21 ++++- 8 files changed, 147 insertions(+), 44 deletions(-) diff --git a/core/src/main/java/org/apache/calcite/avatica/AvaticaUtils.java b/core/src/main/java/org/apache/calcite/avatica/AvaticaUtils.java index 45e8236..18b22dd 100644 --- a/core/src/main/java/org/apache/calcite/avatica/AvaticaUtils.java +++ b/core/src/main/java/org/apache/calcite/avatica/AvaticaUtils.java @@ -57,6 +57,8 @@ public class AvaticaUtils { } }; + private static final int SKIP_BUFFER_SIZE = 4096; + private AvaticaUtils() {} static { @@ -286,6 +288,26 @@ public class AvaticaUtils { return 
buffer.toArray(); } + /** + * Reads and discards all data available on the InputStream. + */ + public static void skipFully(InputStream inputStream) throws IOException { + byte[] temp = null; + while (true) { + long bytesSkipped = inputStream.skip(Long.MAX_VALUE); + if (bytesSkipped == 0) { + if (temp == null) { + temp = new byte[SKIP_BUFFER_SIZE]; + } + int bytesRead = inputStream.read(temp, 0, SKIP_BUFFER_SIZE); + if (bytesRead < 0) { + // EOF + return; + } + } + } + } + /** Invokes {@code Statement#setLargeMaxRows}, falling back on * {@link Statement#setMaxRows(int)} if the method does not exist (before * JDK 1.8) or throws {@link UnsupportedOperationException}. */ diff --git a/core/src/test/java/org/apache/calcite/avatica/test/AvaticaUtilsTest.java b/core/src/test/java/org/apache/calcite/avatica/test/AvaticaUtilsTest.java index a1561aa..75adbab 100644 --- a/core/src/test/java/org/apache/calcite/avatica/test/AvaticaUtilsTest.java +++ b/core/src/test/java/org/apache/calcite/avatica/test/AvaticaUtilsTest.java @@ -23,8 +23,12 @@ import org.apache.calcite.avatica.util.ByteString; import org.junit.Test; +import java.io.ByteArrayInputStream; +import java.io.IOException; +import java.io.InputStream; import java.math.BigDecimal; import java.math.BigInteger; +import java.nio.charset.StandardCharsets; import java.util.Arrays; import java.util.HashMap; import java.util.LinkedHashSet; @@ -32,9 +36,13 @@ import java.util.Locale; import java.util.Properties; import java.util.Set; +import static org.apache.calcite.avatica.AvaticaUtils.skipFully; + import static org.hamcrest.CoreMatchers.is; import static org.hamcrest.CoreMatchers.nullValue; import static org.junit.Assert.assertArrayEquals; +import static org.junit.Assert.assertEquals; +import static org.junit.Assert.assertNotEquals; import static org.junit.Assert.assertThat; import static org.junit.Assert.fail; @@ -270,6 +278,31 @@ public class AvaticaUtilsTest { assertThat(s, is(s2)); } + @Test public void testSkipFully() 
throws IOException { + InputStream in = of(""); + assertEquals(0, in.available()); + skipFully(in); + assertEquals(0, in.available()); + + in = of("asdf"); + assertEquals(4, in.available()); + skipFully(in); + assertEquals(0, in.available()); + + in = of("asdfasdf"); + for (int i = 0; i < 4; i++) { + assertNotEquals(-1, in.read()); + } + assertEquals(4, in.available()); + skipFully(in); + assertEquals(0, in.available()); + } + + /** Returns an InputStream of UTF-8 encoded bytes from the provided string */ + InputStream of(String str) { + return new ByteArrayInputStream(str.getBytes(StandardCharsets.UTF_8)); + } + /** Dummy implementation of {@link ConnectionProperty}. */ private static class ConnectionPropertyImpl implements ConnectionProperty { private final String name; diff --git a/server/src/main/java/org/apache/calcite/avatica/server/AbstractAvaticaHandler.java b/server/src/main/java/org/apache/calcite/avatica/server/AbstractAvaticaHandler.java index 687c847..fa5372c 100644 --- a/server/src/main/java/org/apache/calcite/avatica/server/AbstractAvaticaHandler.java +++ b/server/src/main/java/org/apache/calcite/avatica/server/AbstractAvaticaHandler.java @@ -17,6 +17,7 @@ package org.apache.calcite.avatica.server; import org.apache.calcite.avatica.AvaticaSeverity; +import org.apache.calcite.avatica.AvaticaUtils; import org.apache.calcite.avatica.remote.AuthenticationType; import org.apache.calcite.avatica.remote.Service.ErrorResponse; @@ -26,6 +27,7 @@ import org.eclipse.jetty.server.handler.AbstractHandler; import java.io.IOException; import java.net.HttpURLConnection; import java.util.Collections; +import javax.servlet.ServletInputStream; import javax.servlet.http.HttpServletRequest; import javax.servlet.http.HttpServletResponse; @@ -57,8 +59,14 @@ public abstract class AbstractAvaticaHandler extends AbstractHandler // Make sure that we drop any unauthenticated users out first. 
if (null != serverConfig) { if (AuthenticationType.SPNEGO == serverConfig.getAuthenticationType()) { + // This is largely a failsafe. We should never normally get here, because + // AvaticaSpnegoAuthenticator should already have sent the HTTP/401. String remoteUser = request.getRemoteUser(); if (null == remoteUser) { + ServletInputStream input = request.getInputStream(); + if (request.getContentLengthLong() < 0) { + AvaticaUtils.skipFully(input); + } response.setStatus(HttpURLConnection.HTTP_UNAUTHORIZED); response.getOutputStream().write(UNAUTHORIZED_ERROR.serialize().toByteArray()); baseRequest.setHandled(true); diff --git a/server/src/main/java/org/apache/calcite/avatica/server/AvaticaProtobufHandler.java b/server/src/main/java/org/apache/calcite/avatica/server/AvaticaProtobufHandler.java index 1bf6af9..16e27ff 100644 --- a/server/src/main/java/org/apache/calcite/avatica/server/AvaticaProtobufHandler.java +++ b/server/src/main/java/org/apache/calcite/avatica/server/AvaticaProtobufHandler.java @@ -35,6 +35,7 @@ import org.slf4j.Logger; import org.slf4j.LoggerFactory; import java.io.IOException; +import java.nio.charset.StandardCharsets; import java.util.Objects; import java.util.concurrent.Callable; import javax.servlet.ServletException; @@ -46,7 +47,7 @@ import javax.servlet.http.HttpServletResponse; * Jetty handler that executes Avatica JSON request-responses. 
*/ public class AvaticaProtobufHandler extends AbstractAvaticaHandler { - private static final Logger LOG = LoggerFactory.getLogger(AvaticaJsonHandler.class); + private static final Logger LOG = LoggerFactory.getLogger(AvaticaProtobufHandler.class); private final Service service; private final ProtobufHandler pbHandler; @@ -90,6 +91,14 @@ public class AvaticaProtobufHandler extends AbstractAvaticaHandler { HttpServletRequest request, HttpServletResponse response) throws IOException, ServletException { try (final Context ctx = this.requestTimer.start()) { + if (!request.getMethod().equals("POST")) { + response.setStatus(HttpServletResponse.SC_BAD_REQUEST); + response.getOutputStream().write( + "This server expects only POST calls.".getBytes(StandardCharsets.UTF_8)); + baseRequest.setHandled(true); + return; + } + // Check if the user is OK to proceed. if (!isUserPermitted(serverConfig, baseRequest, request, response)) { LOG.debug("HTTP request from {} is unauthenticated and authentication is required", @@ -97,50 +106,48 @@ public class AvaticaProtobufHandler extends AbstractAvaticaHandler { return; } + final byte[] requestBytes; + // Avoid a new buffer creation for every HTTP request + final UnsynchronizedBuffer buffer = threadLocalBuffer.get(); + try (ServletInputStream inputStream = request.getInputStream()) { + requestBytes = AvaticaUtils.readFullyToBytes(inputStream, buffer); + } finally { + buffer.reset(); + } + response.setContentType("application/octet-stream;charset=utf-8"); response.setStatus(HttpServletResponse.SC_OK); - if (request.getMethod().equals("POST")) { - final byte[] requestBytes; - // Avoid a new buffer creation for every HTTP request - final UnsynchronizedBuffer buffer = threadLocalBuffer.get(); - try (ServletInputStream inputStream = request.getInputStream()) { - requestBytes = AvaticaUtils.readFullyToBytes(inputStream, buffer); - } finally { - buffer.reset(); + HandlerResponse<byte[]> handlerResponse; + try { + if (null != serverConfig && 
serverConfig.supportsImpersonation()) { + // If we can't extract a user, need to throw 401 in that case. + String remoteUser = serverConfig.getRemoteUserExtractor().extract(request); + // Invoke the ProtobufHandler inside as doAs for the remote user. + // The doAsRemoteUser call may disallow a user, need to throw 403 in that case. + handlerResponse = serverConfig.doAsRemoteUser(remoteUser, + request.getRemoteAddr(), new Callable<HandlerResponse<byte[]>>() { + @Override public HandlerResponse<byte[]> call() { + return pbHandler.apply(requestBytes); + } + }); + } else { + handlerResponse = pbHandler.apply(requestBytes); } - - HandlerResponse<byte[]> handlerResponse; - try { - if (null != serverConfig && serverConfig.supportsImpersonation()) { - // If we can't extract a user, need to throw 401 in that case. - String remoteUser = serverConfig.getRemoteUserExtractor().extract(request); - // Invoke the ProtobufHandler inside as doAs for the remote user. - // The doAsRemoteUser call may disallow a user, need to throw 403 in that case. 
- handlerResponse = serverConfig.doAsRemoteUser(remoteUser, - request.getRemoteAddr(), new Callable<HandlerResponse<byte[]>>() { - @Override public HandlerResponse<byte[]> call() { - return pbHandler.apply(requestBytes); - } - }); - } else { - handlerResponse = pbHandler.apply(requestBytes); - } - } catch (RemoteUserExtractionException e) { - LOG.debug("Failed to extract remote user from request", e); - handlerResponse = pbHandler.unauthenticatedErrorResponse(e); - } catch (RemoteUserDisallowedException e) { - LOG.debug("Remote user is not authorized", e); - handlerResponse = pbHandler.unauthorizedErrorResponse(e); - } catch (Exception e) { - LOG.debug("Error invoking request from {}", baseRequest.getRemoteAddr(), e); - // Catch at the highest level of exceptions - handlerResponse = pbHandler.convertToErrorResponse(e); - } - - baseRequest.setHandled(true); - response.setStatus(handlerResponse.getStatusCode()); - response.getOutputStream().write(handlerResponse.getResponse()); + } catch (RemoteUserExtractionException e) { + LOG.debug("Failed to extract remote user from request", e); + handlerResponse = pbHandler.unauthenticatedErrorResponse(e); + } catch (RemoteUserDisallowedException e) { + LOG.debug("Remote user is not authorized", e); + handlerResponse = pbHandler.unauthorizedErrorResponse(e); + } catch (Exception e) { + LOG.debug("Error invoking request from {}", baseRequest.getRemoteAddr(), e); + // Catch at the highest level of exceptions + handlerResponse = pbHandler.convertToErrorResponse(e); } + + baseRequest.setHandled(true); + response.setStatus(handlerResponse.getStatusCode()); + response.getOutputStream().write(handlerResponse.getResponse()); } } diff --git a/server/src/main/java/org/apache/calcite/avatica/server/AvaticaSpnegoAuthenticator.java b/server/src/main/java/org/apache/calcite/avatica/server/AvaticaSpnegoAuthenticator.java index d801d4f..fcd9206 100644 --- a/server/src/main/java/org/apache/calcite/avatica/server/AvaticaSpnegoAuthenticator.java 
+++ b/server/src/main/java/org/apache/calcite/avatica/server/AvaticaSpnegoAuthenticator.java @@ -16,6 +16,8 @@ */ package org.apache.calcite.avatica.server; +import org.apache.calcite.avatica.AvaticaUtils; + import org.eclipse.jetty.http.HttpHeader; import org.eclipse.jetty.security.ServerAuthException; import org.eclipse.jetty.security.authentication.DeferredAuthentication; @@ -73,6 +75,14 @@ public class AvaticaSpnegoAuthenticator extends res.sendError(HttpServletResponse.SC_UNAUTHORIZED); return Authentication.SEND_CONTINUE; } + } else if (computedAuth == Authentication.SEND_CONTINUE) { + // CALCITE-4196 When we need to reply back to the client with the HTTP/401 challenge + // we must make sure that we consume all of the data that the client has written. Otherwise, + // the client will continue to write the data on a socket which we've already closed. This + // would ultimately result in the client receiving a TCP Reset and seeing a "Broken pipe" + // exception in their client application. 
+ HttpServletRequest req = (HttpServletRequest) request; + AvaticaUtils.skipFully(req.getInputStream()); } return computedAuth; } diff --git a/server/src/test/java/org/apache/calcite/avatica/AvaticaSpnegoTest.java b/server/src/test/java/org/apache/calcite/avatica/AvaticaSpnegoTest.java index c33c6c2..0d1972f 100644 --- a/server/src/test/java/org/apache/calcite/avatica/AvaticaSpnegoTest.java +++ b/server/src/test/java/org/apache/calcite/avatica/AvaticaSpnegoTest.java @@ -251,7 +251,6 @@ public class AvaticaSpnegoTest extends HttpBaseTest { assertEquals(3, results.getInt(1)); } } - } // End AvaticaSpnegoTest.java diff --git a/server/src/test/java/org/apache/calcite/avatica/server/AbstractAvaticaHandlerTest.java b/server/src/test/java/org/apache/calcite/avatica/server/AbstractAvaticaHandlerTest.java index 93c634e..f8e79ae 100644 --- a/server/src/test/java/org/apache/calcite/avatica/server/AbstractAvaticaHandlerTest.java +++ b/server/src/test/java/org/apache/calcite/avatica/server/AbstractAvaticaHandlerTest.java @@ -26,12 +26,15 @@ import org.junit.Test; import java.net.HttpURLConnection; import java.nio.charset.StandardCharsets; +import javax.servlet.ServletInputStream; import javax.servlet.ServletOutputStream; import javax.servlet.http.HttpServletRequest; import javax.servlet.http.HttpServletResponse; import static org.junit.Assert.assertFalse; import static org.junit.Assert.assertTrue; +import static org.mockito.Mockito.any; +import static org.mockito.Mockito.anyInt; import static org.mockito.Mockito.mock; import static org.mockito.Mockito.verify; import static org.mockito.Mockito.when; @@ -59,9 +62,13 @@ public class AbstractAvaticaHandlerTest { @Test public void disallowUnauthenticatedUsers() throws Exception { ServletOutputStream os = mock(ServletOutputStream.class); + ServletInputStream is = mock(ServletInputStream.class); + + when(is.read(any(byte[].class), anyInt(), anyInt())).thenReturn(-1); 
when(config.getAuthenticationType()).thenReturn(AuthenticationType.SPNEGO); when(request.getRemoteUser()).thenReturn(null); + when(request.getInputStream()).thenReturn(is); when(response.getOutputStream()).thenReturn(os); assertFalse(handler.isUserPermitted(config, baseRequest, request, response)); diff --git a/server/src/test/java/org/apache/calcite/avatica/server/AvaticaSpnegoAuthenticatorTest.java b/server/src/test/java/org/apache/calcite/avatica/server/AvaticaSpnegoAuthenticatorTest.java index acf814b..3005c68 100644 --- a/server/src/test/java/org/apache/calcite/avatica/server/AvaticaSpnegoAuthenticatorTest.java +++ b/server/src/test/java/org/apache/calcite/avatica/server/AvaticaSpnegoAuthenticatorTest.java @@ -24,10 +24,14 @@ import org.junit.Test; import java.io.IOException; import java.util.Arrays; import java.util.List; +import javax.servlet.ServletInputStream; import javax.servlet.http.HttpServletRequest; import javax.servlet.http.HttpServletResponse; import static org.junit.Assert.assertEquals; +import static org.mockito.Mockito.any; +import static org.mockito.Mockito.anyInt; +import static org.mockito.Mockito.anyLong; import static org.mockito.Mockito.mock; import static org.mockito.Mockito.verify; import static org.mockito.Mockito.verifyZeroInteractions; @@ -40,17 +44,21 @@ public class AvaticaSpnegoAuthenticatorTest { private HttpServletRequest request; private HttpServletResponse response; + private ServletInputStream requestInput; private AvaticaSpnegoAuthenticator authenticator; - @Before public void setup() { + @Before public void setup() throws IOException { request = mock(HttpServletRequest.class); + requestInput = mock(ServletInputStream.class); + when(request.getInputStream()).thenReturn(requestInput); response = mock(HttpServletResponse.class); authenticator = new AvaticaSpnegoAuthenticator(); } @Test public void testAuthenticatedDoesNothingExtra() throws IOException { + // SEND_CONTINUE not listed here for explicit testing below. 
List<Authentication> authsNotRequiringUpdate = Arrays.asList(Authentication.NOT_CHECKED, - Authentication.SEND_CONTINUE, Authentication.SEND_FAILURE, Authentication.SEND_SUCCESS); + Authentication.SEND_FAILURE, Authentication.SEND_SUCCESS); for (Authentication auth : authsNotRequiringUpdate) { assertEquals(auth, authenticator.sendChallengeIfNecessary(auth, request, response)); verifyZeroInteractions(request); @@ -67,6 +75,15 @@ public class AvaticaSpnegoAuthenticatorTest { HttpHeader.NEGOTIATE.asString()); verify(response).sendError(HttpServletResponse.SC_UNAUTHORIZED); } + + @Test public void testConsumeClientBufferOnChallenge() throws IOException { + when(requestInput.read(any(byte[].class), anyInt(), anyInt())).thenReturn(-1); + assertEquals(Authentication.SEND_CONTINUE, + authenticator.sendChallengeIfNecessary(Authentication.SEND_CONTINUE, request, response)); + verify(request).getInputStream(); + verify(requestInput).skip(anyLong()); + verify(requestInput).read(any(byte[].class), anyInt(), anyInt()); + } } // End AvaticaSpnegoAuthenticatorTest.java