Author: eyang
Date: Fri Jan 11 00:02:12 2013
New Revision: 1431743
URL: http://svn.apache.org/viewvc?rev=1431743&view=rev
Log:
HADOOP-8419. Fixed GzipCodec NPE upon reset with IBM JDK. (Yu Li via eyang)
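
For context, a minimal sketch of the reuse pattern that failed (assumptions mine; not part of this commit): on IBM JDK 6, GZIPOutputStream.finish() also calls def.end(), freeing the shared Deflater, so the subsequent resetState() call to def.reset() threw a NullPointerException.

    import java.io.ByteArrayOutputStream;
    import java.io.IOException;
    import org.apache.hadoop.conf.Configuration;
    import org.apache.hadoop.io.compress.CompressionOutputStream;
    import org.apache.hadoop.io.compress.GzipCodec;
    import org.apache.hadoop.util.ReflectionUtils;

    public class GzipReuseSketch {
      public static void main(String[] args) throws IOException {
        GzipCodec codec =
            ReflectionUtils.newInstance(GzipCodec.class, new Configuration());
        CompressionOutputStream out =
            codec.createOutputStream(new ByteArrayOutputStream());
        out.write("first stream".getBytes());
        out.finish();     // IBM JDK 6 also ended the deflater here
        out.resetState(); // def.reset() on the ended deflater -> NPE before this fix
        out.write("second stream".getBytes());
        out.finish();
      }
    }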
Added:
hadoop/common/branches/branch-1/src/test/org/apache/hadoop/io/compress/TestCompressionStreamReuse.java
Modified:
hadoop/common/branches/branch-1/CHANGES.txt
hadoop/common/branches/branch-1/src/core/org/apache/hadoop/io/compress/GzipCodec.java
Modified: hadoop/common/branches/branch-1/CHANGES.txt
URL: http://svn.apache.org/viewvc/hadoop/common/branches/branch-1/CHANGES.txt?rev=1431743&r1=1431742&r2=1431743&view=diff
==============================================================================
--- hadoop/common/branches/branch-1/CHANGES.txt (original)
+++ hadoop/common/branches/branch-1/CHANGES.txt Fri Jan 11 00:02:12 2013
@@ -167,6 +167,8 @@ Release 1.2.0 - unreleased
BUG FIXES
+ HADOOP-8419. Fixed GzipCodec NPE upon reset with IBM JDK. (Yu Li via eyang)
+
MAPREDUCE-4904. OTHER_LOCAL_MAPS counter is not correct.
(Junping Du via llu)
Modified: hadoop/common/branches/branch-1/src/core/org/apache/hadoop/io/compress/GzipCodec.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/branch-1/src/core/org/apache/hadoop/io/compress/GzipCodec.java?rev=1431743&r1=1431742&r2=1431743&view=diff
==============================================================================
--- hadoop/common/branches/branch-1/src/core/org/apache/hadoop/io/compress/GzipCodec.java (original)
+++ hadoop/common/branches/branch-1/src/core/org/apache/hadoop/io/compress/GzipCodec.java Fri Jan 11 00:02:12 2013
@@ -39,14 +39,74 @@ public class GzipCodec extends DefaultCo
protected static class GzipOutputStream extends CompressorStream {
private static class ResetableGZIPOutputStream extends GZIPOutputStream {
-
+ private static final int TRAILER_SIZE = 8;
+ public static final String JVMVendor = System.getProperty("java.vendor");
+ public static final String JVMVersion = System.getProperty("java.version");
+ private static final boolean HAS_BROKEN_FINISH =
+ (JVMVendor.contains("IBM") && JVMVersion.contains("1.6.0"));
+
public ResetableGZIPOutputStream(OutputStream out) throws IOException {
super(out);
}
-
+
public void resetState() throws IOException {
def.reset();
}
+
+ /**
+ * Overridden for HADOOP-8419: the IBM implementation of finish() calls
+ * def.end(), which causes problems when resetting the stream for reuse.
+ */
+ @Override
+ public void finish() throws IOException {
+ if (HAS_BROKEN_FINISH) {
+ if (!def.finished()) {
+ def.finish();
+ while (!def.finished()) {
+ int i = def.deflate(this.buf, 0, this.buf.length);
+ if ((def.finished()) && (i <= this.buf.length - TRAILER_SIZE)) {
+ writeTrailer(this.buf, i);
+ i += TRAILER_SIZE;
+ out.write(this.buf, 0, i);
+
+ return;
+ }
+ if (i > 0) {
+ out.write(this.buf, 0, i);
+ }
+ }
+
+ byte[] arrayOfByte = new byte[TRAILER_SIZE];
+ writeTrailer(arrayOfByte, 0);
+ out.write(arrayOfByte);
+ }
+ } else {
+ super.finish();
+ }
+ }
+
+ /** Re-implemented for HADOOP-8419 because the corresponding JDK method is not visible. */
+ private void writeTrailer(byte[] paramArrayOfByte, int paramInt)
+ throws IOException {
+ writeInt((int)this.crc.getValue(), paramArrayOfByte, paramInt);
+ writeInt(this.def.getTotalIn(), paramArrayOfByte, paramInt + 4);
+ }
+
+ /** Re-implemented for HADOOP-8419 because the corresponding JDK method is not visible. */
+ private void writeInt(int paramInt1, byte[] paramArrayOfByte, int paramInt2)
+ throws IOException {
+ writeShort(paramInt1 & 0xFFFF, paramArrayOfByte, paramInt2);
+ writeShort(paramInt1 >> 16 & 0xFFFF, paramArrayOfByte, paramInt2 + 2);
+ }
+
+ /** Re-implemented for HADOOP-8419 because the corresponding JDK method is not visible. */
+ private void writeShort(int paramInt1, byte[] paramArrayOfByte, int paramInt2)
+ throws IOException {
+ paramArrayOfByte[paramInt2] = (byte)(paramInt1 & 0xFF);
+ paramArrayOfByte[(paramInt2 + 1)] = (byte)(paramInt1 >> 8 & 0xFF);
+ }
}
public GzipOutputStream(OutputStream out) throws IOException {
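
Editorial note on the helpers above (not part of the commit): writeTrailer/writeInt/writeShort reproduce the 8-byte gzip trailer defined by RFC 1952, the CRC-32 of the uncompressed data followed by ISIZE (total input length mod 2^32), both little-endian. A ByteBuffer-based sketch of the same layout, with hypothetical names:

    import java.nio.ByteBuffer;
    import java.nio.ByteOrder;
    import java.util.zip.CRC32;

    public class GzipTrailerSketch {
      /** Builds the RFC 1952 trailer: CRC-32, then ISIZE, little-endian. */
      static byte[] trailer(CRC32 crc, long totalIn) {
        ByteBuffer buf = ByteBuffer.allocate(8).order(ByteOrder.LITTLE_ENDIAN);
        buf.putInt((int) crc.getValue()); // low 32 bits of the CRC
        buf.putInt((int) totalIn);        // input size mod 2^32
        return buf.array();
      }
    }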
Added: hadoop/common/branches/branch-1/src/test/org/apache/hadoop/io/compress/TestCompressionStreamReuse.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/branch-1/src/test/org/apache/hadoop/io/compress/TestCompressionStreamReuse.java?rev=1431743&view=auto
==============================================================================
--- hadoop/common/branches/branch-1/src/test/org/apache/hadoop/io/compress/TestCompressionStreamReuse.java (added)
+++ hadoop/common/branches/branch-1/src/test/org/apache/hadoop/io/compress/TestCompressionStreamReuse.java Fri Jan 11 00:02:12 2013
@@ -0,0 +1,161 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.io.compress;
+
+import java.io.BufferedInputStream;
+import java.io.BufferedOutputStream;
+import java.io.DataInputStream;
+import java.io.DataOutputStream;
+import java.io.IOException;
+import java.util.Random;
+
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.io.DataInputBuffer;
+import org.apache.hadoop.io.DataOutputBuffer;
+import org.apache.hadoop.io.RandomDatum;
+import org.apache.hadoop.io.compress.zlib.ZlibFactory;
+import org.apache.hadoop.io.compress.zlib.ZlibCompressor.CompressionLevel;
+import org.apache.hadoop.io.compress.zlib.ZlibCompressor.CompressionStrategy;
+import org.apache.hadoop.util.ReflectionUtils;
+
+import junit.framework.TestCase;
+
+public class TestCompressionStreamReuse extends TestCase {
+ private static final Log LOG = LogFactory
+ .getLog(TestCompressionStreamReuse.class);
+
+ private Configuration conf = new Configuration();
+ private int count = 10000;
+ private int seed = new Random().nextInt();
+
+ public void testBZip2Codec() throws IOException {
+ resetStateTest(conf, seed, count,
+ "org.apache.hadoop.io.compress.BZip2Codec");
+ }
+
+ public void testGzipCompressStreamReuse() throws IOException {
+ resetStateTest(conf, seed, count,
+ "org.apache.hadoop.io.compress.GzipCodec");
+ }
+
+ public void testGzipCompressStreamReuseWithParam() throws IOException {
+ Configuration conf = new Configuration(this.conf);
+ ZlibFactory
+ .setCompressionLevel(conf, CompressionLevel.BEST_COMPRESSION);
+ ZlibFactory.setCompressionStrategy(conf,
+ CompressionStrategy.HUFFMAN_ONLY);
+ resetStateTest(conf, seed, count,
+ "org.apache.hadoop.io.compress.GzipCodec");
+ }
+
+ private static void resetStateTest(Configuration conf, int seed, int count,
+ String codecClass) throws IOException {
+ // Create the codec
+ CompressionCodec codec = null;
+ try {
+ codec = (CompressionCodec) ReflectionUtils.newInstance(conf
+ .getClassByName(codecClass), conf);
+ } catch (ClassNotFoundException cnfe) {
+ throw new IOException("Illegal codec class: " + codecClass, cnfe);
+ }
+ LOG.info("Created a Codec object of type: " + codecClass);
+
+ // Generate data
+ DataOutputBuffer data = new DataOutputBuffer();
+ RandomDatum.Generator generator = new RandomDatum.Generator(seed);
+ for (int i = 0; i < count; ++i) {
+ generator.next();
+ RandomDatum key = generator.getKey();
+ RandomDatum value = generator.getValue();
+
+ key.write(data);
+ value.write(data);
+ }
+ LOG.info("Generated " + count + " records");
+
+ // Compress data
+ DataOutputBuffer compressedDataBuffer = new DataOutputBuffer();
+ DataOutputStream deflateOut = new DataOutputStream(
+ new BufferedOutputStream(compressedDataBuffer));
+ CompressionOutputStream deflateFilter = codec
+ .createOutputStream(deflateOut);
+ deflateFilter.write(data.getData(), 0, data.getLength());
+ deflateFilter.finish();
+ deflateFilter.flush();
+ LOG.info("Finished compressing data");
+
+ // reset the deflater for reuse
+ deflateFilter.resetState();
+ LOG.info("Finished resetting deflater");
+
+ // re-generate data
+ data.reset();
+ generator = new RandomDatum.Generator(seed);
+ for (int i = 0; i < count; ++i) {
+ generator.next();
+ RandomDatum key = generator.getKey();
+ RandomDatum value = generator.getValue();
+
+ key.write(data);
+ value.write(data);
+ }
+ DataInputBuffer originalData = new DataInputBuffer();
+ DataInputStream originalIn = new DataInputStream(
+ new BufferedInputStream(originalData));
+ originalData.reset(data.getData(), 0, data.getLength());
+
+ // re-compress data
+ compressedDataBuffer.reset();
+ deflateOut = new DataOutputStream(new BufferedOutputStream(
+ compressedDataBuffer));
+ deflateFilter = codec.createOutputStream(deflateOut);
+
+ deflateFilter.write(data.getData(), 0, data.getLength());
+ deflateFilter.finish();
+ deflateFilter.flush();
+ LOG.info("Finished re-compressing data");
+
+ // De-compress data
+ DataInputBuffer deCompressedDataBuffer = new DataInputBuffer();
+ deCompressedDataBuffer.reset(compressedDataBuffer.getData(), 0,
+ compressedDataBuffer.getLength());
+ CompressionInputStream inflateFilter = codec
+ .createInputStream(deCompressedDataBuffer);
+ DataInputStream inflateIn = new DataInputStream(
+ new BufferedInputStream(inflateFilter));
+
+ // Check
+ for (int i = 0; i < count; ++i) {
+ RandomDatum k1 = new RandomDatum();
+ RandomDatum v1 = new RandomDatum();
+ k1.readFields(originalIn);
+ v1.readFields(originalIn);
+
+ RandomDatum k2 = new RandomDatum();
+ RandomDatum v2 = new RandomDatum();
+ k2.readFields(inflateIn);
+ v2.readFields(inflateIn);
+ assertTrue(
+ "original and compressed-then-decompressed-output not equal",
+ k1.equals(k2) && v1.equals(v2));
+ }
+ LOG.info("SUCCESS! Completed checking " + count + " records");
+ }
+}