Author: eli
Date: Mon Nov 22 21:51:57 2010
New Revision: 1037901
URL: http://svn.apache.org/viewvc?rev=1037901&view=rev
Log:
HADOOP-6683. ZlibCompressor does not fully utilize the buffer. Contributed by
Kang Xiao
Modified:
hadoop/common/trunk/CHANGES.txt
hadoop/common/trunk/src/java/org/apache/hadoop/io/compress/zlib/ZlibCompressor.java
Modified: hadoop/common/trunk/CHANGES.txt
URL: http://svn.apache.org/viewvc/hadoop/common/trunk/CHANGES.txt?rev=1037901&r1=1037900&r2=1037901&view=diff
==============================================================================
--- hadoop/common/trunk/CHANGES.txt (original)
+++ hadoop/common/trunk/CHANGES.txt Mon Nov 22 21:51:57 2010
@@ -212,6 +212,9 @@ Release 0.22.0 - Unreleased
HADOOP-6884. Add LOG.isDebugEnabled() guard for each LOG.debug(..).
(Erik Steffl via szetszwo)
+ HADOOP-6683. ZlibCompressor does not fully utilize the buffer.
+ (Kang Xiao via eli)
+
BUG FIXES
HADOOP-6638. try to relogin in a case of failed RPC connection (expired
Modified: hadoop/common/trunk/src/java/org/apache/hadoop/io/compress/zlib/ZlibCompressor.java
URL: http://svn.apache.org/viewvc/hadoop/common/trunk/src/java/org/apache/hadoop/io/compress/zlib/ZlibCompressor.java?rev=1037901&r1=1037900&r2=1037901&view=diff
==============================================================================
--- hadoop/common/trunk/src/java/org/apache/hadoop/io/compress/zlib/ZlibCompressor.java (original)
+++ hadoop/common/trunk/src/java/org/apache/hadoop/io/compress/zlib/ZlibCompressor.java Mon Nov 22 21:51:57 2010
@@ -53,6 +53,7 @@ public class ZlibCompressor implements C
private int userBufOff = 0, userBufLen = 0;
private Buffer uncompressedDirectBuf = null;
private int uncompressedDirectBufOff = 0, uncompressedDirectBufLen = 0;
+ private boolean keepUncompressedBuf = false;
private Buffer compressedDirectBuf = null;
private boolean finish, finished;
@@ -269,6 +270,7 @@ public class ZlibCompressor implements C
this.userBuf = b;
this.userBufOff = off;
this.userBufLen = len;
+ uncompressedDirectBufOff = 0;
setInputFromSavedData();
// Reinitialize zlib's output direct buffer
@@ -276,21 +278,13 @@ public class ZlibCompressor implements C
compressedDirectBuf.position(directBufferSize);
}
+ //copy enough data from userBuf to uncompressedDirectBuf
synchronized void setInputFromSavedData() {
- uncompressedDirectBufOff = 0;
- uncompressedDirectBufLen = userBufLen;
- if (uncompressedDirectBufLen > directBufferSize) {
- uncompressedDirectBufLen = directBufferSize;
- }
-
- // Reinitialize zlib's input direct buffer
- uncompressedDirectBuf.rewind();
- ((ByteBuffer)uncompressedDirectBuf).put(userBuf, userBufOff,
- uncompressedDirectBufLen);
-
- // Note how much data is being fed to zlib
- userBufOff += uncompressedDirectBufLen;
- userBufLen -= uncompressedDirectBufLen;
+ int len = Math.min(userBufLen, uncompressedDirectBuf.remaining());
+ ((ByteBuffer)uncompressedDirectBuf).put(userBuf, userBufOff, len);
+ userBufLen -= len;
+ userBufOff += len;
+ uncompressedDirectBufLen = uncompressedDirectBuf.position();
}
public synchronized void setDictionary(byte[] b, int off, int len) {
@@ -310,12 +304,21 @@ public class ZlibCompressor implements C
}
// Check if zlib has consumed all input
- if (uncompressedDirectBufLen <= 0) {
+ // compress should be invoked if keepUncompressedBuf true
+ if (keepUncompressedBuf && uncompressedDirectBufLen > 0)
+ return false;
+
+ if (uncompressedDirectBuf.remaining() > 0) {
// Check if we have consumed all user-input
if (userBufLen <= 0) {
return true;
} else {
+ // copy enough data from userBuf to uncompressedDirectBuf
setInputFromSavedData();
+ if (uncompressedDirectBuf.remaining() > 0) // uncompressedDirectBuf is not full
+ return true;
+ else
+ return false;
}
}
@@ -359,6 +362,17 @@ public class ZlibCompressor implements C
n = deflateBytesDirect();
compressedDirectBuf.limit(n);
+ // Check if zlib consumed all input buffer
+ // set keepUncompressedBuf properly
+ if (uncompressedDirectBufLen <= 0) { // zlib consumed all input buffer
+ keepUncompressedBuf = false;
+ uncompressedDirectBuf.clear();
+ uncompressedDirectBufOff = 0;
+ uncompressedDirectBufLen = 0;
+ } else { // zlib did not consume all input buffer
+ keepUncompressedBuf = true;
+ }
+
// Get atmost 'len' bytes
n = Math.min(n, len);
((ByteBuffer)compressedDirectBuf).get(b, off, n);
@@ -393,6 +407,7 @@ public class ZlibCompressor implements C
finished = false;
uncompressedDirectBuf.rewind();
uncompressedDirectBufOff = uncompressedDirectBufLen = 0;
+ keepUncompressedBuf = false;
compressedDirectBuf.limit(directBufferSize);
compressedDirectBuf.position(directBufferSize);
userBufOff = userBufLen = 0;