Repository: hadoop Updated Branches: refs/heads/branch-2.7 ac76dc10d -> 9f4585c95
Revert "HDFS-11551. Handle SlowDiskReport from DataNode at the NameNode. Contributed by Xiaobing Zhou." This reverts commit ac76dc10dde45ed00885bb96e89ed78015435790. Project: http://git-wip-us.apache.org/repos/asf/hadoop/repo Commit: http://git-wip-us.apache.org/repos/asf/hadoop/commit/f2d1d3d1 Tree: http://git-wip-us.apache.org/repos/asf/hadoop/tree/f2d1d3d1 Diff: http://git-wip-us.apache.org/repos/asf/hadoop/diff/f2d1d3d1 Branch: refs/heads/branch-2.7 Commit: f2d1d3d159c81056a93facad750b3738914f24c3 Parents: ac76dc1 Author: Arpit Agarwal <[email protected]> Authored: Tue Apr 11 11:42:57 2017 -0700 Committer: Arpit Agarwal <[email protected]> Committed: Tue Apr 11 11:42:57 2017 -0700 ---------------------------------------------------------------------- .../org/apache/hadoop/hdfs/DFSOutputStream.java | 71 ++--------- .../protocol/datatransfer/PacketReceiver.java | 2 +- .../apache/hadoop/hdfs/TestDFSOutputStream.java | 126 ------------------- 3 files changed, 13 insertions(+), 186 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/hadoop/blob/f2d1d3d1/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/DFSOutputStream.java ---------------------------------------------------------------------- diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/DFSOutputStream.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/DFSOutputStream.java index 5de5460..ef8aa5a 100755 --- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/DFSOutputStream.java +++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/DFSOutputStream.java @@ -72,7 +72,6 @@ import org.apache.hadoop.hdfs.protocol.datatransfer.DataTransferProtoUtil; import org.apache.hadoop.hdfs.protocol.datatransfer.IOStreamPair; import org.apache.hadoop.hdfs.protocol.datatransfer.InvalidEncryptionKeyException; import org.apache.hadoop.hdfs.protocol.datatransfer.PacketHeader; -import org.apache.hadoop.hdfs.protocol.datatransfer.PacketReceiver; import org.apache.hadoop.hdfs.protocol.datatransfer.PipelineAck; import org.apache.hadoop.hdfs.protocol.datatransfer.Sender; import org.apache.hadoop.hdfs.protocol.proto.DataTransferProtos.BlockOpResponseProto; @@ -102,8 +101,6 @@ import org.apache.htrace.Span; import org.apache.htrace.Trace; import org.apache.htrace.TraceInfo; import org.apache.htrace.TraceScope; -import org.slf4j.Logger; -import org.slf4j.LoggerFactory; import com.google.common.annotations.VisibleForTesting; import com.google.common.base.Preconditions; @@ -140,8 +137,6 @@ import com.google.common.cache.RemovalNotification; public class DFSOutputStream extends FSOutputSummer implements Syncable, CanSetDropBehind { private final long dfsclientSlowLogThresholdMs; - static final Logger LOG = LoggerFactory.getLogger(DFSOutputStream.class); - /** * Number of times to retry creating a file when there are transient * errors (typically related to encryption zones and KeyProvider operations). @@ -191,7 +186,6 @@ public class DFSOutputStream extends FSOutputSummer private FileEncryptionInfo fileEncryptionInfo; private static final BlockStoragePolicySuite blockStoragePolicySuite = BlockStoragePolicySuite.createDefaultSuite(); - private int writePacketSize; /** Use {@link ByteArrayManager} to create buffer for non-heartbeat packets.*/ private DFSPacket createPacket(int packetSize, int chunksPerPkt, long offsetInBlock, @@ -1675,9 +1669,7 @@ public class DFSOutputStream extends FSOutputSummer DFSClient.LOG.debug( "Set non-null progress callback on DFSOutputStream " + src); } - - initWritePacketSize(); - + this.bytesPerChecksum = checksum.getBytesPerChecksum(); if (bytesPerChecksum <= 0) { throw new HadoopIllegalArgumentException( @@ -1695,21 +1687,6 @@ public class DFSOutputStream extends FSOutputSummer this.byteArrayManager = dfsClient.getClientContext().getByteArrayManager(); } - /** - * Ensures the configured writePacketSize never exceeds - * PacketReceiver.MAX_PACKET_SIZE. - */ - private void initWritePacketSize() { - writePacketSize = dfsClient.getConf().writePacketSize; - if (writePacketSize > PacketReceiver.MAX_PACKET_SIZE) { - LOG.warn( - "Configured write packet exceeds {} bytes as max," - + " using {} bytes.", - PacketReceiver.MAX_PACKET_SIZE, PacketReceiver.MAX_PACKET_SIZE); - writePacketSize = PacketReceiver.MAX_PACKET_SIZE; - } - } - /** Construct a new output stream for creating a file. */ private DFSOutputStream(DFSClient dfsClient, String src, HdfsFileStatus stat, EnumSet<CreateFlag> flag, Progressable progress, @@ -1959,8 +1936,18 @@ public class DFSOutputStream extends FSOutputSummer } waitAndQueueCurrentPacket(); - adjustChunkBoundary(); + // If the reopened file did not end at chunk boundary and the above + // write filled up its partial chunk. Tell the summer to generate full + // crc chunks from now on. + if (appendChunk && bytesCurBlock%bytesPerChecksum == 0) { + appendChunk = false; + resetChecksumBufSize(); + } + if (!appendChunk) { + int psize = Math.min((int)(blockSize-bytesCurBlock), dfsClient.getConf().writePacketSize); + computePacketChunkSize(psize, bytesPerChecksum); + } // // if encountering a block boundary, send an empty packet to // indicate the end of block and reset bytesCurBlock. @@ -1975,40 +1962,6 @@ public class DFSOutputStream extends FSOutputSummer } } - /** - * If the reopened file did not end at chunk boundary and the above - * write filled up its partial chunk. Tell the summer to generate full - * crc chunks from now on. - */ - protected void adjustChunkBoundary() { - if (appendChunk && bytesCurBlock % bytesPerChecksum == 0) { - appendChunk = false; - resetChecksumBufSize(); - } - - if (!appendChunk) { - final int psize = (int) Math.min(blockSize - bytesCurBlock, - writePacketSize); - computePacketChunkSize(psize, bytesPerChecksum); - } - } - - /** - * Used in test only. - */ - @VisibleForTesting - void setAppendChunk(final boolean appendChunk) { - this.appendChunk = appendChunk; - } - - /** - * Used in test only. - */ - @VisibleForTesting - void setBytesCurBlock(final long bytesCurBlock) { - this.bytesCurBlock = bytesCurBlock; - } - @Deprecated public void sync() throws IOException { hflush(); http://git-wip-us.apache.org/repos/asf/hadoop/blob/f2d1d3d1/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/protocol/datatransfer/PacketReceiver.java ---------------------------------------------------------------------- diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/protocol/datatransfer/PacketReceiver.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/protocol/datatransfer/PacketReceiver.java index 784c305..3045a13 100644 --- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/protocol/datatransfer/PacketReceiver.java +++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/protocol/datatransfer/PacketReceiver.java @@ -45,7 +45,7 @@ public class PacketReceiver implements Closeable { * The max size of any single packet. This prevents OOMEs when * invalid data is sent. */ - public static final int MAX_PACKET_SIZE = 16 * 1024 * 1024; + private static final int MAX_PACKET_SIZE = 16 * 1024 * 1024; static final Log LOG = LogFactory.getLog(PacketReceiver.class); http://git-wip-us.apache.org/repos/asf/hadoop/blob/f2d1d3d1/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/TestDFSOutputStream.java ---------------------------------------------------------------------- diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/TestDFSOutputStream.java b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/TestDFSOutputStream.java index 8c46564..7269e39 100644 --- a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/TestDFSOutputStream.java +++ b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/TestDFSOutputStream.java @@ -17,28 +17,20 @@ */ package org.apache.hadoop.hdfs; -import java.io.File; import java.io.IOException; import java.lang.reflect.Field; -import java.lang.reflect.InvocationTargetException; import java.lang.reflect.Method; import java.util.concurrent.atomic.AtomicReference; import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.fs.FSDataOutputStream; import org.apache.hadoop.fs.Path; -import org.apache.hadoop.hdfs.protocol.datatransfer.PacketReceiver; -import org.apache.hadoop.test.GenericTestUtils; -import org.apache.hadoop.test.PathUtils; import org.junit.AfterClass; import org.junit.Assert; import org.junit.BeforeClass; import org.junit.Test; import org.mockito.internal.util.reflection.Whitebox; -import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_CLIENT_WRITE_PACKET_SIZE_DEFAULT; -import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_CLIENT_WRITE_PACKET_SIZE_KEY; - public class TestDFSOutputStream { static MiniDFSCluster cluster; @@ -105,124 +97,6 @@ public class TestDFSOutputStream { Assert.assertTrue((Integer) field.get(dos) + 257 < packetSize); } - /** - * This tests preventing overflows of package size and bodySize. - * <p> - * See also https://issues.apache.org/jira/browse/HDFS-11608. - * </p> - * @throws IOException - * @throws SecurityException - * @throws NoSuchFieldException - * @throws InvocationTargetException - * @throws IllegalArgumentException - * @throws IllegalAccessException - * @throws NoSuchMethodException - */ - @Test(timeout=60000) - public void testPreventOverflow() throws IOException, NoSuchFieldException, - SecurityException, IllegalAccessException, IllegalArgumentException, - InvocationTargetException, NoSuchMethodException { - - final int defaultWritePacketSize = DFS_CLIENT_WRITE_PACKET_SIZE_DEFAULT; - int configuredWritePacketSize = defaultWritePacketSize; - int finalWritePacketSize = defaultWritePacketSize; - - /* test default WritePacketSize, e.g. 64*1024 */ - runAdjustChunkBoundary(configuredWritePacketSize, finalWritePacketSize); - - /* test large WritePacketSize, e.g. 1G */ - configuredWritePacketSize = 1000 * 1024 * 1024; - finalWritePacketSize = PacketReceiver.MAX_PACKET_SIZE; - runAdjustChunkBoundary(configuredWritePacketSize, finalWritePacketSize); - } - - /** - * @configuredWritePacketSize the configured WritePacketSize. - * @finalWritePacketSize the final WritePacketSize picked by - * {@link DFSOutputStream#adjustChunkBoundary} - */ - private void runAdjustChunkBoundary( - final int configuredWritePacketSize, - final int finalWritePacketSize) throws IOException, NoSuchFieldException, - SecurityException, IllegalAccessException, IllegalArgumentException, - InvocationTargetException, NoSuchMethodException { - - final boolean appendChunk = false; - final long blockSize = 3221225500L; - final long bytesCurBlock = 1073741824L; - final int bytesPerChecksum = 512; - final int checksumSize = 4; - final int chunkSize = bytesPerChecksum + checksumSize; - final int packateMaxHeaderLength = 33; - - MiniDFSCluster dfsCluster = null; - final File baseDir = new File(PathUtils.getTestDir(getClass()), - GenericTestUtils.getMethodName()); - - try { - final Configuration dfsConf = new Configuration(); - dfsConf.set(MiniDFSCluster.HDFS_MINIDFS_BASEDIR, - baseDir.getAbsolutePath()); - dfsConf.setInt(DFS_CLIENT_WRITE_PACKET_SIZE_KEY, - configuredWritePacketSize); - dfsCluster = new MiniDFSCluster.Builder(dfsConf).numDataNodes(1).build(); - dfsCluster.waitActive(); - - final FSDataOutputStream os = dfsCluster.getFileSystem() - .create(new Path(baseDir.getAbsolutePath(), "testPreventOverflow")); - final DFSOutputStream dos = (DFSOutputStream) Whitebox - .getInternalState(os, "wrappedStream"); - - /* set appendChunk */ - final Method setAppendChunkMethod = dos.getClass() - .getDeclaredMethod("setAppendChunk", boolean.class); - setAppendChunkMethod.setAccessible(true); - setAppendChunkMethod.invoke(dos, appendChunk); - - /* set bytesCurBlock */ - final Method setBytesCurBlockMethod = dos.getClass() - .getDeclaredMethod("setBytesCurBlock", long.class); - setBytesCurBlockMethod.setAccessible(true); - setBytesCurBlockMethod.invoke(dos, bytesCurBlock); - - /* set blockSize */ - final Field blockSizeField = dos.getClass().getDeclaredField("blockSize"); - blockSizeField.setAccessible(true); - blockSizeField.setLong(dos, blockSize); - - /* call adjustChunkBoundary */ - final Method method = dos.getClass() - .getDeclaredMethod("adjustChunkBoundary"); - method.setAccessible(true); - method.invoke(dos); - - /* get and verify writePacketSize */ - final Field writePacketSizeField = dos.getClass() - .getDeclaredField("writePacketSize"); - writePacketSizeField.setAccessible(true); - Assert.assertEquals(writePacketSizeField.getInt(dos), - finalWritePacketSize); - - /* get and verify chunksPerPacket */ - final Field chunksPerPacketField = dos.getClass() - .getDeclaredField("chunksPerPacket"); - chunksPerPacketField.setAccessible(true); - Assert.assertEquals(chunksPerPacketField.getInt(dos), - (finalWritePacketSize - packateMaxHeaderLength) / chunkSize); - - /* get and verify packetSize */ - final Field packetSizeField = dos.getClass() - .getDeclaredField("packetSize"); - packetSizeField.setAccessible(true); - Assert.assertEquals(packetSizeField.getInt(dos), - chunksPerPacketField.getInt(dos) * chunkSize); - } finally { - if (dfsCluster != null) { - dfsCluster.shutdown(); - } - } - } - @AfterClass public static void tearDown() { cluster.shutdown(); --------------------------------------------------------------------- To unsubscribe, e-mail: [email protected] For additional commands, e-mail: [email protected]
