Author: eli
Date: Mon Nov 22 21:54:58 2010
New Revision: 1037903
URL: http://svn.apache.org/viewvc?rev=1037903&view=rev
Log:
HADOOP-6683. svn merge -c 1037901 from trunk
Modified:
hadoop/common/branches/branch-0.22/ (props changed)
hadoop/common/branches/branch-0.22/CHANGES.txt (contents, props changed)
hadoop/common/branches/branch-0.22/src/contrib/ec2/ (props changed)
hadoop/common/branches/branch-0.22/src/docs/ (props changed)
hadoop/common/branches/branch-0.22/src/java/ (props changed)
hadoop/common/branches/branch-0.22/src/java/org/apache/hadoop/io/compress/zlib/ZlibCompressor.java
hadoop/common/branches/branch-0.22/src/test/core/ (props changed)
hadoop/common/branches/branch-0.22/src/test/core/org/apache/hadoop/io/TestSequenceFile.java
(props changed)
Propchange: hadoop/common/branches/branch-0.22/
------------------------------------------------------------------------------
--- svn:mergeinfo (original)
+++ svn:mergeinfo Mon Nov 22 21:54:58 2010
@@ -1 +1,2 @@
+/hadoop/common/trunk:1037901
/hadoop/core/branches/branch-0.19/core:713112
Modified: hadoop/common/branches/branch-0.22/CHANGES.txt
URL:
http://svn.apache.org/viewvc/hadoop/common/branches/branch-0.22/CHANGES.txt?rev=1037903&r1=1037902&r2=1037903&view=diff
==============================================================================
--- hadoop/common/branches/branch-0.22/CHANGES.txt (original)
+++ hadoop/common/branches/branch-0.22/CHANGES.txt Mon Nov 22 21:54:58 2010
@@ -190,6 +190,9 @@ Release 0.22.0 - Unreleased
HADOOP-6884. Add LOG.isDebugEnabled() guard for each LOG.debug(..).
(Erik Steffl via szetszwo)
+ HADOOP-6683. ZlibCompressor does not fully utilize the buffer.
+ (Kang Xiao via eli)
+
BUG FIXES
HADOOP-6638. try to relogin in a case of failed RPC connection (expired
Propchange: hadoop/common/branches/branch-0.22/CHANGES.txt
------------------------------------------------------------------------------
--- svn:mergeinfo (original)
+++ svn:mergeinfo Mon Nov 22 21:54:58 2010
@@ -1,3 +1,4 @@
+/hadoop/common/trunk/CHANGES.txt:1037901
/hadoop/core/branches/branch-0.18/CHANGES.txt:727226
/hadoop/core/branches/branch-0.19/CHANGES.txt:713112
/hadoop/core/trunk/CHANGES.txt:776175-785643,785929-786278
Propchange: hadoop/common/branches/branch-0.22/src/contrib/ec2/
------------------------------------------------------------------------------
--- svn:mergeinfo (original)
+++ svn:mergeinfo Mon Nov 22 21:54:58 2010
@@ -1,2 +1,3 @@
+/hadoop/common/trunk/src/contrib/ec2:1037901
/hadoop/core/branches/branch-0.19/core/src/contrib/ec2:713112
/hadoop/core/trunk/src/contrib/ec2:776175-784663
Propchange: hadoop/common/branches/branch-0.22/src/docs/
------------------------------------------------------------------------------
--- svn:mergeinfo (original)
+++ svn:mergeinfo Mon Nov 22 21:54:58 2010
@@ -1 +1,2 @@
+/hadoop/common/trunk/src/docs:1037901
/hadoop/core/branches/branch-0.19/src/docs:713112
Propchange: hadoop/common/branches/branch-0.22/src/java/
------------------------------------------------------------------------------
--- svn:mergeinfo (original)
+++ svn:mergeinfo Mon Nov 22 21:54:58 2010
@@ -1,2 +1,3 @@
+/hadoop/common/trunk/src/java:1037901
/hadoop/core/branches/branch-0.19/core/src/java:713112
/hadoop/core/trunk/src/core:776175-785643,785929-786278
Modified:
hadoop/common/branches/branch-0.22/src/java/org/apache/hadoop/io/compress/zlib/ZlibCompressor.java
URL:
http://svn.apache.org/viewvc/hadoop/common/branches/branch-0.22/src/java/org/apache/hadoop/io/compress/zlib/ZlibCompressor.java?rev=1037903&r1=1037902&r2=1037903&view=diff
==============================================================================
---
hadoop/common/branches/branch-0.22/src/java/org/apache/hadoop/io/compress/zlib/ZlibCompressor.java
(original)
+++
hadoop/common/branches/branch-0.22/src/java/org/apache/hadoop/io/compress/zlib/ZlibCompressor.java
Mon Nov 22 21:54:58 2010
@@ -53,6 +53,7 @@ public class ZlibCompressor implements C
private int userBufOff = 0, userBufLen = 0;
private Buffer uncompressedDirectBuf = null;
private int uncompressedDirectBufOff = 0, uncompressedDirectBufLen = 0;
+ private boolean keepUncompressedBuf = false;
private Buffer compressedDirectBuf = null;
private boolean finish, finished;
@@ -269,6 +270,7 @@ public class ZlibCompressor implements C
this.userBuf = b;
this.userBufOff = off;
this.userBufLen = len;
+ uncompressedDirectBufOff = 0;
setInputFromSavedData();
// Reinitialize zlib's output direct buffer
@@ -276,21 +278,13 @@ public class ZlibCompressor implements C
compressedDirectBuf.position(directBufferSize);
}
+ //copy enough data from userBuf to uncompressedDirectBuf
synchronized void setInputFromSavedData() {
- uncompressedDirectBufOff = 0;
- uncompressedDirectBufLen = userBufLen;
- if (uncompressedDirectBufLen > directBufferSize) {
- uncompressedDirectBufLen = directBufferSize;
- }
-
- // Reinitialize zlib's input direct buffer
- uncompressedDirectBuf.rewind();
- ((ByteBuffer)uncompressedDirectBuf).put(userBuf, userBufOff,
- uncompressedDirectBufLen);
-
- // Note how much data is being fed to zlib
- userBufOff += uncompressedDirectBufLen;
- userBufLen -= uncompressedDirectBufLen;
+ int len = Math.min(userBufLen, uncompressedDirectBuf.remaining());
+ ((ByteBuffer)uncompressedDirectBuf).put(userBuf, userBufOff, len);
+ userBufLen -= len;
+ userBufOff += len;
+ uncompressedDirectBufLen = uncompressedDirectBuf.position();
}
public synchronized void setDictionary(byte[] b, int off, int len) {
@@ -310,12 +304,21 @@ public class ZlibCompressor implements C
}
// Check if zlib has consumed all input
- if (uncompressedDirectBufLen <= 0) {
+ // compress should be invoked if keepUncompressedBuf true
+ if (keepUncompressedBuf && uncompressedDirectBufLen > 0)
+ return false;
+
+ if (uncompressedDirectBuf.remaining() > 0) {
// Check if we have consumed all user-input
if (userBufLen <= 0) {
return true;
} else {
+ // copy enough data from userBuf to uncompressedDirectBuf
setInputFromSavedData();
+ if (uncompressedDirectBuf.remaining() > 0) // uncompressedDirectBuf is
not full
+ return true;
+ else
+ return false;
}
}
@@ -359,6 +362,17 @@ public class ZlibCompressor implements C
n = deflateBytesDirect();
compressedDirectBuf.limit(n);
+ // Check if zlib consumed all input buffer
+ // set keepUncompressedBuf properly
+ if (uncompressedDirectBufLen <= 0) { // zlib consumed all input buffer
+ keepUncompressedBuf = false;
+ uncompressedDirectBuf.clear();
+ uncompressedDirectBufOff = 0;
+ uncompressedDirectBufLen = 0;
+ } else { // zlib did not consume all input buffer
+ keepUncompressedBuf = true;
+ }
+
// Get atmost 'len' bytes
n = Math.min(n, len);
((ByteBuffer)compressedDirectBuf).get(b, off, n);
@@ -393,6 +407,7 @@ public class ZlibCompressor implements C
finished = false;
uncompressedDirectBuf.rewind();
uncompressedDirectBufOff = uncompressedDirectBufLen = 0;
+ keepUncompressedBuf = false;
compressedDirectBuf.limit(directBufferSize);
compressedDirectBuf.position(directBufferSize);
userBufOff = userBufLen = 0;
Propchange: hadoop/common/branches/branch-0.22/src/test/core/
------------------------------------------------------------------------------
--- svn:mergeinfo (original)
+++ svn:mergeinfo Mon Nov 22 21:54:58 2010
@@ -1,2 +1,3 @@
+/hadoop/common/trunk/src/test/core:1037901
/hadoop/core/branches/branch-0.19/core/src/test/core:713112
/hadoop/core/trunk/src/test/core:776175-785643,785929-786278
Propchange:
hadoop/common/branches/branch-0.22/src/test/core/org/apache/hadoop/io/TestSequenceFile.java
------------------------------------------------------------------------------
--- svn:mergeinfo (original)
+++ svn:mergeinfo Mon Nov 22 21:54:58 2010
@@ -1,2 +1,3 @@
+/hadoop/common/trunk/src/test/core/org/apache/hadoop/io/TestSequenceFile.java:1037901
/hadoop/core/branches/branch-0.19/mapred/src/test/mapred/org/apache/hadoop/io/TestSequenceFile.java:713112
/hadoop/core/trunk/src/test/mapred/org/apache/hadoop/io/TestSequenceFile.java:776175-785643