Author: arp Date: Tue Nov 19 17:26:23 2013 New Revision: 1543510 URL: http://svn.apache.org/r1543510 Log: Merging r1543111 through r1543509 from trunk to branch HDFS-2832
Added: hadoop/common/branches/HDFS-2832/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/io/compress/DirectCompressor.java - copied unchanged from r1543509, hadoop/common/trunk/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/io/compress/DirectCompressor.java hadoop/common/branches/HDFS-2832/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/io/compress/DirectDecompressor.java - copied unchanged from r1543509, hadoop/common/trunk/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/io/compress/DirectDecompressor.java Modified: hadoop/common/branches/HDFS-2832/hadoop-common-project/hadoop-auth/pom.xml hadoop/common/branches/HDFS-2832/hadoop-common-project/hadoop-common/CHANGES.txt (contents, props changed) hadoop/common/branches/HDFS-2832/hadoop-common-project/hadoop-common/src/main/java/ (props changed) hadoop/common/branches/HDFS-2832/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/io/compress/zlib/ZlibCompressor.java hadoop/common/branches/HDFS-2832/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/io/compress/zlib/ZlibDecompressor.java hadoop/common/branches/HDFS-2832/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/ipc/Server.java hadoop/common/branches/HDFS-2832/hadoop-common-project/hadoop-common/src/test/java/org/apache/hadoop/io/compress/zlib/TestZlibCompressorDecompressor.java Modified: hadoop/common/branches/HDFS-2832/hadoop-common-project/hadoop-auth/pom.xml URL: http://svn.apache.org/viewvc/hadoop/common/branches/HDFS-2832/hadoop-common-project/hadoop-auth/pom.xml?rev=1543510&r1=1543509&r2=1543510&view=diff ============================================================================== --- hadoop/common/branches/HDFS-2832/hadoop-common-project/hadoop-auth/pom.xml (original) +++ hadoop/common/branches/HDFS-2832/hadoop-common-project/hadoop-auth/pom.xml Tue Nov 19 17:26:23 2013 @@ -54,6 +54,11 @@ </dependency> <dependency> <groupId>org.mortbay.jetty</groupId> + <artifactId>jetty-util</artifactId> + <scope>test</scope> + </dependency> + <dependency> + <groupId>org.mortbay.jetty</groupId> <artifactId>jetty</artifactId> <scope>test</scope> </dependency> Modified: hadoop/common/branches/HDFS-2832/hadoop-common-project/hadoop-common/CHANGES.txt URL: http://svn.apache.org/viewvc/hadoop/common/branches/HDFS-2832/hadoop-common-project/hadoop-common/CHANGES.txt?rev=1543510&r1=1543509&r2=1543510&view=diff ============================================================================== --- hadoop/common/branches/HDFS-2832/hadoop-common-project/hadoop-common/CHANGES.txt (original) +++ hadoop/common/branches/HDFS-2832/hadoop-common-project/hadoop-common/CHANGES.txt Tue Nov 19 17:26:23 2013 @@ -387,6 +387,9 @@ Release 2.3.0 - UNRELEASED HADOOP-9748. Reduce blocking on UGI.ensureInitialized (daryn) + HADOOP-10047. Add a direct-buffer based apis for compression. (Gopal V via + acmurthy) + BUG FIXES HADOOP-9964. Fix deadlocks in TestHttpServer by synchronize @@ -439,6 +442,9 @@ Release 2.3.0 - UNRELEASED HADOOP-10100. MiniKDC shouldn't use apacheds-all artifact. (rkanter via tucu) + HADOOP-10107. Server.getNumOpenConnections may throw NPE. (Kihwal Lee via + jing9) + Release 2.2.1 - UNRELEASED INCOMPATIBLE CHANGES @@ -491,6 +497,9 @@ Release 2.2.1 - UNRELEASED HADOOP-10078. KerberosAuthenticator always does SPNEGO. (rkanter via tucu) + HADOOP-10110. hadoop-auth has a build break due to missing dependency. + (Chuan Liu via arp) + Release 2.2.0 - 2013-10-13 INCOMPATIBLE CHANGES Propchange: hadoop/common/branches/HDFS-2832/hadoop-common-project/hadoop-common/CHANGES.txt ------------------------------------------------------------------------------ Merged /hadoop/common/trunk/hadoop-common-project/hadoop-common/CHANGES.txt:r1543111-1543509 Propchange: hadoop/common/branches/HDFS-2832/hadoop-common-project/hadoop-common/src/main/java/ ------------------------------------------------------------------------------ Merged /hadoop/common/trunk/hadoop-common-project/hadoop-common/src/main/java:r1543111-1543509 Modified: hadoop/common/branches/HDFS-2832/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/io/compress/zlib/ZlibCompressor.java URL: http://svn.apache.org/viewvc/hadoop/common/branches/HDFS-2832/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/io/compress/zlib/ZlibCompressor.java?rev=1543510&r1=1543509&r2=1543510&view=diff ============================================================================== --- hadoop/common/branches/HDFS-2832/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/io/compress/zlib/ZlibCompressor.java (original) +++ hadoop/common/branches/HDFS-2832/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/io/compress/zlib/ZlibCompressor.java Tue Nov 19 17:26:23 2013 @@ -24,6 +24,7 @@ import java.nio.ByteBuffer; import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.io.compress.Compressor; +import org.apache.hadoop.io.compress.DirectCompressor; import org.apache.hadoop.util.NativeCodeLoader; import org.apache.commons.logging.Log; @@ -35,7 +36,7 @@ import org.apache.commons.logging.LogFac * http://www.zlib.net/ * */ -public class ZlibCompressor implements Compressor { +public class ZlibCompressor implements Compressor,DirectCompressor { private static final Log LOG = LogFactory.getLog(ZlibCompressor.class); @@ -420,6 +421,7 @@ public class ZlibCompressor implements C compressedDirectBuf.limit(directBufferSize); compressedDirectBuf.position(directBufferSize); userBufOff = userBufLen = 0; + userBuf = null; } @Override @@ -435,6 +437,110 @@ public class ZlibCompressor implements C throw new NullPointerException(); } + private int put(ByteBuffer dst, ByteBuffer src) { + // this will lop off data from src[pos:limit] into dst[pos:limit] + int l1 = src.remaining(); + int l2 = dst.remaining(); + int pos1 = src.position(); + int pos2 = dst.position(); + int len = Math.min(l1, l2); + + if (len == 0) { + return 0; + } + + ByteBuffer slice = src.slice(); + slice.limit(len); + dst.put(slice); + src.position(pos1 + len); + return len; + } + + public int compress(ByteBuffer dst, ByteBuffer src) throws IOException { + assert dst.remaining() > 0 : "dst.remaining() == 0"; + int n = 0; + + /* fast path for clean state and direct buffers */ + /* TODO: reset should free userBuf? */ + if((src != null && src.isDirect()) && dst.isDirect() && userBuf == null) { + /* + * TODO: fix these assumptions in inflateDirect(), eventually by allowing + * it to read position()/limit() directly + */ + boolean cleanDst = (dst.position() == 0 && dst.remaining() == dst.capacity() && dst.capacity() >= directBufferSize); + boolean cleanState = (keepUncompressedBuf == false && uncompressedDirectBufLen == 0 && compressedDirectBuf.remaining() == 0); + /* use the buffers directly */ + if(cleanDst && cleanState) { + Buffer originalCompressed = compressedDirectBuf; + Buffer originalUncompressed = uncompressedDirectBuf; + int originalBufferSize = directBufferSize; + uncompressedDirectBuf = src; + uncompressedDirectBufOff = src.position(); + uncompressedDirectBufLen = src.remaining(); + compressedDirectBuf = dst; + directBufferSize = dst.remaining(); + // Compress data + n = deflateBytesDirect(); + // we move dst.position() forward, not limit() + // unlike the local buffer case, which moves it when we put() into the dst + dst.position(n); + if(uncompressedDirectBufLen > 0) { + src.position(uncompressedDirectBufOff); + } else { + src.position(src.limit()); + } + compressedDirectBuf = originalCompressed; + uncompressedDirectBuf = originalUncompressed; + uncompressedDirectBufOff = 0; + uncompressedDirectBufLen = 0; + directBufferSize = originalBufferSize; + return n; + } + } + + // Check if there is compressed data + if (compressedDirectBuf.remaining() > 0) { + n = put(dst, (ByteBuffer) compressedDirectBuf); + } + + if (dst.remaining() == 0) { + return n; + } else { + needsInput(); + + // if we have drained userBuf, read from src (ideally, do not mix buffer + // modes, but sometimes you can) + if (userBufLen == 0 && src != null && src.remaining() > 0) { + put((ByteBuffer) uncompressedDirectBuf, src); + uncompressedDirectBufLen = uncompressedDirectBuf.position(); + } + + // Re-initialize the zlib's output direct buffer + compressedDirectBuf.rewind(); + compressedDirectBuf.limit(directBufferSize); + + // Compress data + int more = deflateBytesDirect(); + + compressedDirectBuf.limit(more); + + // Check if zlib consumed all input buffer + // set keepUncompressedBuf properly + if (uncompressedDirectBufLen <= 0) { // zlib consumed all input buffer + keepUncompressedBuf = false; + uncompressedDirectBuf.clear(); + uncompressedDirectBufOff = 0; + uncompressedDirectBufLen = 0; + } else { // zlib did not consume all input buffer + keepUncompressedBuf = true; + } + + // fill the dst buffer from compressedDirectBuf + int fill = put(dst, ((ByteBuffer) compressedDirectBuf)); + return n + fill; + } + } + private native static void initIDs(); private native static long init(int level, int strategy, int windowBits); private native static void setDictionary(long strm, byte[] b, int off, Modified: hadoop/common/branches/HDFS-2832/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/io/compress/zlib/ZlibDecompressor.java URL: http://svn.apache.org/viewvc/hadoop/common/branches/HDFS-2832/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/io/compress/zlib/ZlibDecompressor.java?rev=1543510&r1=1543509&r2=1543510&view=diff ============================================================================== --- hadoop/common/branches/HDFS-2832/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/io/compress/zlib/ZlibDecompressor.java (original) +++ hadoop/common/branches/HDFS-2832/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/io/compress/zlib/ZlibDecompressor.java Tue Nov 19 17:26:23 2013 @@ -23,6 +23,7 @@ import java.nio.Buffer; import java.nio.ByteBuffer; import org.apache.hadoop.io.compress.Decompressor; +import org.apache.hadoop.io.compress.DirectDecompressor; import org.apache.hadoop.util.NativeCodeLoader; /** @@ -31,7 +32,7 @@ import org.apache.hadoop.util.NativeCode * http://www.zlib.net/ * */ -public class ZlibDecompressor implements Decompressor { +public class ZlibDecompressor implements Decompressor,DirectDecompressor { private static final int DEFAULT_DIRECT_BUFFER_SIZE = 64*1024; // HACK - Use this as a global lock in the JNI layer @@ -280,6 +281,7 @@ public class ZlibDecompressor implements uncompressedDirectBuf.limit(directBufferSize); uncompressedDirectBuf.position(directBufferSize); userBufOff = userBufLen = 0; + userBuf = null; } @Override @@ -299,6 +301,108 @@ public class ZlibDecompressor implements if (stream == 0) throw new NullPointerException(); } + + private int put(ByteBuffer dst, ByteBuffer src) { + // this will lop off data from src[pos:limit] into dst[pos:limit], using the + // min() of both remaining() + int l1 = src.remaining(); + int l2 = dst.remaining(); + int pos1 = src.position(); + int pos2 = dst.position(); + int len = Math.min(l1, l2); + + if (len == 0) { + return 0; + } + + ByteBuffer slice = src.slice(); + slice.limit(len); + dst.put(slice); + src.position(pos1 + len); + return len; + } + + public int decompress(ByteBuffer dst, ByteBuffer src) throws IOException { + assert dst.remaining() > 0 : "dst.remaining == 0"; + int n = 0; + + /* fast path for clean state and direct buffers */ + if((src != null && src.isDirect()) && dst.isDirect() && userBuf == null) { + /* + * TODO: fix these assumptions in inflateDirect(), eventually by allowing + * it to read position()/limit() directly + */ + boolean cleanDst = (dst.position() == 0 && dst.remaining() == dst.capacity() && dst.remaining() >= directBufferSize); + boolean cleanState = (compressedDirectBufLen == 0 && uncompressedDirectBuf.remaining() == 0); + /* use the buffers directly */ + if(cleanDst && cleanState) { + Buffer originalCompressed = compressedDirectBuf; + Buffer originalUncompressed = uncompressedDirectBuf; + int originalBufferSize = directBufferSize; + compressedDirectBuf = src; + compressedDirectBufOff = src.position(); + compressedDirectBufLen = src.remaining(); + uncompressedDirectBuf = dst; + directBufferSize = dst.remaining(); + // Compress data + n = inflateBytesDirect(); + dst.position(n); + if(compressedDirectBufLen > 0) { + src.position(compressedDirectBufOff); + } else { + src.position(src.limit()); + } + compressedDirectBuf = originalCompressed; + uncompressedDirectBuf = originalUncompressed; + compressedDirectBufOff = 0; + compressedDirectBufLen = 0; + directBufferSize = originalBufferSize; + return n; + } + } + + // Check if there is compressed data + if (uncompressedDirectBuf.remaining() > 0) { + n = put(dst, (ByteBuffer) uncompressedDirectBuf); + } + + if (dst.remaining() == 0) { + return n; + } else { + if (needsInput()) { + // this does not update buffers if we have no userBuf + if (userBufLen <= 0) { + compressedDirectBufOff = 0; + compressedDirectBufLen = 0; + compressedDirectBuf.rewind().limit(directBufferSize); + } + if (src != null) { + assert src.remaining() > 0 : "src.remaining() == 0"; + } + } + + // if we have drained userBuf, read from src (ideally, do not mix buffer + // modes, but sometimes you can) + if (userBufLen == 0 && src != null && src.remaining() > 0) { + compressedDirectBufLen += put(((ByteBuffer) compressedDirectBuf), src); + } + + // Re-initialize the zlib's output direct buffer + uncompressedDirectBuf.rewind(); + uncompressedDirectBuf.limit(directBufferSize); + + // Compress data + int more = inflateBytesDirect(); + + uncompressedDirectBuf.limit(more); + + // Get atmost 'len' bytes + int fill = put(dst, ((ByteBuffer) uncompressedDirectBuf)); + return n + fill; + } + } + + private native static void initIDs(); private native static long init(int windowBits); Modified: hadoop/common/branches/HDFS-2832/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/ipc/Server.java URL: http://svn.apache.org/viewvc/hadoop/common/branches/HDFS-2832/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/ipc/Server.java?rev=1543510&r1=1543509&r2=1543510&view=diff ============================================================================== --- hadoop/common/branches/HDFS-2832/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/ipc/Server.java (original) +++ hadoop/common/branches/HDFS-2832/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/ipc/Server.java Tue Nov 19 17:26:23 2013 @@ -2109,6 +2109,7 @@ public abstract class Server { // Start the listener here and let it bind to the port listener = new Listener(); this.port = listener.getAddress().getPort(); + connectionManager = new ConnectionManager(); this.rpcMetrics = RpcMetrics.create(this); this.rpcDetailedMetrics = RpcDetailedMetrics.create(this.port); this.tcpNoDelay = conf.getBoolean( @@ -2117,7 +2118,6 @@ public abstract class Server { // Create the responder here responder = new Responder(); - connectionManager = new ConnectionManager(); if (secretManager != null) { SaslRpcServer.init(conf); Modified: hadoop/common/branches/HDFS-2832/hadoop-common-project/hadoop-common/src/test/java/org/apache/hadoop/io/compress/zlib/TestZlibCompressorDecompressor.java URL: http://svn.apache.org/viewvc/hadoop/common/branches/HDFS-2832/hadoop-common-project/hadoop-common/src/test/java/org/apache/hadoop/io/compress/zlib/TestZlibCompressorDecompressor.java?rev=1543510&r1=1543509&r2=1543510&view=diff ============================================================================== --- hadoop/common/branches/HDFS-2832/hadoop-common-project/hadoop-common/src/test/java/org/apache/hadoop/io/compress/zlib/TestZlibCompressorDecompressor.java (original) +++ hadoop/common/branches/HDFS-2832/hadoop-common-project/hadoop-common/src/test/java/org/apache/hadoop/io/compress/zlib/TestZlibCompressorDecompressor.java Tue Nov 19 17:26:23 2013 @@ -19,8 +19,13 @@ package org.apache.hadoop.io.compress.zl import static org.junit.Assert.*; import static org.junit.Assume.*; + +import java.io.ByteArrayInputStream; +import java.io.ByteArrayOutputStream; +import java.io.Console; import java.io.IOException; import java.io.InputStream; +import java.nio.ByteBuffer; import java.util.Random; import org.apache.hadoop.conf.Configuration; @@ -33,8 +38,12 @@ import org.apache.hadoop.io.compress.Dec import org.apache.hadoop.io.compress.CompressDecompressTester.CompressionTestStrategy; import org.apache.hadoop.io.compress.zlib.ZlibCompressor.CompressionLevel; import org.apache.hadoop.io.compress.zlib.ZlibCompressor.CompressionStrategy; +import org.apache.log4j.ConsoleAppender; import org.junit.Before; import org.junit.Test; + +import sun.util.logging.resources.logging; + import com.google.common.collect.ImmutableSet; public class TestZlibCompressorDecompressor { @@ -150,6 +159,149 @@ public class TestZlibCompressorDecompres } } + private void compressDecompressLoop(int rawDataSize, int inSize, int outSize) + throws IOException { + byte[] rawData = null; + rawData = generate(rawDataSize); + ByteArrayOutputStream baos = new ByteArrayOutputStream(); + ByteBuffer inBuf = ByteBuffer.allocateDirect(inSize); + ByteBuffer outBuf = ByteBuffer.allocateDirect(outSize); + ZlibCompressor compressor = new ZlibCompressor(); + ZlibDecompressor decompressor = new ZlibDecompressor(); + outBuf.clear(); + /* compression loop */ + int off = 0; + int len = rawDataSize; + int min = Math.min(inBuf.remaining(), len); + if (min > 0) { + inBuf.put(rawData, off, min); + } + inBuf.flip(); + len -= min; + off += min; + while (!compressor.finished()) { + compressor.compress(outBuf, inBuf); + if (outBuf.remaining() == 0) { + // flush when the buffer is full + outBuf.flip(); + while (outBuf.remaining() > 0) { + baos.write(outBuf.get()); + } + outBuf.clear(); + } + if (inBuf != null && inBuf.remaining() == 0) { + inBuf.clear(); + if (len > 0) { + min = Math.min(inBuf.remaining(), len); + inBuf.put(rawData, off, min); + inBuf.flip(); + len -= min; + off += min; + } else { + inBuf = null; + compressor.finish(); + } + } + } + + outBuf.flip(); + if (outBuf.remaining() > 0) { + while (outBuf.remaining() > 0) { + baos.write(outBuf.get()); + } + outBuf.clear(); + } + + compressor.end(); + + byte[] compressed = baos.toByteArray(); + ByteBuffer expected = ByteBuffer.wrap(rawData); + outBuf.clear(); + inBuf = ByteBuffer.allocateDirect(inSize); + inBuf.clear(); + + // zlib always has header + if (compressed.length != 0) { + off = 0; + len = compressed.length; + min = Math.min(inBuf.remaining(), len); + inBuf.put(compressed, off, min); + inBuf.flip(); + len -= min; + off += min; + while (!decompressor.finished()) { + decompressor.decompress(outBuf, inBuf); + if (outBuf.remaining() == 0) { + outBuf.flip(); + while (outBuf.remaining() > 0) { + assertEquals(expected.get(), outBuf.get()); + } + outBuf.clear(); + } + + if (inBuf != null && inBuf.remaining() == 0) { + inBuf.clear(); + if (len > 0) { + min = Math.min(inBuf.remaining(), len); + inBuf.put(compressed, off, min); + inBuf.flip(); + len -= min; + off += min; + } + } + } + } + + outBuf.flip(); + if (outBuf.remaining() > 0) { + while (outBuf.remaining() > 0) { + assertEquals(expected.get(), outBuf.get()); + } + outBuf.clear(); + } + + assertEquals(0, expected.remaining()); + } + + @Test + public void testZlibDirectCompressDecompress() { + int[] size = { 4, 16, 4 * 1024, 64 * 1024, 128 * 1024, 256 * 1024, + 1024 * 1024 }; + try { + // 0-2 bytes results in sizeof(outBuf) > sizeof(inBuf) + compressDecompressLoop(0, 4096, 4096); + compressDecompressLoop(0, 1, 1); + compressDecompressLoop(1, 1, 2); + compressDecompressLoop(1, 2, 1); + compressDecompressLoop(2, 3, 2); + + for (int i = 0; i < size.length; i++) { + compressDecompressLoop(size[i], 4096, 4096); + compressDecompressLoop(size[i], 1, 1); + compressDecompressLoop(size[i], 1, 2); + compressDecompressLoop(size[i], 2, 1); + compressDecompressLoop(size[i], 3, 2); + compressDecompressLoop(size[i], size[i], 4096); + compressDecompressLoop(size[i], size[i] - 1, 4096); + compressDecompressLoop(size[i], size[i] + 1, 4096); + compressDecompressLoop(size[i], 4096, size[i]); + compressDecompressLoop(size[i], 4096, size[i] - 1); + compressDecompressLoop(size[i], 4096, size[i] + 1); + compressDecompressLoop(size[i], size[i] - 1, size[i] - 1); + + compressDecompressLoop(size[i], size[i] / 2, 4096); + compressDecompressLoop(size[i], size[i] / 2 - 1, 4096); + compressDecompressLoop(size[i], size[i] / 2 + 1, 4096); + compressDecompressLoop(size[i], 4096, size[i] / 2); + compressDecompressLoop(size[i], 4096, size[i] / 2 - 1); + compressDecompressLoop(size[i], 4096, size[i] / 2 + 1); + compressDecompressLoop(size[i], size[i] / 2 - 1, size[i] / 2 - 1); + } + } catch (IOException ex) { + fail("testZlibDirectCompressDecompress ex !!!" + ex); + } + } + @Test public void testZlibCompressorDecompressorSetDictionary() { Configuration conf = new Configuration();