Repository: tez Updated Branches: refs/heads/master e72b0a23a -> bf87a0fd1
TEZ-3902. Upgrade to netty-3.10.5.Final.jar (Jason Lowe via jeagles) Project: http://git-wip-us.apache.org/repos/asf/tez/repo Commit: http://git-wip-us.apache.org/repos/asf/tez/commit/bf87a0fd Tree: http://git-wip-us.apache.org/repos/asf/tez/tree/bf87a0fd Diff: http://git-wip-us.apache.org/repos/asf/tez/diff/bf87a0fd Branch: refs/heads/master Commit: bf87a0fd1fc5fc2de907fc81f99ea26f18881593 Parents: e72b0a2 Author: Jonathan Eagles <[email protected]> Authored: Tue May 22 17:03:55 2018 -0500 Committer: Jonathan Eagles <[email protected]> Committed: Tue May 22 17:03:55 2018 -0500 ---------------------------------------------------------------------- pom.xml | 4 +-- tez-dist/src/main/assembly/tez-dist-minimal.xml | 1 + tez-ext-service-tests/pom.xml | 1 - .../tez/shufflehandler/ShuffleHandler.java | 26 ++++++++++---------- .../apache/tez/auxservices/ShuffleHandler.java | 26 ++++++++++---------- .../tez/auxservices/TestShuffleHandler.java | 2 ++ .../http/async/netty/AsyncHttpConnection.java | 14 +++++------ 7 files changed, 37 insertions(+), 37 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/tez/blob/bf87a0fd/pom.xml ---------------------------------------------------------------------- diff --git a/pom.xml b/pom.xml index 1c8caeb..917edb8 100644 --- a/pom.xml +++ b/pom.xml @@ -39,7 +39,7 @@ <clover.license>${user.home}/clover.license</clover.license> <hadoop.version>2.7.2</hadoop.version> <jetty.version>9.3.22.v20171030</jetty.version> - <netty.version>3.6.2.Final</netty.version> + <netty.version>3.10.5.Final</netty.version> <pig.version>0.13.0</pig.version> <javac.version>1.8</javac.version> <slf4j.version>1.7.10</slf4j.version> @@ -221,7 +221,7 @@ <dependency> <groupId>com.ning</groupId> <artifactId>async-http-client</artifactId> - <version>1.8.16</version> + <version>1.9.40</version> </dependency> <dependency> <groupId>org.slf4j</groupId> http://git-wip-us.apache.org/repos/asf/tez/blob/bf87a0fd/tez-dist/src/main/assembly/tez-dist-minimal.xml ---------------------------------------------------------------------- diff --git a/tez-dist/src/main/assembly/tez-dist-minimal.xml b/tez-dist/src/main/assembly/tez-dist-minimal.xml index 80633ff..fbd1782 100644 --- a/tez-dist/src/main/assembly/tez-dist-minimal.xml +++ b/tez-dist/src/main/assembly/tez-dist-minimal.xml @@ -24,6 +24,7 @@ <useAllReactorProjects>true</useAllReactorProjects> <excludes> <exclude>org.apache.tez:tez-aux-services</exclude> + <exclude>org.apache.tez:tez-ext-service-tests</exclude> </excludes> <binaries> <outputDirectory>/</outputDirectory> http://git-wip-us.apache.org/repos/asf/tez/blob/bf87a0fd/tez-ext-service-tests/pom.xml ---------------------------------------------------------------------- diff --git a/tez-ext-service-tests/pom.xml b/tez-ext-service-tests/pom.xml index d6d8573..e123a7a 100644 --- a/tez-ext-service-tests/pom.xml +++ b/tez-ext-service-tests/pom.xml @@ -29,7 +29,6 @@ <dependency> <groupId>io.netty</groupId> <artifactId>netty</artifactId> - <version>3.6.2.Final</version> </dependency> <dependency> <groupId>org.slf4j</groupId> http://git-wip-us.apache.org/repos/asf/tez/blob/bf87a0fd/tez-ext-service-tests/src/test/java/org/apache/tez/shufflehandler/ShuffleHandler.java ---------------------------------------------------------------------- diff --git a/tez-ext-service-tests/src/test/java/org/apache/tez/shufflehandler/ShuffleHandler.java b/tez-ext-service-tests/src/test/java/org/apache/tez/shufflehandler/ShuffleHandler.java index ebaf9fe..47ac900 100644 --- a/tez-ext-service-tests/src/test/java/org/apache/tez/shufflehandler/ShuffleHandler.java +++ b/tez-ext-service-tests/src/test/java/org/apache/tez/shufflehandler/ShuffleHandler.java @@ -367,9 +367,9 @@ public class ShuffleHandler { } // Check whether the shuffle version is compatible if (!ShuffleHeader.DEFAULT_HTTP_HEADER_NAME.equals( - request.getHeader(ShuffleHeader.HTTP_HEADER_NAME)) + request.headers().get(ShuffleHeader.HTTP_HEADER_NAME)) || !ShuffleHeader.DEFAULT_HTTP_HEADER_VERSION.equals( - request.getHeader(ShuffleHeader.HTTP_HEADER_VERSION))) { + request.headers().get(ShuffleHeader.HTTP_HEADER_VERSION))) { sendError(ctx, "Incompatible shuffle request version", BAD_REQUEST); } final Map<String,List<String>> q = @@ -551,12 +551,12 @@ public class ShuffleHandler { boolean keepAliveParam, long contentLength) { if (!connectionKeepAliveEnabled && !keepAliveParam) { LOG.info("Setting connection close header..."); - response.setHeader(HttpHeaders.Names.CONNECTION, CONNECTION_CLOSE); + response.headers().set(HttpHeaders.Names.CONNECTION, CONNECTION_CLOSE); } else { - response.setHeader(HttpHeaders.Names.CONTENT_LENGTH, + response.headers().set(HttpHeaders.Names.CONTENT_LENGTH, String.valueOf(contentLength)); - response.setHeader(HttpHeaders.Names.CONNECTION, HttpHeaders.Values.KEEP_ALIVE); - response.setHeader(HttpHeaders.Values.KEEP_ALIVE, "timeout=" + response.headers().set(HttpHeaders.Names.CONNECTION, HttpHeaders.Values.KEEP_ALIVE); + response.headers().set(HttpHeaders.Values.KEEP_ALIVE, "timeout=" + connectionKeepAliveTimeOut); LOG.info("Content Length in shuffle : " + contentLength); } @@ -584,7 +584,7 @@ public class ShuffleHandler { String enc_str = SecureShuffleUtils.buildMsgFrom(requestUri); // hash from the fetcher String urlHashStr = - request.getHeader(SecureShuffleUtils.HTTP_HEADER_URL_HASH); + request.headers().get(SecureShuffleUtils.HTTP_HEADER_URL_HASH); if (urlHashStr == null) { LOG.info("Missing header hash for " + appid); throw new IOException("fetcher cannot be authenticated"); @@ -600,11 +600,11 @@ public class ShuffleHandler { String reply = SecureShuffleUtils.generateHash(urlHashStr.getBytes(Charsets.UTF_8), tokenSecret); - response.setHeader(SecureShuffleUtils.HTTP_HEADER_REPLY_URL_HASH, reply); + response.headers().set(SecureShuffleUtils.HTTP_HEADER_REPLY_URL_HASH, reply); // Put shuffle version into http header - response.setHeader(ShuffleHeader.HTTP_HEADER_NAME, + response.headers().set(ShuffleHeader.HTTP_HEADER_NAME, ShuffleHeader.DEFAULT_HTTP_HEADER_NAME); - response.setHeader(ShuffleHeader.HTTP_HEADER_VERSION, + response.headers().set(ShuffleHeader.HTTP_HEADER_VERSION, ShuffleHeader.DEFAULT_HTTP_HEADER_VERSION); if (LOG.isDebugEnabled()) { int len = reply.length(); @@ -654,11 +654,11 @@ public class ShuffleHandler { protected void sendError(ChannelHandlerContext ctx, String message, HttpResponseStatus status) { HttpResponse response = new DefaultHttpResponse(HTTP_1_1, status); - response.setHeader(CONTENT_TYPE, "text/plain; charset=UTF-8"); + response.headers().set(CONTENT_TYPE, "text/plain; charset=UTF-8"); // Put shuffle version into http header - response.setHeader(ShuffleHeader.HTTP_HEADER_NAME, + response.headers().set(ShuffleHeader.HTTP_HEADER_NAME, ShuffleHeader.DEFAULT_HTTP_HEADER_NAME); - response.setHeader(ShuffleHeader.HTTP_HEADER_VERSION, + response.headers().set(ShuffleHeader.HTTP_HEADER_VERSION, ShuffleHeader.DEFAULT_HTTP_HEADER_VERSION); response.setContent( ChannelBuffers.copiedBuffer(message, CharsetUtil.UTF_8)); http://git-wip-us.apache.org/repos/asf/tez/blob/bf87a0fd/tez-plugins/tez-aux-services/src/main/java/org/apache/tez/auxservices/ShuffleHandler.java ---------------------------------------------------------------------- diff --git a/tez-plugins/tez-aux-services/src/main/java/org/apache/tez/auxservices/ShuffleHandler.java b/tez-plugins/tez-aux-services/src/main/java/org/apache/tez/auxservices/ShuffleHandler.java index d48cc01..e22928e 100644 --- a/tez-plugins/tez-aux-services/src/main/java/org/apache/tez/auxservices/ShuffleHandler.java +++ b/tez-plugins/tez-aux-services/src/main/java/org/apache/tez/auxservices/ShuffleHandler.java @@ -992,9 +992,9 @@ public class ShuffleHandler extends AuxiliaryService { } // Check whether the shuffle version is compatible if (!ShuffleHeader.DEFAULT_HTTP_HEADER_NAME.equals( - request.getHeader(ShuffleHeader.HTTP_HEADER_NAME)) + request.headers().get(ShuffleHeader.HTTP_HEADER_NAME)) || !ShuffleHeader.DEFAULT_HTTP_HEADER_VERSION.equals( - request.getHeader(ShuffleHeader.HTTP_HEADER_VERSION))) { + request.headers().get(ShuffleHeader.HTTP_HEADER_VERSION))) { sendError(ctx, "Incompatible shuffle request version", BAD_REQUEST); } final Map<String,List<String>> q = @@ -1280,9 +1280,9 @@ public class ShuffleHandler extends AuxiliaryService { protected void setResponseHeaders(HttpResponse response, boolean keepAliveParam, long contentLength) { if (connectionKeepAliveEnabled || keepAliveParam) { - response.setHeader(HttpHeaders.Names.CONTENT_LENGTH, String.valueOf(contentLength)); - response.setHeader(HttpHeaders.Names.CONNECTION, HttpHeaders.Values.KEEP_ALIVE); - response.setHeader(HttpHeaders.Values.KEEP_ALIVE, "timeout=" + connectionKeepAliveTimeOut); + response.headers().set(HttpHeaders.Names.CONTENT_LENGTH, String.valueOf(contentLength)); + response.headers().set(HttpHeaders.Names.CONNECTION, HttpHeaders.Values.KEEP_ALIVE); + response.headers().set(HttpHeaders.Values.KEEP_ALIVE, "timeout=" + connectionKeepAliveTimeOut); if (LOG.isDebugEnabled()) { LOG.debug("Content Length in shuffle : " + contentLength); } @@ -1290,7 +1290,7 @@ public class ShuffleHandler extends AuxiliaryService { if (LOG.isDebugEnabled()) { LOG.debug("Setting connection close header..."); } - response.setHeader(HttpHeaders.Names.CONNECTION, CONNECTION_CLOSE); + response.headers().set(HttpHeaders.Names.CONNECTION, CONNECTION_CLOSE); } } @@ -1316,7 +1316,7 @@ public class ShuffleHandler extends AuxiliaryService { String enc_str = SecureShuffleUtils.buildMsgFrom(requestUri); // hash from the fetcher String urlHashStr = - request.getHeader(SecureShuffleUtils.HTTP_HEADER_URL_HASH); + request.headers().get(SecureShuffleUtils.HTTP_HEADER_URL_HASH); if (urlHashStr == null) { LOG.info("Missing header hash for " + appid); throw new IOException("fetcher cannot be authenticated"); @@ -1332,11 +1332,11 @@ public class ShuffleHandler extends AuxiliaryService { String reply = SecureShuffleUtils.generateHash(urlHashStr.getBytes(Charsets.UTF_8), tokenSecret); - response.setHeader(SecureShuffleUtils.HTTP_HEADER_REPLY_URL_HASH, reply); + response.headers().set(SecureShuffleUtils.HTTP_HEADER_REPLY_URL_HASH, reply); // Put shuffle version into http header - response.setHeader(ShuffleHeader.HTTP_HEADER_NAME, + response.headers().set(ShuffleHeader.HTTP_HEADER_NAME, ShuffleHeader.DEFAULT_HTTP_HEADER_NAME); - response.setHeader(ShuffleHeader.HTTP_HEADER_VERSION, + response.headers().set(ShuffleHeader.HTTP_HEADER_VERSION, ShuffleHeader.DEFAULT_HTTP_HEADER_VERSION); if (LOG.isDebugEnabled()) { int len = reply.length(); @@ -1420,11 +1420,11 @@ public class ShuffleHandler extends AuxiliaryService { protected void sendError(ChannelHandlerContext ctx, String message, HttpResponseStatus status) { HttpResponse response = new DefaultHttpResponse(HTTP_1_1, status); - response.setHeader(CONTENT_TYPE, "text/plain; charset=UTF-8"); + response.headers().set(CONTENT_TYPE, "text/plain; charset=UTF-8"); // Put shuffle version into http header - response.setHeader(ShuffleHeader.HTTP_HEADER_NAME, + response.headers().set(ShuffleHeader.HTTP_HEADER_NAME, ShuffleHeader.DEFAULT_HTTP_HEADER_NAME); - response.setHeader(ShuffleHeader.HTTP_HEADER_VERSION, + response.headers().set(ShuffleHeader.HTTP_HEADER_VERSION, ShuffleHeader.DEFAULT_HTTP_HEADER_VERSION); response.setContent( ChannelBuffers.copiedBuffer(message, CharsetUtil.UTF_8)); http://git-wip-us.apache.org/repos/asf/tez/blob/bf87a0fd/tez-plugins/tez-aux-services/src/test/java/org/apache/tez/auxservices/TestShuffleHandler.java ---------------------------------------------------------------------- diff --git a/tez-plugins/tez-aux-services/src/test/java/org/apache/tez/auxservices/TestShuffleHandler.java b/tez-plugins/tez-aux-services/src/test/java/org/apache/tez/auxservices/TestShuffleHandler.java index b9fd0d2..11c92fb 100644 --- a/tez-plugins/tez-aux-services/src/test/java/org/apache/tez/auxservices/TestShuffleHandler.java +++ b/tez-plugins/tez-aux-services/src/test/java/org/apache/tez/auxservices/TestShuffleHandler.java @@ -82,6 +82,7 @@ import org.jboss.netty.channel.ChannelPipeline; import org.jboss.netty.channel.socket.SocketChannel; import org.jboss.netty.channel.MessageEvent; import org.jboss.netty.channel.AbstractChannel; +import org.jboss.netty.handler.codec.http.DefaultHttpHeaders; import org.jboss.netty.handler.codec.http.DefaultHttpResponse; import org.jboss.netty.handler.codec.http.HttpHeaders; import org.jboss.netty.handler.codec.http.HttpRequest; @@ -1236,6 +1237,7 @@ public class TestShuffleHandler { public HttpRequest createMockHttpRequest() { HttpRequest mockHttpRequest = mock(HttpRequest.class); Mockito.doReturn(HttpMethod.GET).when(mockHttpRequest).getMethod(); + Mockito.doReturn(new DefaultHttpHeaders()).when(mockHttpRequest).headers(); Mockito.doAnswer(new Answer() { @Override public Object answer(InvocationOnMock invocation) throws Throwable { http://git-wip-us.apache.org/repos/asf/tez/blob/bf87a0fd/tez-runtime-library/src/main/java/org/apache/tez/http/async/netty/AsyncHttpConnection.java ---------------------------------------------------------------------- diff --git a/tez-runtime-library/src/main/java/org/apache/tez/http/async/netty/AsyncHttpConnection.java b/tez-runtime-library/src/main/java/org/apache/tez/http/async/netty/AsyncHttpConnection.java index 735bb46..9243e97 100644 --- a/tez-runtime-library/src/main/java/org/apache/tez/http/async/netty/AsyncHttpConnection.java +++ b/tez-runtime-library/src/main/java/org/apache/tez/http/async/netty/AsyncHttpConnection.java @@ -31,7 +31,6 @@ import org.apache.tez.http.BaseHttpConnection; import org.apache.tez.http.HttpConnectionParams; import org.apache.tez.http.SSLFactory; import org.apache.tez.common.security.JobTokenSecretManager; -import org.apache.tez.runtime.library.api.TezRuntimeConfiguration; import org.apache.tez.runtime.library.common.security.SecureShuffleUtils; import org.apache.tez.runtime.library.common.shuffle.orderedgrouped.ShuffleHeader; import org.apache.tez.util.StopWatch; @@ -92,15 +91,14 @@ public class AsyncHttpConnection extends BaseHttpConnection { * setMaxConnections & addRequestFilter. */ builder - .setAllowPoolingConnection(httpConnParams.isKeepAlive()) - .setAllowSslConnectionPool(httpConnParams.isKeepAlive()) - .setCompressionEnabled(false) + .setAllowPoolingConnections(httpConnParams.isKeepAlive()) + .setAllowPoolingSslConnections(httpConnParams.isKeepAlive()) + .setCompressionEnforced(false) //.setExecutorService(applicationThreadPool) //.addRequestFilter(new ThrottleRequestFilter()) - .setMaximumConnectionsPerHost(1) - .setConnectionTimeoutInMs(httpConnParams.getConnectionTimeout()) - .setRequestTimeoutInMs(httpConnParams.getReadTimeout()) - .setUseRawUrl(true) + .setMaxConnectionsPerHost(1) + .setConnectTimeout(httpConnParams.getConnectionTimeout()) + .setDisableUrlEncodingForBoundedRequests(true) .build(); httpAsyncClient = new AsyncHttpClient(builder.build()); }
