Repository: tez Updated Branches: refs/heads/master 9a3d8898b -> 6098f1bb9
TEZ-2748. Fix master build against hadoop-2.2. (sseth) Project: http://git-wip-us.apache.org/repos/asf/tez/repo Commit: http://git-wip-us.apache.org/repos/asf/tez/commit/de751ec3 Tree: http://git-wip-us.apache.org/repos/asf/tez/tree/de751ec3 Diff: http://git-wip-us.apache.org/repos/asf/tez/diff/de751ec3 Branch: refs/heads/master Commit: de751ec3da9d29fafefa78039450cfffc4fd4d80 Parents: 9a3d889 Author: Siddharth Seth <[email protected]> Authored: Wed Aug 26 15:44:10 2015 -0700 Committer: Siddharth Seth <[email protected]> Committed: Wed Aug 26 15:44:10 2015 -0700 ---------------------------------------------------------------------- CHANGES.txt | 1 + .../tez/shufflehandler/ShuffleHandler.java | 107 ++++--------------- tez-tools/analyzers/pom.xml | 26 ++++- 3 files changed, 42 insertions(+), 92 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/tez/blob/de751ec3/CHANGES.txt ---------------------------------------------------------------------- diff --git a/CHANGES.txt b/CHANGES.txt index 42e7290..f91df1b 100644 --- a/CHANGES.txt +++ b/CHANGES.txt @@ -17,6 +17,7 @@ INCOMPATIBLE CHANGES TEZ-2468. Change the minimum Java version to Java 7. ALL CHANGES: + TEZ-2748. Fix master build against hadoop-2.2. TEZ-2743. Fix TezContainerLauncher logging tokens. TEZ-2708. Rename classes and variables post TEZ-2003 changes. TEZ-2740. Create a reconfigureVertex alias for deprecated http://git-wip-us.apache.org/repos/asf/tez/blob/de751ec3/tez-ext-service-tests/src/test/java/org/apache/tez/shufflehandler/ShuffleHandler.java ---------------------------------------------------------------------- diff --git a/tez-ext-service-tests/src/test/java/org/apache/tez/shufflehandler/ShuffleHandler.java b/tez-ext-service-tests/src/test/java/org/apache/tez/shufflehandler/ShuffleHandler.java index 046ce18..ebaf9fe 100644 --- a/tez-ext-service-tests/src/test/java/org/apache/tez/shufflehandler/ShuffleHandler.java +++ b/tez-ext-service-tests/src/test/java/org/apache/tez/shufflehandler/ShuffleHandler.java @@ -54,17 +54,13 @@ import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.fs.LocalDirAllocator; import org.apache.hadoop.fs.Path; import org.apache.hadoop.io.DataOutputBuffer; -import org.apache.hadoop.io.ReadaheadPool; import org.apache.hadoop.io.SecureIOUtils; -import org.apache.hadoop.mapred.FadvisedChunkedFile; -import org.apache.hadoop.mapred.FadvisedFileRegion; -import org.apache.hadoop.security.ssl.SSLFactory; import org.apache.hadoop.security.token.Token; -import org.apache.hadoop.util.Shell; import org.apache.hadoop.yarn.api.records.ApplicationId; import org.apache.hadoop.yarn.util.ConverterUtils; import org.apache.tez.common.security.JobTokenIdentifier; import org.apache.tez.common.security.JobTokenSecretManager; +import org.apache.tez.mapreduce.hadoop.MRConfig; import org.apache.tez.runtime.library.common.security.SecureShuffleUtils; import org.apache.tez.runtime.library.common.shuffle.orderedgrouped.ShuffleHeader; import org.apache.tez.runtime.library.common.sort.impl.TezIndexRecord; @@ -79,6 +75,7 @@ import org.jboss.netty.channel.ChannelPipeline; import org.jboss.netty.channel.ChannelPipelineFactory; import org.jboss.netty.channel.ChannelStateEvent; import org.jboss.netty.channel.Channels; +import org.jboss.netty.channel.DefaultFileRegion; import org.jboss.netty.channel.ExceptionEvent; import org.jboss.netty.channel.MessageEvent; import org.jboss.netty.channel.SimpleChannelUpstreamHandler; @@ -95,7 +92,6 @@ import org.jboss.netty.handler.codec.http.HttpResponse; import org.jboss.netty.handler.codec.http.HttpResponseEncoder; import org.jboss.netty.handler.codec.http.HttpResponseStatus; import org.jboss.netty.handler.codec.http.QueryStringDecoder; -import org.jboss.netty.handler.ssl.SslHandler; import org.jboss.netty.handler.stream.ChunkedWriteHandler; import org.jboss.netty.util.CharsetUtil; import org.slf4j.Logger; @@ -107,12 +103,6 @@ public class ShuffleHandler { public static final String SHUFFLE_HANDLER_LOCAL_DIRS = "tez.shuffle.handler.local-dirs"; - public static final String SHUFFLE_MANAGE_OS_CACHE = "mapreduce.shuffle.manage.os.cache"; - public static final boolean DEFAULT_SHUFFLE_MANAGE_OS_CACHE = true; - - public static final String SHUFFLE_READAHEAD_BYTES = "mapreduce.shuffle.readahead.bytes"; - public static final int DEFAULT_SHUFFLE_READAHEAD_BYTES = 4 * 1024 * 1024; - // pattern to identify errors related to the client closing the socket early // idea borrowed from Netty SslHandler private static final Pattern IGNORABLE_ERROR_MESSAGE = Pattern.compile( @@ -123,7 +113,6 @@ public class ShuffleHandler { private final ChannelFactory selector; private final ChannelGroup accepted = new DefaultChannelGroup(); protected HttpPipelineFactory pipelineFact; - private final int sslFileBufferSize; private final Configuration conf; private final ConcurrentMap<String, Boolean> registeredApps = new ConcurrentHashMap<String, Boolean>(); @@ -132,20 +121,11 @@ public class ShuffleHandler { * Should the shuffle use posix_fadvise calls to manage the OS cache during * sendfile */ - private final boolean manageOsCache; - private final int readaheadLength; private final int maxShuffleConnections; - private final int shuffleBufferSize; - private final boolean shuffleTransferToAllowed; - private final ReadaheadPool readaheadPool = ReadaheadPool.getInstance(); private Map<String,String> userRsrc; private JobTokenSecretManager secretManager; - // TODO Fix this for tez. - public static final String MAPREDUCE_SHUFFLE_SERVICEID = - "mapreduce_shuffle"; - public static final String SHUFFLE_PORT_CONFIG_KEY = "tez.shuffle.port"; public static final int DEFAULT_SHUFFLE_PORT = 15551; @@ -165,11 +145,6 @@ public class ShuffleHandler { public static final String CONNECTION_CLOSE = "close"; - public static final String SUFFLE_SSL_FILE_BUFFER_SIZE_KEY = - "mapreduce.shuffle.ssl.file.buffer.size"; - - public static final int DEFAULT_SUFFLE_SSL_FILE_BUFFER_SIZE = 60 * 1024; - public static final String MAX_SHUFFLE_CONNECTIONS = "mapreduce.shuffle.max.connections"; public static final int DEFAULT_MAX_SHUFFLE_CONNECTIONS = 0; // 0 implies no limit @@ -177,16 +152,6 @@ public class ShuffleHandler { // 0 implies Netty default of 2 * number of available processors public static final int DEFAULT_MAX_SHUFFLE_THREADS = 0; - public static final String SHUFFLE_BUFFER_SIZE = - "mapreduce.shuffle.transfer.buffer.size"; - public static final int DEFAULT_SHUFFLE_BUFFER_SIZE = 128 * 1024; - - public static final String SHUFFLE_TRANSFERTO_ALLOWED = - "mapreduce.shuffle.transferTo.allowed"; - public static final boolean DEFAULT_SHUFFLE_TRANSFERTO_ALLOWED = true; - public static final boolean WINDOWS_DEFAULT_SHUFFLE_TRANSFERTO_ALLOWED = - false; - final boolean connectionKeepAliveEnabled; final int connectionKeepAliveTimeOut; final int mapOutputMetaInfoCacheSize; @@ -197,11 +162,6 @@ public class ShuffleHandler { public ShuffleHandler(Configuration conf) { this.conf = conf; - manageOsCache = conf.getBoolean(SHUFFLE_MANAGE_OS_CACHE, - DEFAULT_SHUFFLE_MANAGE_OS_CACHE); - - readaheadLength = conf.getInt(SHUFFLE_READAHEAD_BYTES, - DEFAULT_SHUFFLE_READAHEAD_BYTES); maxShuffleConnections = conf.getInt(MAX_SHUFFLE_CONNECTIONS, DEFAULT_MAX_SHUFFLE_CONNECTIONS); @@ -211,13 +171,6 @@ public class ShuffleHandler { maxShuffleThreads = 2 * Runtime.getRuntime().availableProcessors(); } - shuffleBufferSize = conf.getInt(SHUFFLE_BUFFER_SIZE, - DEFAULT_SHUFFLE_BUFFER_SIZE); - - shuffleTransferToAllowed = conf.getBoolean(SHUFFLE_TRANSFERTO_ALLOWED, - (Shell.WINDOWS)?WINDOWS_DEFAULT_SHUFFLE_TRANSFERTO_ALLOWED: - DEFAULT_SHUFFLE_TRANSFERTO_ALLOWED); - ThreadFactory bossFactory = new ThreadFactoryBuilder() .setNameFormat("ShuffleHandler Netty Boss #%d") .build(); @@ -230,8 +183,6 @@ public class ShuffleHandler { Executors.newCachedThreadPool(workerFactory), maxShuffleThreads); - sslFileBufferSize = conf.getInt(SUFFLE_SSL_FILE_BUFFER_SIZE_KEY, - DEFAULT_SUFFLE_SSL_FILE_BUFFER_SIZE); connectionKeepAliveEnabled = conf.getBoolean(SHUFFLE_CONNECTION_KEEP_ALIVE_ENABLED, DEFAULT_SHUFFLE_CONNECTION_KEEP_ALIVE_ENABLED); @@ -332,31 +283,23 @@ public class ShuffleHandler { class HttpPipelineFactory implements ChannelPipelineFactory { final Shuffle SHUFFLE; - private SSLFactory sslFactory; public HttpPipelineFactory(Configuration conf) throws Exception { SHUFFLE = getShuffle(conf); // TODO Setup SSL Shuffle -// if (conf.getBoolean(MRConfig.SHUFFLE_SSL_ENABLED_KEY, -// MRConfig.SHUFFLE_SSL_ENABLED_DEFAULT)) { -// LOG.info("Encrypted shuffle is enabled."); -// sslFactory = new SSLFactory(SSLFactory.Mode.SERVER, conf); -// sslFactory.init(); -// } + if (conf.getBoolean(MRConfig.SHUFFLE_SSL_ENABLED_KEY, + MRConfig.SHUFFLE_SSL_ENABLED_DEFAULT)) { + throw new UnsupportedOperationException( + "SSL Shuffle is not currently supported for the test shuffle handler"); + } } public void destroy() { - if (sslFactory != null) { - sslFactory.destroy(); - } } @Override public ChannelPipeline getPipeline() throws Exception { ChannelPipeline pipeline = Channels.pipeline(); - if (sslFactory != null) { - pipeline.addLast("ssl", new SslHandler(sslFactory.createSSLEngine())); - } pipeline.addLast("decoder", new HttpRequestDecoder()); pipeline.addLast("aggregator", new HttpChunkAggregator(1 << 16)); pipeline.addLast("encoder", new HttpResponseEncoder()); @@ -689,31 +632,17 @@ public class ShuffleHandler { return null; } ChannelFuture writeFuture; - if (ch.getPipeline().get(SslHandler.class) == null) { - final FadvisedFileRegion partition = new FadvisedFileRegion(spill, - info.getStartOffset(), info.getPartLength(), manageOsCache, readaheadLength, - readaheadPool, spillfile.getAbsolutePath(), - shuffleBufferSize, shuffleTransferToAllowed); - writeFuture = ch.write(partition); - writeFuture.addListener(new ChannelFutureListener() { - // TODO error handling; distinguish IO/connection failures, - // attribute to appropriate spill output - @Override - public void operationComplete(ChannelFuture future) { - if (future.isSuccess()) { - partition.transferSuccessful(); - } - partition.releaseExternalResources(); - } - }); - } else { - // HTTPS cannot be done with zero copy. - final FadvisedChunkedFile chunk = new FadvisedChunkedFile(spill, - info.getStartOffset(), info.getPartLength(), sslFileBufferSize, - manageOsCache, readaheadLength, readaheadPool, - spillfile.getAbsolutePath()); - writeFuture = ch.write(chunk); - } + final DefaultFileRegion partition = + new DefaultFileRegion(spill.getChannel(), info.getStartOffset(), info.getPartLength()); + writeFuture = ch.write(partition); + writeFuture.addListener(new ChannelFutureListener() { + // TODO error handling; distinguish IO/connection failures, + // attribute to appropriate spill output + @Override + public void operationComplete(ChannelFuture future) { + partition.releaseExternalResources(); + } + }); return writeFuture; } http://git-wip-us.apache.org/repos/asf/tez/blob/de751ec3/tez-tools/analyzers/pom.xml ---------------------------------------------------------------------- diff --git a/tez-tools/analyzers/pom.xml b/tez-tools/analyzers/pom.xml index aba8e74..bb5d896 100644 --- a/tez-tools/analyzers/pom.xml +++ b/tez-tools/analyzers/pom.xml @@ -25,7 +25,27 @@ <artifactId>tez-perf-analyzer</artifactId> <packaging>pom</packaging> - <modules> - <module>job-analyzer</module> - </modules> + <profiles> + <profile> + <id>hadoop24</id> + <activation> + <activeByDefault>false</activeByDefault> + </activation> + <modules> + <module>job-analyzer</module> + </modules> + </profile> + <profile> + <id>hadoop26</id> + <activation> + <property> + <name>!skipATS</name> + </property> + </activation> + <modules> + <module>job-analyzer</module> + </modules> + </profile> + </profiles> + </project>
