Repository: tez Updated Branches: refs/heads/TEZ-3334 ff02c00d7 -> e5d01a60a
TEZ-3684. Incorporate first pass non-essential TEZ-3334 pre-merge feedback (jeagles) Project: http://git-wip-us.apache.org/repos/asf/tez/repo Commit: http://git-wip-us.apache.org/repos/asf/tez/commit/e5d01a60 Tree: http://git-wip-us.apache.org/repos/asf/tez/tree/e5d01a60 Diff: http://git-wip-us.apache.org/repos/asf/tez/diff/e5d01a60 Branch: refs/heads/TEZ-3334 Commit: e5d01a60a040b7d60272a61cd540753bfe3478be Parents: ff02c00 Author: Jonathan Eagles <[email protected]> Authored: Thu Apr 6 13:06:31 2017 -0500 Committer: Jonathan Eagles <[email protected]> Committed: Thu Apr 6 13:06:31 2017 -0500 ---------------------------------------------------------------------- TEZ-3334-CHANGES.txt | 1 + .../app/launcher/TezContainerLauncherImpl.java | 22 +++++++++------ .../tez/auxservices/TestShuffleHandlerJobs.java | 28 ++++++++------------ .../library/common/shuffle/InputHost.java | 2 +- .../common/shuffle/impl/ShuffleManager.java | 19 +++++++------ 5 files changed, 36 insertions(+), 36 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/tez/blob/e5d01a60/TEZ-3334-CHANGES.txt ---------------------------------------------------------------------- diff --git a/TEZ-3334-CHANGES.txt b/TEZ-3334-CHANGES.txt index 3b44690..61473e0 100644 --- a/TEZ-3334-CHANGES.txt +++ b/TEZ-3334-CHANGES.txt @@ -4,6 +4,7 @@ Apache Tez Change Log INCOMPATIBLE CHANGES: ALL CHANGES: + TEZ-3684. Incorporate first pass non-essential TEZ-3334 pre-merge feedback TEZ-3683. LocalContainerLauncher#shouldDelete member variable is not used TEZ-3682. Pass parameters instead of configuration for changes to support tez shuffle handler TEZ-3628. Give Tez shuffle handler threads custom names http://git-wip-us.apache.org/repos/asf/tez/blob/e5d01a60/tez-dag/src/main/java/org/apache/tez/dag/app/launcher/TezContainerLauncherImpl.java ---------------------------------------------------------------------- diff --git a/tez-dag/src/main/java/org/apache/tez/dag/app/launcher/TezContainerLauncherImpl.java b/tez-dag/src/main/java/org/apache/tez/dag/app/launcher/TezContainerLauncherImpl.java index 058abfe..f6a6874 100644 --- a/tez-dag/src/main/java/org/apache/tez/dag/app/launcher/TezContainerLauncherImpl.java +++ b/tez-dag/src/main/java/org/apache/tez/dag/app/launcher/TezContainerLauncherImpl.java @@ -177,14 +177,20 @@ public class TezContainerLauncherImpl extends ContainerLauncher { this.state = ContainerState.RUNNING; int shufflePort = TezRuntimeUtils.INVALID_PORT; - ByteBuffer portInfo = - response.getAllServicesMetaData().get( - conf.get(TezConfiguration.TEZ_AM_SHUFFLE_AUXILIARY_SERVICE_ID, - TezConfiguration.TEZ_AM_SHUFFLE_AUXILIARY_SERVICE_ID_DEFAULT)); - if (portInfo != null) { - DataInputByteBuffer in = new DataInputByteBuffer(); - in.reset(portInfo); - shufflePort = in.readInt(); + Map<String, java.nio.ByteBuffer> servicesMetaData = response.getAllServicesMetaData(); + if (servicesMetaData != null) { + String auxiliaryService = conf.get(TezConfiguration.TEZ_AM_SHUFFLE_AUXILIARY_SERVICE_ID, + TezConfiguration.TEZ_AM_SHUFFLE_AUXILIARY_SERVICE_ID_DEFAULT); + ByteBuffer portInfo = servicesMetaData.get(auxiliaryService); + if (portInfo != null) { + DataInputByteBuffer in = new DataInputByteBuffer(); + in.reset(portInfo); + shufflePort = in.readInt(); + } else { + LOG.warn("Shuffle port for {} is not present is the services metadata response", auxiliaryService); + } + } else { + LOG.warn("Shuffle port cannot be found since services metadata response is missing"); } deletionTracker.addNodeShufflePorts(event.getNodeId(), shufflePort); http://git-wip-us.apache.org/repos/asf/tez/blob/e5d01a60/tez-plugins/tez-aux-services/src/test/java/org/apache/tez/auxservices/TestShuffleHandlerJobs.java ---------------------------------------------------------------------- diff --git a/tez-plugins/tez-aux-services/src/test/java/org/apache/tez/auxservices/TestShuffleHandlerJobs.java b/tez-plugins/tez-aux-services/src/test/java/org/apache/tez/auxservices/TestShuffleHandlerJobs.java index c409bf8..f719c13 100644 --- a/tez-plugins/tez-aux-services/src/test/java/org/apache/tez/auxservices/TestShuffleHandlerJobs.java +++ b/tez-plugins/tez-aux-services/src/test/java/org/apache/tez/auxservices/TestShuffleHandlerJobs.java @@ -20,17 +20,11 @@ package org.apache.tez.auxservices; import java.io.File; import java.io.IOException; -import org.apache.hadoop.fs.LocalFileSystem; -import org.apache.hadoop.fs.RawLocalFileSystem; import org.apache.hadoop.security.UserGroupInformation; -import org.apache.hadoop.service.Service; import org.apache.hadoop.yarn.api.protocolrecords.GetApplicationReportRequest; import org.apache.hadoop.yarn.api.protocolrecords.GetApplicationReportResponse; -import org.apache.hadoop.yarn.api.protocolrecords.impl.pb - .GetApplicationReportRequestPBImpl; import org.apache.hadoop.yarn.api.records.ApplicationId; import org.apache.hadoop.yarn.api.records.YarnApplicationState; -import org.apache.hadoop.yarn.server.MiniYARNCluster; import org.apache.hadoop.yarn.server.resourcemanager.ClientRMService; import org.apache.tez.client.TezClient; import org.apache.tez.dag.api.TezConfiguration; @@ -56,7 +50,7 @@ public class TestShuffleHandlerJobs { private static final Logger LOG = LoggerFactory.getLogger(TestShuffleHandlerJobs.class); - protected static MiniTezCluster mrrTezCluster; + protected static MiniTezCluster tezCluster; protected static MiniDFSCluster dfsCluster; private static Configuration conf = new Configuration(); @@ -81,8 +75,8 @@ public class TestShuffleHandlerJobs { return; } - if (mrrTezCluster == null) { - mrrTezCluster = new MiniTezCluster(TestShuffleHandlerJobs.class.getName(), NUM_NMS, + if (tezCluster == null) { + tezCluster = new MiniTezCluster(TestShuffleHandlerJobs.class.getName(), NUM_NMS, 1, 1); Configuration conf = new Configuration(); conf.set(YarnConfiguration.NM_AUX_SERVICES, @@ -94,17 +88,17 @@ public class TestShuffleHandlerJobs { conf.set("fs.defaultFS", remoteFs.getUri().toString()); // use HDFS conf.set(MRJobConfig.MR_AM_STAGING_DIR, "/apps_staging_dir"); conf.setLong(YarnConfiguration.DEBUG_NM_DELETE_DELAY_SEC, 0l); - mrrTezCluster.init(conf); - mrrTezCluster.start(); + tezCluster.init(conf); + tezCluster.start(); } } @AfterClass public static void tearDown() { - if (mrrTezCluster != null) { - mrrTezCluster.stop(); - mrrTezCluster = null; + if (tezCluster != null) { + tezCluster.stop(); + tezCluster = null; } if (dfsCluster != null) { dfsCluster.shutdown(); @@ -123,7 +117,7 @@ public class TestShuffleHandlerJobs { String outputDirStr = "/tmp/owc-output/"; Path outputDir = new Path(outputDirStr); - TezConfiguration tezConf = new TezConfiguration(mrrTezCluster.getConfig()); + TezConfiguration tezConf = new TezConfiguration(tezCluster.getConfig()); tezConf.set(TezConfiguration.TEZ_AM_STAGING_DIR, stagingDirPath.toString()); tezConf.set(TezConfiguration.TEZ_AM_SHUFFLE_AUXILIARY_SERVICE_ID, ShuffleHandler.TEZ_SHUFFLE_SERVICEID); tezConf.setBoolean(TezConfiguration.TEZ_AM_DAG_DELETE_ENABLED, true); @@ -138,7 +132,7 @@ public class TestShuffleHandlerJobs { inputDirStr, outputDirStr, "10"}, tezSession)==0); verifyOutput(outputDir, remoteFs); tezSession.stop(); - ClientRMService rmService = mrrTezCluster.getResourceManager().getClientRMService(); + ClientRMService rmService = tezCluster.getResourceManager().getClientRMService(); boolean isAppComplete = false; while(!isAppComplete) { GetApplicationReportResponse resp = rmService.getApplicationReport( @@ -158,7 +152,7 @@ public class TestShuffleHandlerJobs { Thread.sleep(100); } for(int i = 0; i < NUM_NMS; i++) { - String appPath = mrrTezCluster.getTestWorkDir() + "/" + this.getClass().getName() + String appPath = tezCluster.getTestWorkDir() + "/" + this.getClass().getName() + "-localDir-nm-" + i + "_0/usercache/" + UserGroupInformation.getCurrentUser().getUserName() + "/appcache/" + job.getAppId(); String dagPathStr = appPath + "/dag_1"; http://git-wip-us.apache.org/repos/asf/tez/blob/e5d01a60/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/common/shuffle/InputHost.java ---------------------------------------------------------------------- diff --git a/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/common/shuffle/InputHost.java b/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/common/shuffle/InputHost.java index 88dacb9..6014b84 100644 --- a/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/common/shuffle/InputHost.java +++ b/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/common/shuffle/InputHost.java @@ -106,7 +106,7 @@ public class InputHost extends HostPort { inputs.add(srcAttempt); } - public synchronized PartitionToInputs clearAndGetOnePartition() { + public synchronized PartitionToInputs clearAndGetOnePartitionRange() { for (Map.Entry<PartitionRange, BlockingQueue<InputAttemptIdentifier>> entry : partitionToInputs.entrySet()) { List<InputAttemptIdentifier> inputs = http://git-wip-us.apache.org/repos/asf/tez/blob/e5d01a60/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/common/shuffle/impl/ShuffleManager.java ---------------------------------------------------------------------- diff --git a/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/common/shuffle/impl/ShuffleManager.java b/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/common/shuffle/impl/ShuffleManager.java index a23ce72..3436fc7 100644 --- a/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/common/shuffle/impl/ShuffleManager.java +++ b/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/common/shuffle/impl/ShuffleManager.java @@ -63,7 +63,6 @@ import org.apache.tez.common.TezUtilsInternal; import org.apache.tez.common.counters.TaskCounter; import org.apache.tez.common.counters.TezCounter; import org.apache.tez.common.security.JobTokenSecretManager; -import org.apache.tez.dag.api.TezConstants; import org.apache.tez.dag.api.TezUncheckedException; import org.apache.tez.runtime.api.Event; import org.apache.tez.runtime.api.InputContext; @@ -431,11 +430,11 @@ public class ShuffleManager implements FetcherCallback { // Remove obsolete inputs from the list being given to the fetcher. Also // remove from the obsolete list. - PartitionToInputs pendingInputsOfOnePartition = inputHost - .clearAndGetOnePartition(); + PartitionToInputs pendingInputsOfOnePartitionRange = inputHost + .clearAndGetOnePartitionRange(); int includedMaps = 0; for (Iterator<InputAttemptIdentifier> inputIter = - pendingInputsOfOnePartition.getInputs().iterator(); + pendingInputsOfOnePartitionRange.getInputs().iterator(); inputIter.hasNext();) { InputAttemptIdentifier input = inputIter.next(); @@ -467,8 +466,8 @@ public class ShuffleManager implements FetcherCallback { if (includedMaps >= maxTaskOutputAtOnce) { inputIter.remove(); //add to inputHost - inputHost.addKnownInput(pendingInputsOfOnePartition.getPartition(), pendingInputsOfOnePartition.getPartitionCount(), - input); + inputHost.addKnownInput(pendingInputsOfOnePartitionRange.getPartition(), + pendingInputsOfOnePartitionRange.getPartitionCount(), input); } else { includedMaps++; } @@ -477,13 +476,13 @@ public class ShuffleManager implements FetcherCallback { pendingHosts.add(inputHost); //add it to queue } fetcherBuilder.assignWork(inputHost.getHost(), inputHost.getPort(), - pendingInputsOfOnePartition.getPartition(), - pendingInputsOfOnePartition.getPartitionCount(), - pendingInputsOfOnePartition.getInputs()); + pendingInputsOfOnePartitionRange.getPartition(), + pendingInputsOfOnePartitionRange.getPartitionCount(), + pendingInputsOfOnePartitionRange.getInputs()); if (LOG.isDebugEnabled()) { LOG.debug("Created Fetcher for host: " + inputHost.getHost() + ", info: " + inputHost.getAdditionalInfo() - + ", with inputs: " + pendingInputsOfOnePartition); + + ", with inputs: " + pendingInputsOfOnePartitionRange); } return fetcherBuilder.build(); }
