Repository: tez Updated Branches: refs/heads/TEZ-3334 738f9a614 -> e272e370a
TEZ-3377. Remove ShuffleHandler dependency on mapred.FadvisedChunkedFile and mapred.FadvisedFileRegion (Kuhu Shukla via jeagles) Project: http://git-wip-us.apache.org/repos/asf/tez/repo Commit: http://git-wip-us.apache.org/repos/asf/tez/commit/e272e370 Tree: http://git-wip-us.apache.org/repos/asf/tez/tree/e272e370 Diff: http://git-wip-us.apache.org/repos/asf/tez/diff/e272e370 Branch: refs/heads/TEZ-3334 Commit: e272e370ad4a8fbcb0b567de63651b5445d991b8 Parents: 738f9a6 Author: Jonathan Eagles <[email protected]> Authored: Wed Jul 27 11:08:51 2016 -0500 Committer: Jonathan Eagles <[email protected]> Committed: Wed Jul 27 11:08:51 2016 -0500 ---------------------------------------------------------------------- .../tez/auxservices/FadvisedChunkedFile.java | 85 ++++++++++ .../tez/auxservices/FadvisedFileRegion.java | 167 +++++++++++++++++++ .../apache/tez/auxservices/ShuffleHandler.java | 2 - 3 files changed, 252 insertions(+), 2 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/tez/blob/e272e370/tez-plugins/tez-aux-services/src/main/java/org/apache/tez/auxservices/FadvisedChunkedFile.java ---------------------------------------------------------------------- diff --git a/tez-plugins/tez-aux-services/src/main/java/org/apache/tez/auxservices/FadvisedChunkedFile.java b/tez-plugins/tez-aux-services/src/main/java/org/apache/tez/auxservices/FadvisedChunkedFile.java new file mode 100644 index 0000000..e14b79d --- /dev/null +++ b/tez-plugins/tez-aux-services/src/main/java/org/apache/tez/auxservices/FadvisedChunkedFile.java @@ -0,0 +1,85 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. 
The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.tez.auxservices; + +import java.io.FileDescriptor; +import java.io.IOException; +import java.io.RandomAccessFile; + +import org.apache.commons.logging.Log; +import org.apache.commons.logging.LogFactory; +import org.apache.hadoop.io.ReadaheadPool; +import org.apache.hadoop.io.ReadaheadPool.ReadaheadRequest; +import org.apache.hadoop.io.nativeio.NativeIO; + +import static org.apache.hadoop.io.nativeio.NativeIO.POSIX.POSIX_FADV_DONTNEED; + +import org.jboss.netty.handler.stream.ChunkedFile; + +/** + * A {@link ChunkedFile} that, when OS-cache management is enabled, asks a + * {@link ReadaheadPool} for readahead before each chunk is produced and, on + * close, issues a POSIX_FADV_DONTNEED advice over the range between + * getStartOffset() and getEndOffset(). + */ +public class FadvisedChunkedFile extends ChunkedFile { + + private static final Log LOG = LogFactory.getLog(FadvisedChunkedFile.class); + + private final boolean manageOsCache; + private final int readaheadLength; + private final ReadaheadPool readaheadPool; + private final FileDescriptor fd; + private final String identifier; + + /** Most recent request returned by readaheadPool.readaheadStream; stays null unless nextChunk() issues one. */ + private ReadaheadRequest readaheadRequest; + + /** + * Forwards file, position, count and chunkSize to the ChunkedFile + * constructor, stores the cache and readahead settings, and captures the + * descriptor returned by file.getFD(). + * + * @param manageOsCache gates both the readahead in nextChunk() and the fadvise in close() + * @param readaheadLength length argument handed to readaheadPool.readaheadStream + * @param readaheadPool pool used for readahead; a null pool disables readahead + * @param identifier label passed to the readahead and fadvise calls and used in the warning log message + * @throws IOException declared by the signature; NOTE(review): presumably raised by file.getFD() or the superclass constructor, confirm + */ + public FadvisedChunkedFile(RandomAccessFile file, long position, long count, + int chunkSize, boolean manageOsCache, int readaheadLength, + ReadaheadPool readaheadPool, String identifier) throws IOException { + super(file, position, count, chunkSize); + this.manageOsCache = manageOsCache; + this.readaheadLength = readaheadLength; + this.readaheadPool = readaheadPool; + this.fd = file.getFD(); + this.identifier = identifier; + } + + /** + * When manageOsCache is true and readaheadPool is non-null, calls + * readaheadStream with (identifier, fd, getCurrentOffset(), readaheadLength, + * getEndOffset(), previous request) and keeps the returned request; then + * returns whatever super.nextChunk() returns. + */ + @Override + public Object nextChunk() throws Exception { + if 
(manageOsCache && readaheadPool != null) { + readaheadRequest = readaheadPool + .readaheadStream(identifier, fd, getCurrentOffset(), readaheadLength, + getEndOffset(), readaheadRequest); + } + return super.nextChunk(); + } + + /** + * Cancels the stored readahead request if there is one. When manageOsCache + * is true and getEndOffset() - getStartOffset() is positive, calls + * posixFadviseIfPossible with POSIX_FADV_DONTNEED for offset getStartOffset() + * and that length; any Throwable from that call is logged as a warning and + * not rethrown. super.close() is called afterwards in every case. + */ + @Override + public void close() throws Exception { + if (readaheadRequest != null) { + readaheadRequest.cancel(); + } + if (manageOsCache && getEndOffset() - getStartOffset() > 0) { + try { + NativeIO.POSIX.getCacheManipulator().posixFadviseIfPossible(identifier, + fd, + getStartOffset(), getEndOffset() - getStartOffset(), + POSIX_FADV_DONTNEED); + } catch (Throwable t) { + LOG.warn("Failed to manage OS cache for " + identifier, t); + } + } + super.close(); + } +} http://git-wip-us.apache.org/repos/asf/tez/blob/e272e370/tez-plugins/tez-aux-services/src/main/java/org/apache/tez/auxservices/FadvisedFileRegion.java ---------------------------------------------------------------------- diff --git a/tez-plugins/tez-aux-services/src/main/java/org/apache/tez/auxservices/FadvisedFileRegion.java b/tez-plugins/tez-aux-services/src/main/java/org/apache/tez/auxservices/FadvisedFileRegion.java new file mode 100644 index 0000000..0e62345 --- /dev/null +++ b/tez-plugins/tez-aux-services/src/main/java/org/apache/tez/auxservices/FadvisedFileRegion.java @@ -0,0 +1,167 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. 
You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.tez.auxservices; + +import java.io.FileDescriptor; +import java.io.IOException; +import java.io.RandomAccessFile; +import java.nio.ByteBuffer; +import java.nio.channels.FileChannel; +import java.nio.channels.WritableByteChannel; + +import org.apache.commons.logging.Log; +import org.apache.commons.logging.LogFactory; +import org.apache.hadoop.io.ReadaheadPool; +import org.apache.hadoop.io.ReadaheadPool.ReadaheadRequest; +import org.apache.hadoop.io.nativeio.NativeIO; + +import static org.apache.hadoop.io.nativeio.NativeIO.POSIX.POSIX_FADV_DONTNEED; + +import org.jboss.netty.channel.DefaultFileRegion; + +import com.google.common.annotations.VisibleForTesting; + +/** + * A {@link DefaultFileRegion} that can request readahead from a + * {@link ReadaheadPool} before each transferTo call, can copy the region + * through an intermediate ByteBuffer instead of delegating to + * super.transferTo, and can issue a POSIX_FADV_DONTNEED advice over the + * region once the caller reports a successful transfer. + */ +public class FadvisedFileRegion extends DefaultFileRegion { + + private static final Log LOG = LogFactory.getLog(FadvisedFileRegion.class); + + private final boolean manageOsCache; + private final int readaheadLength; + private final ReadaheadPool readaheadPool; + private final FileDescriptor fd; + private final String identifier; + private final long count; + private final long position; + private final int shuffleBufferSize; + private final boolean shuffleTransferToAllowed; + private final FileChannel fileChannel; + + /** Most recent request returned by readaheadPool.readaheadStream; stays null unless transferTo issues one. */ + private ReadaheadRequest readaheadRequest; + + /** + * Passes file.getChannel(), position and count to the DefaultFileRegion + * constructor and keeps its own copies of the channel, position and count + * for use by customShuffleTransfer. + * + * @param manageOsCache gates the fadvise call in transferSuccessful() + * @param readaheadLength length argument handed to readaheadPool.readaheadStream; readahead is skipped unless it is positive + * @param readaheadPool pool used for readahead; a null pool disables readahead + * @param identifier label passed to the readahead and fadvise calls and used in the warning log message + * @param shuffleBufferSize capacity passed to ByteBuffer.allocate in customShuffleTransfer + * @param shuffleTransferToAllowed when true transferTo delegates to super.transferTo, otherwise customShuffleTransfer is used + * @throws IOException declared by the signature; NOTE(review): presumably raised by file.getFD(), confirm + */ + public FadvisedFileRegion(RandomAccessFile file, long position, long count, + boolean manageOsCache, int readaheadLength, ReadaheadPool readaheadPool, + String identifier, int shuffleBufferSize, + boolean shuffleTransferToAllowed) throws IOException { + super(file.getChannel(), position, count); 
this.manageOsCache = manageOsCache; + this.readaheadLength = readaheadLength; + this.readaheadPool = readaheadPool; + this.fd = file.getFD(); + this.identifier = identifier; + this.fileChannel = file.getChannel(); + this.count = count; + this.position = position; + this.shuffleBufferSize = shuffleBufferSize; + this.shuffleTransferToAllowed = shuffleTransferToAllowed; + } + + /** + * When readaheadPool is non-null and readaheadLength is positive, calls + * readaheadStream with (identifier, fd, getPosition() + position, + * readaheadLength, getPosition() + getCount(), previous request) and keeps + * the returned request. This readahead is not gated on manageOsCache. + * The transfer itself is delegated to super.transferTo when + * shuffleTransferToAllowed is true and to customShuffleTransfer otherwise. + * + * @param position offset relative to the start of this region + * @return the value returned by the chosen transfer implementation + */ + @Override + public long transferTo(WritableByteChannel target, long position) + throws IOException { + if (readaheadPool != null && readaheadLength > 0) { + readaheadRequest = readaheadPool.readaheadStream(identifier, fd, + getPosition() + position, readaheadLength, + getPosition() + getCount(), readaheadRequest); + } + + if(this.shuffleTransferToAllowed) { + return super.transferTo(target, position); + } else { + return customShuffleTransfer(target, position); + } + } + + /** + * This method transfers data using local buffer. It transfers data from + * a disk to a local buffer in memory, and then it transfers data from the + * buffer to the target. This is used only if transferTo is disallowed in + * the configuration file. super.TransferTo does not perform well on Windows + * due to a small IO request generated. customShuffleTransfer can control + * the size of the IO requests by changing the size of the intermediate + * buffer. + * + * @param target channel that receives the bytes; each filled buffer is written until it has no bytes remaining + * @param position offset relative to the start of this region; reads are issued at this.position + position + * @return the number of bytes written to target, computed as actualCount - trans; this is less than count - position when fileChannel.read stops returning a positive size early + * @throws IllegalArgumentException if position is negative or greater than count; NOTE(review): position == count is accepted and returns 0 although the message gives count - 1 as the upper bound 
+ */ + @VisibleForTesting + long customShuffleTransfer(WritableByteChannel target, long position) + throws IOException { + long actualCount = this.count - position; + if (actualCount < 0 || position < 0) { + throw new IllegalArgumentException( + "position out of range: " + position + + " (expected: 0 - " + (this.count - 1) + ')'); + } + if (actualCount == 0) { + return 0L; + } + + long trans = actualCount; + int readSize; + ByteBuffer byteBuffer = ByteBuffer.allocate(this.shuffleBufferSize); + + while(trans > 0L && + (readSize = fileChannel.read(byteBuffer, this.position+position)) > 0) { + //adjust counters and buffer limit + if(readSize < trans) { + trans -= readSize; + position += readSize; + byteBuffer.flip(); + } else { + //We can read more than we need if the actualCount is not multiple + //of the byteBuffer size and file is big enough. In that case we cannot + //use flip method but we need to set buffer limit manually to trans. + byteBuffer.limit((int)trans); + byteBuffer.position(0); + position += trans; + trans = 0; + } + + //write data to the target + while(byteBuffer.hasRemaining()) { + target.write(byteBuffer); + } + + byteBuffer.clear(); + } + + return actualCount - trans; + } + + + /** + * Cancels the stored readahead request if there is one, then calls + * super.releaseExternalResources(). + */ + @Override + public void releaseExternalResources() { + if (readaheadRequest != null) { + readaheadRequest.cancel(); + } + super.releaseExternalResources(); + } + + /** + * Call when the transfer completes successfully so we can advise the OS that + * we don't need the region to be cached anymore. + * The advice is only issued when manageOsCache is true and getCount() is + * positive; any Throwable from the fadvise call is logged as a warning and + * not propagated. 
+ */ + public void transferSuccessful() { + if (manageOsCache && getCount() > 0) { + try { + NativeIO.POSIX.getCacheManipulator().posixFadviseIfPossible(identifier, + fd, getPosition(), getCount(), POSIX_FADV_DONTNEED); + } catch (Throwable t) { + LOG.warn("Failed to manage OS cache for " + identifier, t); + } + } + } +} http://git-wip-us.apache.org/repos/asf/tez/blob/e272e370/tez-plugins/tez-aux-services/src/main/java/org/apache/tez/auxservices/ShuffleHandler.java ---------------------------------------------------------------------- diff --git a/tez-plugins/tez-aux-services/src/main/java/org/apache/tez/auxservices/ShuffleHandler.java b/tez-plugins/tez-aux-services/src/main/java/org/apache/tez/auxservices/ShuffleHandler.java index e5b4c79..cccd1a0 100644 --- a/tez-plugins/tez-aux-services/src/main/java/org/apache/tez/auxservices/ShuffleHandler.java +++ b/tez-plugins/tez-aux-services/src/main/java/org/apache/tez/auxservices/ShuffleHandler.java @@ -65,8 +65,6 @@ import org.apache.hadoop.io.DataOutputBuffer; import org.apache.hadoop.io.ReadaheadPool; import org.apache.hadoop.io.SecureIOUtils; import org.apache.hadoop.io.Text; -import org.apache.hadoop.mapred.FadvisedChunkedFile; -import org.apache.hadoop.mapred.FadvisedFileRegion; import org.apache.hadoop.mapred.proto.ShuffleHandlerRecoveryProtos.JobShuffleInfoProto; import org.apache.hadoop.mapreduce.JobID; import org.apache.tez.mapreduce.hadoop.MRConfig;
