Repository: tez Updated Branches: refs/heads/TEZ-2003 ded95e59e -> da0f93872
TEZ-2714. Fix comments from review - part 3. (sseth) Project: http://git-wip-us.apache.org/repos/asf/tez/repo Commit: http://git-wip-us.apache.org/repos/asf/tez/commit/da0f9387 Tree: http://git-wip-us.apache.org/repos/asf/tez/tree/da0f9387 Diff: http://git-wip-us.apache.org/repos/asf/tez/diff/da0f9387 Branch: refs/heads/TEZ-2003 Commit: da0f93872f0c6f233a48f16c978ba76b44523001 Parents: ded95e5 Author: Siddharth Seth <[email protected]> Authored: Fri Aug 14 14:20:13 2015 -0700 Committer: Siddharth Seth <[email protected]> Committed: Fri Aug 14 14:20:13 2015 -0700 ---------------------------------------------------------------------- TEZ-2003-CHANGES.txt | 1 + .../api/ServicePluginsDescriptor.java | 20 +++ .../dag/app/rm/TaskSchedulerEventHandler.java | 5 +- .../tez/dag/app/rm/container/AMContainer.java | 1 - .../apache/tez/dag/app/MockDAGAppMaster.java | 20 ++- .../app/rm/TestTaskSchedulerEventHandler.java | 2 +- .../dag/app/rm/TestTaskSchedulerHelpers.java | 5 +- .../tez/shufflehandler/FadvisedChunkedFile.java | 78 --------- .../tez/shufflehandler/FadvisedFileRegion.java | 160 ------------------- .../apache/tez/shufflehandler/IndexCache.java | 12 +- .../tez/shufflehandler/ShuffleHandler.java | 80 +--------- 11 files changed, 45 insertions(+), 339 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/tez/blob/da0f9387/TEZ-2003-CHANGES.txt ---------------------------------------------------------------------- diff --git a/TEZ-2003-CHANGES.txt b/TEZ-2003-CHANGES.txt index 8a8e257..fed203a 100644 --- a/TEZ-2003-CHANGES.txt +++ b/TEZ-2003-CHANGES.txt @@ -49,5 +49,6 @@ ALL CHANGES: TEZ-2707. Fix comments from reviews - part 2. TEZ-2713. Add tests for node handling when there's multiple schedulers. TEZ-2721. rebase 08/14 + TEZ-2714. Fix comments from review - part 3. INCOMPATIBLE CHANGES: http://git-wip-us.apache.org/repos/asf/tez/blob/da0f9387/tez-api/src/main/java/org/apache/tez/serviceplugins/api/ServicePluginsDescriptor.java ---------------------------------------------------------------------- diff --git a/tez-api/src/main/java/org/apache/tez/serviceplugins/api/ServicePluginsDescriptor.java b/tez-api/src/main/java/org/apache/tez/serviceplugins/api/ServicePluginsDescriptor.java index 113b7db..2dabed0 100644 --- a/tez-api/src/main/java/org/apache/tez/serviceplugins/api/ServicePluginsDescriptor.java +++ b/tez-api/src/main/java/org/apache/tez/serviceplugins/api/ServicePluginsDescriptor.java @@ -19,6 +19,7 @@ import java.util.Arrays; import com.google.common.base.Preconditions; import org.apache.hadoop.classification.InterfaceAudience; import org.apache.hadoop.classification.InterfaceStability; +import org.apache.tez.dag.api.TezConfiguration; /** * An {@link ServicePluginsDescriptor} describes the list of plugins running within the AM for @@ -71,6 +72,13 @@ public class ServicePluginsDescriptor { /** * Create a service plugin descriptor with the provided plugins. Also allows specification of whether * in-AM execution is enabled. Container execution is enabled by default. + * + * Note on Uber mode: This is NOT fully supported at the moment. Tasks will be launched within the + * AM process itself, controlled by {@link TezConfiguration#TEZ_AM_INLINE_TASK_EXECUTION_MAX_TASKS}. + * The AM will need to be sized correctly for the tasks. Memory allocation to the running task + * cannot be controlled yet, and is the full AM heap for each task. + * TODO: TEZ-2722 + * * @param enableUber whether to enable execution in the AM or not * @param taskSchedulerDescriptor the task scheduler plugin descriptors * @param containerLauncherDescriptors the container launcher plugin descriptors @@ -89,6 +97,12 @@ public class ServicePluginsDescriptor { * Create a service plugin descriptor with the provided plugins. Also allows specification of whether * container execution and in-AM execution will be enabled. * + * Note on Uber mode: This is NOT fully supported at the moment. Tasks will be launched within the + * AM process itself, controlled by {@link TezConfiguration#TEZ_AM_INLINE_TASK_EXECUTION_MAX_TASKS}. + * The AM will need to be sized correctly for the tasks. Memory allocation to the running task + * cannot be controlled yet, and is the full AM heap for each task. + * TODO: TEZ-2722 + * * @param enableContainers whether to enable execution in containers * @param enableUber whether to enable execution in the AM or not * @param taskSchedulerDescriptor the task scheduler plugin descriptors @@ -108,6 +122,12 @@ public class ServicePluginsDescriptor { * Create a service plugin descriptor which may have in-AM execution of tasks enabled. Container * execution is enabled by default * + * Note on Uber mode: This is NOT fully supported at the moment. Tasks will be launched within the + * AM process itself, controlled by {@link TezConfiguration#TEZ_AM_INLINE_TASK_EXECUTION_MAX_TASKS}. + * The AM will need to be sized correctly for the tasks. Memory allocation to the running task + * cannot be controlled yet, and is the full AM heap for each task. + * TODO: TEZ-2722 + * * @param enableUber whether to enable execution in the AM or not * @return a {@link ServicePluginsDescriptor} instance */ http://git-wip-us.apache.org/repos/asf/tez/blob/da0f9387/tez-dag/src/main/java/org/apache/tez/dag/app/rm/TaskSchedulerEventHandler.java ---------------------------------------------------------------------- diff --git a/tez-dag/src/main/java/org/apache/tez/dag/app/rm/TaskSchedulerEventHandler.java b/tez-dag/src/main/java/org/apache/tez/dag/app/rm/TaskSchedulerEventHandler.java index 7d2e768..f189b84 100644 --- a/tez-dag/src/main/java/org/apache/tez/dag/app/rm/TaskSchedulerEventHandler.java +++ b/tez-dag/src/main/java/org/apache/tez/dag/app/rm/TaskSchedulerEventHandler.java @@ -443,7 +443,8 @@ public class TaskSchedulerEventHandler extends AbstractService implements } @VisibleForTesting - protected void instantiateScheduelrs(String host, int port, String trackingUrl, AppContext appContext) { + protected void instantiateSchedulers(String host, int port, String trackingUrl, + AppContext appContext) { // Iterate over the list and create all the taskSchedulers int j = 0; for (int i = 0; i < taskSchedulerDescriptors.length; i++) { @@ -472,7 +473,7 @@ public class TaskSchedulerEventHandler extends AbstractService implements // always try to connect to AM and proxy the response. hence it wont work if the webUIService // is not enabled. String trackingUrl = (webUI != null) ? webUI.getTrackingURL() : ""; - instantiateScheduelrs(serviceAddr.getHostName(), serviceAddr.getPort(), trackingUrl, appContext); + instantiateSchedulers(serviceAddr.getHostName(), serviceAddr.getPort(), trackingUrl, appContext); for (int i = 0 ; i < taskSchedulers.length ; i++) { taskSchedulerServiceWrappers[i].init(getConfig()); http://git-wip-us.apache.org/repos/asf/tez/blob/da0f9387/tez-dag/src/main/java/org/apache/tez/dag/app/rm/container/AMContainer.java ---------------------------------------------------------------------- diff --git a/tez-dag/src/main/java/org/apache/tez/dag/app/rm/container/AMContainer.java b/tez-dag/src/main/java/org/apache/tez/dag/app/rm/container/AMContainer.java index 4b2d528..8f5034e 100644 --- a/tez-dag/src/main/java/org/apache/tez/dag/app/rm/container/AMContainer.java +++ b/tez-dag/src/main/java/org/apache/tez/dag/app/rm/container/AMContainer.java @@ -22,7 +22,6 @@ import java.util.List; import org.apache.hadoop.yarn.api.records.Container; import org.apache.hadoop.yarn.api.records.ContainerId; -import org.apache.hadoop.yarn.api.records.NodeId; import org.apache.hadoop.yarn.event.EventHandler; import org.apache.tez.dag.records.TezTaskAttemptID; http://git-wip-us.apache.org/repos/asf/tez/blob/da0f9387/tez-dag/src/test/java/org/apache/tez/dag/app/MockDAGAppMaster.java ---------------------------------------------------------------------- diff --git a/tez-dag/src/test/java/org/apache/tez/dag/app/MockDAGAppMaster.java b/tez-dag/src/test/java/org/apache/tez/dag/app/MockDAGAppMaster.java index fe3e4ef..b09eb86 100644 --- a/tez-dag/src/test/java/org/apache/tez/dag/app/MockDAGAppMaster.java +++ b/tez-dag/src/test/java/org/apache/tez/dag/app/MockDAGAppMaster.java @@ -38,6 +38,8 @@ import java.util.concurrent.atomic.AtomicLong; import org.apache.tez.common.TezUtils; import org.apache.tez.dag.api.NamedEntityDescriptor; import org.apache.tez.dag.api.UserPayload; +import org.apache.tez.runtime.api.impl.TezHeartbeatRequest; +import org.apache.tez.runtime.api.impl.TezHeartbeatResponse; import org.apache.tez.serviceplugins.api.ContainerLaunchRequest; import org.apache.tez.serviceplugins.api.ContainerLauncher; import org.apache.tez.serviceplugins.api.ContainerLauncherContext; @@ -56,8 +58,6 @@ import org.apache.tez.client.TezApiVersionInfo; import org.apache.tez.common.ContainerContext; import org.apache.tez.common.ContainerTask; import org.apache.tez.common.counters.TezCounters; -import org.apache.tez.dag.api.TaskHeartbeatRequest; -import org.apache.tez.dag.api.TaskHeartbeatResponse; import org.apache.tez.dag.api.TezConfiguration; import org.apache.tez.dag.api.TezUncheckedException; import org.apache.tez.dag.app.launcher.ContainerLauncherRouter; @@ -325,11 +325,11 @@ public class MockDAGAppMaster extends DAGAppMaster { } } - private void doHeartbeat(TaskHeartbeatRequest request, ContainerData cData) throws Exception { + private void doHeartbeat(TezHeartbeatRequest request, ContainerData cData) throws Exception { long startTime = System.nanoTime(); long startCpuTime = threadMxBean.getCurrentThreadCpuTime(); - TaskHeartbeatResponse response = taListener.heartbeat(request); - if (response.isShouldDie()) { + TezHeartbeatResponse response = taskCommunicator.getUmbilical().heartbeat(request); + if (response.shouldDie()) { cData.remove(); } else { cData.nextFromEventId = response.getNextFromEventId(); @@ -417,9 +417,8 @@ public class MockDAGAppMaster extends DAGAppMaster { events.add(new TezEvent(new TaskStatusUpdateEvent(counters, progress, stats), new EventMetaData( EventProducerConsumerType.SYSTEM, cData.vName, "", cData.taId), MockDAGAppMaster.this.getContext().getClock().getTime())); - TaskHeartbeatRequest request = - new TaskHeartbeatRequest(cData.cIdStr, cData.taId, events, cData.nextFromEventId, cData.nextPreRoutedFromEventId, - 50000); + TezHeartbeatRequest request = new TezHeartbeatRequest(cData.numUpdates, events, + cData.nextPreRoutedFromEventId, cData.cIdStr, cData.taId, cData.nextFromEventId, 50000); doHeartbeat(request, cData); } else if (version != null && cData.taId.getId() <= version.intValue()) { preemptContainer(cData); @@ -430,9 +429,8 @@ public class MockDAGAppMaster extends DAGAppMaster { new TaskAttemptCompletedEvent(), new EventMetaData( EventProducerConsumerType.SYSTEM, cData.vName, "", cData.taId), MockDAGAppMaster.this.getContext().getClock().getTime())); - TaskHeartbeatRequest request = - new TaskHeartbeatRequest(cData.cIdStr, cData.taId, events, cData.nextFromEventId, cData.nextPreRoutedFromEventId, - 10000); + TezHeartbeatRequest request = new TezHeartbeatRequest(++cData.numUpdates, events, + cData.nextPreRoutedFromEventId, cData.cIdStr, cData.taId, cData.nextFromEventId, 10000); doHeartbeat(request, cData); cData.clear(); } http://git-wip-us.apache.org/repos/asf/tez/blob/da0f9387/tez-dag/src/test/java/org/apache/tez/dag/app/rm/TestTaskSchedulerEventHandler.java ---------------------------------------------------------------------- diff --git a/tez-dag/src/test/java/org/apache/tez/dag/app/rm/TestTaskSchedulerEventHandler.java b/tez-dag/src/test/java/org/apache/tez/dag/app/rm/TestTaskSchedulerEventHandler.java index 1550085..c85be6c 100644 --- a/tez-dag/src/test/java/org/apache/tez/dag/app/rm/TestTaskSchedulerEventHandler.java +++ b/tez-dag/src/test/java/org/apache/tez/dag/app/rm/TestTaskSchedulerEventHandler.java @@ -117,7 +117,7 @@ public class TestTaskSchedulerEventHandler { } @Override - protected void instantiateScheduelrs(String host, int port, String trackingUrl, + protected void instantiateSchedulers(String host, int port, String trackingUrl, AppContext appContext) { taskSchedulers[0] = mockTaskScheduler; taskSchedulerServiceWrappers[0] = new ServicePluginLifecycleAbstractService<>(taskSchedulers[0]); http://git-wip-us.apache.org/repos/asf/tez/blob/da0f9387/tez-dag/src/test/java/org/apache/tez/dag/app/rm/TestTaskSchedulerHelpers.java ---------------------------------------------------------------------- diff --git a/tez-dag/src/test/java/org/apache/tez/dag/app/rm/TestTaskSchedulerHelpers.java b/tez-dag/src/test/java/org/apache/tez/dag/app/rm/TestTaskSchedulerHelpers.java index 0746507..c13ca5a 100644 --- a/tez-dag/src/test/java/org/apache/tez/dag/app/rm/TestTaskSchedulerHelpers.java +++ b/tez-dag/src/test/java/org/apache/tez/dag/app/rm/TestTaskSchedulerHelpers.java @@ -147,7 +147,8 @@ class TestTaskSchedulerHelpers { } @Override - public void instantiateScheduelrs(String host, int port, String trackingUrl, AppContext appContext) { + public void instantiateSchedulers(String host, int port, String trackingUrl, + AppContext appContext) { TaskSchedulerContext taskSchedulerContext = new TaskSchedulerContextImpl(this, appContext, 0, trackingUrl, 1000, host, port, defaultPayload); @@ -166,7 +167,7 @@ class TestTaskSchedulerHelpers { @Override public void serviceStart() { - instantiateScheduelrs("host", 0, "", appContext); + instantiateSchedulers("host", 0, "", appContext); // Init the service so that reuse configuration is picked up. ((AbstractService)taskSchedulerServiceWrappers[0]).init(getConfig()); ((AbstractService)taskSchedulerServiceWrappers[0]).start(); http://git-wip-us.apache.org/repos/asf/tez/blob/da0f9387/tez-ext-service-tests/src/test/java/org/apache/tez/shufflehandler/FadvisedChunkedFile.java ---------------------------------------------------------------------- diff --git a/tez-ext-service-tests/src/test/java/org/apache/tez/shufflehandler/FadvisedChunkedFile.java b/tez-ext-service-tests/src/test/java/org/apache/tez/shufflehandler/FadvisedChunkedFile.java deleted file mode 100644 index 294add6..0000000 --- a/tez-ext-service-tests/src/test/java/org/apache/tez/shufflehandler/FadvisedChunkedFile.java +++ /dev/null @@ -1,78 +0,0 @@ -/* - * Licensed under the Apache License, Version 2.0 (the "License"); - * you may not use this file except in compliance with the License. - * You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package org.apache.tez.shufflehandler; - -import java.io.FileDescriptor; -import java.io.IOException; -import java.io.RandomAccessFile; - -import org.apache.hadoop.io.ReadaheadPool; -import org.apache.hadoop.io.ReadaheadPool.ReadaheadRequest; -import org.apache.hadoop.io.nativeio.NativeIO; -import org.jboss.netty.handler.stream.ChunkedFile; -import org.slf4j.Logger; -import org.slf4j.LoggerFactory; - -public class FadvisedChunkedFile extends ChunkedFile { - - private static final Logger LOG = LoggerFactory.getLogger(FadvisedChunkedFile.class); - - private final boolean manageOsCache; - private final int readaheadLength; - private final ReadaheadPool readaheadPool; - private final FileDescriptor fd; - private final String identifier; - - private ReadaheadRequest readaheadRequest; - - public FadvisedChunkedFile(RandomAccessFile file, long position, long count, - int chunkSize, boolean manageOsCache, int readaheadLength, - ReadaheadPool readaheadPool, String identifier) throws IOException { - super(file, position, count, chunkSize); - this.manageOsCache = manageOsCache; - this.readaheadLength = readaheadLength; - this.readaheadPool = readaheadPool; - this.fd = file.getFD(); - this.identifier = identifier; - } - - @Override - public Object nextChunk() throws Exception { - if (manageOsCache && readaheadPool != null) { - readaheadRequest = readaheadPool - .readaheadStream(identifier, fd, getCurrentOffset(), readaheadLength, - getEndOffset(), readaheadRequest); - } - return super.nextChunk(); - } - - @Override - public void close() throws Exception { - if (readaheadRequest != null) { - readaheadRequest.cancel(); - } - if (manageOsCache && getEndOffset() - getStartOffset() > 0) { - try { - NativeIO.POSIX.getCacheManipulator().posixFadviseIfPossible(identifier, - fd, - getStartOffset(), getEndOffset() - getStartOffset(), - NativeIO.POSIX.POSIX_FADV_DONTNEED); - } catch (Throwable t) { - LOG.warn("Failed to manage OS cache for " + identifier, t); - } - } - super.close(); - } -} http://git-wip-us.apache.org/repos/asf/tez/blob/da0f9387/tez-ext-service-tests/src/test/java/org/apache/tez/shufflehandler/FadvisedFileRegion.java ---------------------------------------------------------------------- diff --git a/tez-ext-service-tests/src/test/java/org/apache/tez/shufflehandler/FadvisedFileRegion.java b/tez-ext-service-tests/src/test/java/org/apache/tez/shufflehandler/FadvisedFileRegion.java deleted file mode 100644 index e5392d3..0000000 --- a/tez-ext-service-tests/src/test/java/org/apache/tez/shufflehandler/FadvisedFileRegion.java +++ /dev/null @@ -1,160 +0,0 @@ -/* - * Licensed under the Apache License, Version 2.0 (the "License"); - * you may not use this file except in compliance with the License. - * You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package org.apache.tez.shufflehandler; - -import java.io.FileDescriptor; -import java.io.IOException; -import java.io.RandomAccessFile; -import java.nio.ByteBuffer; -import java.nio.channels.FileChannel; -import java.nio.channels.WritableByteChannel; - -import com.google.common.annotations.VisibleForTesting; -import org.apache.hadoop.io.ReadaheadPool; -import org.apache.hadoop.io.ReadaheadPool.ReadaheadRequest; -import org.apache.hadoop.io.nativeio.NativeIO; -import org.jboss.netty.channel.DefaultFileRegion; -import org.slf4j.Logger; -import org.slf4j.LoggerFactory; - -public class FadvisedFileRegion extends DefaultFileRegion { - - private static final Logger LOG = LoggerFactory.getLogger(FadvisedFileRegion.class); - - private final boolean manageOsCache; - private final int readaheadLength; - private final ReadaheadPool readaheadPool; - private final FileDescriptor fd; - private final String identifier; - private final long count; - private final long position; - private final int shuffleBufferSize; - private final boolean shuffleTransferToAllowed; - private final FileChannel fileChannel; - - private ReadaheadRequest readaheadRequest; - - public FadvisedFileRegion(RandomAccessFile file, long position, long count, - boolean manageOsCache, int readaheadLength, ReadaheadPool readaheadPool, - String identifier, int shuffleBufferSize, - boolean shuffleTransferToAllowed) throws IOException { - super(file.getChannel(), position, count); - this.manageOsCache = manageOsCache; - this.readaheadLength = readaheadLength; - this.readaheadPool = readaheadPool; - this.fd = file.getFD(); - this.identifier = identifier; - this.fileChannel = file.getChannel(); - this.count = count; - this.position = position; - this.shuffleBufferSize = shuffleBufferSize; - this.shuffleTransferToAllowed = shuffleTransferToAllowed; - } - - @Override - public long transferTo(WritableByteChannel target, long position) - throws IOException { - if (manageOsCache && readaheadPool != null) { - readaheadRequest = readaheadPool.readaheadStream(identifier, fd, - getPosition() + position, readaheadLength, - getPosition() + getCount(), readaheadRequest); - } - - if(this.shuffleTransferToAllowed) { - return super.transferTo(target, position); - } else { - return customShuffleTransfer(target, position); - } - } - - /** - * This method transfers data using local buffer. It transfers data from - * a disk to a local buffer in memory, and then it transfers data from the - * buffer to the target. This is used only if transferTo is disallowed in - * the configuration file. super.TransferTo does not perform well on Windows - * due to a small IO request generated. customShuffleTransfer can control - * the size of the IO requests by changing the size of the intermediate - * buffer. - */ - @VisibleForTesting - long customShuffleTransfer(WritableByteChannel target, long position) - throws IOException { - long actualCount = this.count - position; - if (actualCount < 0 || position < 0) { - throw new IllegalArgumentException( - "position out of range: " + position + - " (expected: 0 - " + (this.count - 1) + ')'); - } - if (actualCount == 0) { - return 0L; - } - - long trans = actualCount; - int readSize; - ByteBuffer byteBuffer = ByteBuffer.allocate(this.shuffleBufferSize); - - while(trans > 0L && - (readSize = fileChannel.read(byteBuffer, this.position+position)) > 0) { - //adjust counters and buffer limit - if(readSize < trans) { - trans -= readSize; - position += readSize; - byteBuffer.flip(); - } else { - //We can read more than we need if the actualCount is not multiple - //of the byteBuffer size and file is big enough. In that case we cannot - //use flip method but we need to set buffer limit manually to trans. - byteBuffer.limit((int)trans); - byteBuffer.position(0); - position += trans; - trans = 0; - } - - //write data to the target - while(byteBuffer.hasRemaining()) { - target.write(byteBuffer); - } - - byteBuffer.clear(); - } - - return actualCount - trans; - } - - - @Override - public void releaseExternalResources() { - if (readaheadRequest != null) { - readaheadRequest.cancel(); - } - super.releaseExternalResources(); - } - - /** - * Call when the transfer completes successfully so we can advise the OS that - * we don't need the region to be cached anymore. - */ - public void transferSuccessful() { - if (manageOsCache && getCount() > 0) { - try { - NativeIO.POSIX.getCacheManipulator().posixFadviseIfPossible(identifier, - fd, getPosition(), getCount(), - NativeIO.POSIX.POSIX_FADV_DONTNEED); - } catch (Throwable t) { - LOG.warn("Failed to manage OS cache for " + identifier, t); - } - } - } -} http://git-wip-us.apache.org/repos/asf/tez/blob/da0f9387/tez-ext-service-tests/src/test/java/org/apache/tez/shufflehandler/IndexCache.java ---------------------------------------------------------------------- diff --git a/tez-ext-service-tests/src/test/java/org/apache/tez/shufflehandler/IndexCache.java b/tez-ext-service-tests/src/test/java/org/apache/tez/shufflehandler/IndexCache.java index 5a45917..e358fcc 100644 --- a/tez-ext-service-tests/src/test/java/org/apache/tez/shufflehandler/IndexCache.java +++ b/tez-ext-service-tests/src/test/java/org/apache/tez/shufflehandler/IndexCache.java @@ -1,11 +1,7 @@ -/** - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at +/* + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at * * http://www.apache.org/licenses/LICENSE-2.0 * http://git-wip-us.apache.org/repos/asf/tez/blob/da0f9387/tez-ext-service-tests/src/test/java/org/apache/tez/shufflehandler/ShuffleHandler.java ---------------------------------------------------------------------- diff --git a/tez-ext-service-tests/src/test/java/org/apache/tez/shufflehandler/ShuffleHandler.java b/tez-ext-service-tests/src/test/java/org/apache/tez/shufflehandler/ShuffleHandler.java index 8cbb8c7..046ce18 100644 --- a/tez-ext-service-tests/src/test/java/org/apache/tez/shufflehandler/ShuffleHandler.java +++ b/tez-ext-service-tests/src/test/java/org/apache/tez/shufflehandler/ShuffleHandler.java @@ -33,7 +33,6 @@ import java.io.IOException; import java.io.RandomAccessFile; import java.net.InetSocketAddress; import java.net.URL; -import java.nio.ByteBuffer; import java.nio.channels.ClosedChannelException; import java.util.ArrayList; import java.util.Collections; @@ -54,15 +53,11 @@ import com.google.common.util.concurrent.ThreadFactoryBuilder; import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.fs.LocalDirAllocator; import org.apache.hadoop.fs.Path; -import org.apache.hadoop.io.DataInputByteBuffer; import org.apache.hadoop.io.DataOutputBuffer; import org.apache.hadoop.io.ReadaheadPool; import org.apache.hadoop.io.SecureIOUtils; -import org.apache.hadoop.metrics2.annotation.Metric; -import org.apache.hadoop.metrics2.annotation.Metrics; -import org.apache.hadoop.metrics2.lib.MutableCounterInt; -import org.apache.hadoop.metrics2.lib.MutableCounterLong; -import org.apache.hadoop.metrics2.lib.MutableGaugeInt; +import org.apache.hadoop.mapred.FadvisedChunkedFile; +import org.apache.hadoop.mapred.FadvisedFileRegion; import org.apache.hadoop.security.ssl.SSLFactory; import org.apache.hadoop.security.token.Token; import org.apache.hadoop.util.Shell; @@ -199,27 +194,6 @@ public class ShuffleHandler { private static final AtomicBoolean initing = new AtomicBoolean(false); private static ShuffleHandler INSTANCE; - @Metrics(about="Shuffle output metrics", context="mapred") - static class ShuffleMetrics implements ChannelFutureListener { - @Metric("Shuffle output in bytes") - MutableCounterLong shuffleOutputBytes; - @Metric("# of failed shuffle outputs") - MutableCounterInt shuffleOutputsFailed; - @Metric("# of succeeeded shuffle outputs") - MutableCounterInt shuffleOutputsOK; - @Metric("# of current shuffle connections") - MutableGaugeInt shuffleConnections; - - @Override - public void operationComplete(ChannelFuture future) throws Exception { - if (future.isSuccess()) { - shuffleOutputsOK.incr(); - } else { - shuffleOutputsFailed.incr(); - } - shuffleConnections.decr(); - } - } public ShuffleHandler(Configuration conf) { this.conf = conf; @@ -299,57 +273,11 @@ public class ShuffleHandler { } public static ShuffleHandler get() { - Preconditions.checkState(started.get(), "ShuffleHandler must be started before invoking started"); + Preconditions.checkState(started.get(), + "ShuffleHandler must be started before invoking started"); return INSTANCE; } - /** - * Serialize the shuffle port into a ByteBuffer for use later on. - * @param port the port to be sent to the ApplciationMaster - * @return the serialized form of the port. - */ - public static ByteBuffer serializeMetaData(int port) throws IOException { - //TODO these bytes should be versioned - DataOutputBuffer port_dob = new DataOutputBuffer(); - port_dob.writeInt(port); - return ByteBuffer.wrap(port_dob.getData(), 0, port_dob.getLength()); - } - - /** - * A helper function to deserialize the metadata returned by ShuffleHandler. - * @param meta the metadata returned by the ShuffleHandler - * @return the port the Shuffle Handler is listening on to serve shuffle data. - */ - public static int deserializeMetaData(ByteBuffer meta) throws IOException { - //TODO this should be returning a class not just an int - DataInputByteBuffer in = new DataInputByteBuffer(); - in.reset(meta); - int port = in.readInt(); - return port; - } - - /** - * A helper function to serialize the JobTokenIdentifier to be sent to the - * ShuffleHandler as ServiceData. - * @param jobToken the job token to be used for authentication of - * shuffle data requests. - * @return the serialized version of the jobToken. - */ - public static ByteBuffer serializeServiceData(Token<JobTokenIdentifier> jobToken) throws IOException { - //TODO these bytes should be versioned - DataOutputBuffer jobToken_dob = new DataOutputBuffer(); - jobToken.write(jobToken_dob); - return ByteBuffer.wrap(jobToken_dob.getData(), 0, jobToken_dob.getLength()); - } - - static Token<JobTokenIdentifier> deserializeServiceData(ByteBuffer secret) throws IOException { - DataInputByteBuffer in = new DataInputByteBuffer(); - in.reset(secret); - Token<JobTokenIdentifier> jt = new Token<JobTokenIdentifier>(); - jt.readFields(in); - return jt; - } - public int getPort() { return port; }
