This is an automated email from the ASF dual-hosted git repository.
fokko pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/avro.git
The following commit(s) were added to refs/heads/master by this push:
new 07bd95a AVRO-2262 Java compression codec fixes. (#352)
07bd95a is described below
commit 07bd95a82f93458196d44f8e285435a865d3d70d
Author: jacobtolar <[email protected]>
AuthorDate: Mon Nov 12 05:18:09 2018 -0600
AVRO-2262 Java compression codec fixes. (#352)
AVRO-2262 Java compression codec improvements
---
.../main/java/org/apache/avro/file/BZip2Codec.java | 34 +++++++---------------
.../src/main/java/org/apache/avro/file/Codec.java | 12 ++++++++
.../java/org/apache/avro/file/CodecFactory.java | 12 ++++----
.../java/org/apache/avro/file/DeflateCodec.java | 27 +++++------------
.../java/org/apache/avro/file/SnappyCodec.java | 13 +++++----
.../main/java/org/apache/avro/file/XZCodec.java | 24 ++++-----------
.../java/org/apache/avro/file/ZstandardCodec.java | 29 +++++-------------
7 files changed, 57 insertions(+), 94 deletions(-)
diff --git a/lang/java/avro/src/main/java/org/apache/avro/file/BZip2Codec.java b/lang/java/avro/src/main/java/org/apache/avro/file/BZip2Codec.java
index e2dbc09..1d1f4ed 100644
--- a/lang/java/avro/src/main/java/org/apache/avro/file/BZip2Codec.java
+++ b/lang/java/avro/src/main/java/org/apache/avro/file/BZip2Codec.java
@@ -29,6 +29,8 @@ import org.apache.commons.compress.compressors.bzip2.BZip2CompressorOutputStream
public class BZip2Codec extends Codec {
public static final int DEFAULT_BUFFER_SIZE = 64 * 1024;
+ private final byte[] buffer = new byte[DEFAULT_BUFFER_SIZE];
+
private ByteArrayOutputStream outputBuffer;
static class Option extends CodecFactory {
@@ -43,41 +45,27 @@ public class BZip2Codec extends Codec {
@Override
public ByteBuffer compress(ByteBuffer uncompressedData) throws IOException {
-
ByteArrayOutputStream baos = getOutputBuffer(uncompressedData.remaining());
- BZip2CompressorOutputStream outputStream = new
BZip2CompressorOutputStream(baos);
-
- try {
- outputStream.write(uncompressedData.array(),
- uncompressedData.position(),
- uncompressedData.remaining());
- } finally {
- outputStream.close();
+
+ try (BZip2CompressorOutputStream outputStream = new
BZip2CompressorOutputStream(baos)) {
+ outputStream.write(uncompressedData.array(),
computeOffset(uncompressedData), uncompressedData.remaining());
}
- ByteBuffer result = ByteBuffer.wrap(baos.toByteArray());
- return result;
+ return ByteBuffer.wrap(baos.toByteArray());
}
@Override
public ByteBuffer decompress(ByteBuffer compressedData) throws IOException {
- ByteArrayInputStream bais = new
ByteArrayInputStream(compressedData.array());
- BZip2CompressorInputStream inputStream = new
BZip2CompressorInputStream(bais);
- try {
+ ByteArrayInputStream bais = new
ByteArrayInputStream(compressedData.array(), computeOffset(compressedData),
compressedData.remaining());
+ try(BZip2CompressorInputStream inputStream = new
BZip2CompressorInputStream(bais)) {
ByteArrayOutputStream baos = new ByteArrayOutputStream();
- byte[] buffer = new byte[DEFAULT_BUFFER_SIZE];
-
int readCount = -1;
-
- while ( (readCount = inputStream.read(buffer, compressedData.position(),
buffer.length))> 0) {
+ while ((readCount = inputStream.read(buffer, compressedData.position(),
buffer.length)) > 0) {
baos.write(buffer, 0, readCount);
}
- ByteBuffer result = ByteBuffer.wrap(baos.toByteArray());
- return result;
- } finally {
- inputStream.close();
+ return ByteBuffer.wrap(baos.toByteArray());
}
}
@@ -100,6 +88,4 @@ public class BZip2Codec extends Codec {
outputBuffer.reset();
return outputBuffer;
}
-
-
}
diff --git a/lang/java/avro/src/main/java/org/apache/avro/file/Codec.java b/lang/java/avro/src/main/java/org/apache/avro/file/Codec.java
index bd335c9..d462139 100644
--- a/lang/java/avro/src/main/java/org/apache/avro/file/Codec.java
+++ b/lang/java/avro/src/main/java/org/apache/avro/file/Codec.java
@@ -22,6 +22,9 @@ import java.nio.ByteBuffer;
/**
* Interface for Avro-supported compression codecs for data files.
+ *
+ * Note that Codec objects may maintain internal state (e.g. buffers)
+ * and are not thread safe.
*/
public abstract class Codec {
/** Name of the codec; written to the file's metadata. */
@@ -37,12 +40,21 @@ public abstract class Codec {
**/
@Override
public abstract boolean equals(Object other);
+
/**
* Codecs must implement a hashCode() method that is consistent with
equals().*/
@Override
public abstract int hashCode();
+
@Override
public String toString() {
return getName();
}
+
+ // Codecs often reference the array inside a ByteBuffer. Compute the offset
+ // to the start of data correctly in the case that our ByteBuffer
+ // is a slice() of another.
+ protected static int computeOffset(ByteBuffer data) {
+ return data.arrayOffset() + data.position();
+ }
}
diff --git a/lang/java/avro/src/main/java/org/apache/avro/file/CodecFactory.java b/lang/java/avro/src/main/java/org/apache/avro/file/CodecFactory.java
index db51fc6..238e8a4 100644
--- a/lang/java/avro/src/main/java/org/apache/avro/file/CodecFactory.java
+++ b/lang/java/avro/src/main/java/org/apache/avro/file/CodecFactory.java
@@ -83,12 +83,12 @@ public abstract class CodecFactory {
public static final int DEFAULT_XZ_LEVEL = LZMA2Options.PRESET_DEFAULT;
static {
- addCodec("null", nullCodec());
- addCodec("deflate", deflateCodec(DEFAULT_DEFLATE_LEVEL));
- addCodec("snappy", snappyCodec());
- addCodec("bzip2", bzip2Codec());
- addCodec("xz", xzCodec(DEFAULT_XZ_LEVEL));
- addCodec("zstandard", zstandardCodec());
+ addCodec(DataFileConstants.NULL_CODEC, nullCodec());
+ addCodec(DataFileConstants.DEFLATE_CODEC,
deflateCodec(DEFAULT_DEFLATE_LEVEL));
+ addCodec(DataFileConstants.SNAPPY_CODEC, snappyCodec());
+ addCodec(DataFileConstants.BZIP2_CODEC, bzip2Codec());
+ addCodec(DataFileConstants.XZ_CODEC, xzCodec(DEFAULT_XZ_LEVEL));
+ addCodec(DataFileConstants.ZSTANDARD_CODEC, zstandardCodec());
}
/** Maps a codec name into a CodecFactory.
diff --git a/lang/java/avro/src/main/java/org/apache/avro/file/DeflateCodec.java b/lang/java/avro/src/main/java/org/apache/avro/file/DeflateCodec.java
index 7080d65..5a3f927 100644
--- a/lang/java/avro/src/main/java/org/apache/avro/file/DeflateCodec.java
+++ b/lang/java/avro/src/main/java/org/apache/avro/file/DeflateCodec.java
@@ -36,7 +36,7 @@ import java.util.zip.InflaterOutputStream;
* {@link Inflater} and {@link Deflater}, is using
* RFC1951.
*/
-class DeflateCodec extends Codec {
+public class DeflateCodec extends Codec {
static class Option extends CodecFactory {
private int compressionLevel;
@@ -70,30 +70,19 @@ class DeflateCodec extends Codec {
@Override
public ByteBuffer compress(ByteBuffer data) throws IOException {
ByteArrayOutputStream baos = getOutputBuffer(data.remaining());
- DeflaterOutputStream ios = new DeflaterOutputStream(baos, getDeflater());
- writeAndClose(data, ios);
- ByteBuffer result = ByteBuffer.wrap(baos.toByteArray());
- return result;
+ try(OutputStream outputStream = new DeflaterOutputStream(baos,
getDeflater())) {
+ outputStream.write(data.array(), computeOffset(data), data.remaining());
+ }
+ return ByteBuffer.wrap(baos.toByteArray());
}
@Override
public ByteBuffer decompress(ByteBuffer data) throws IOException {
ByteArrayOutputStream baos = getOutputBuffer(data.remaining());
- InflaterOutputStream ios = new InflaterOutputStream(baos, getInflater());
- writeAndClose(data, ios);
- ByteBuffer result = ByteBuffer.wrap(baos.toByteArray());
- return result;
- }
-
- private void writeAndClose(ByteBuffer data, OutputStream to) throws
IOException {
- byte[] input = data.array();
- int offset = data.arrayOffset() + data.position();
- int length = data.remaining();
- try {
- to.write(input, offset, length);
- } finally {
- to.close();
+ try(OutputStream outputStream = new InflaterOutputStream(baos,
getInflater())) {
+ outputStream.write(data.array(), computeOffset(data), data.remaining());
}
+ return ByteBuffer.wrap(baos.toByteArray());
}
// get and initialize the inflater for use.
diff --git a/lang/java/avro/src/main/java/org/apache/avro/file/SnappyCodec.java b/lang/java/avro/src/main/java/org/apache/avro/file/SnappyCodec.java
index 3c75bb7..04f7218 100644
--- a/lang/java/avro/src/main/java/org/apache/avro/file/SnappyCodec.java
+++ b/lang/java/avro/src/main/java/org/apache/avro/file/SnappyCodec.java
@@ -24,7 +24,7 @@ import java.util.zip.CRC32;
import org.xerial.snappy.Snappy;
/** * Implements Snappy compression and decompression. */
-class SnappyCodec extends Codec {
+public class SnappyCodec extends Codec {
private CRC32 crc32 = new CRC32();
static class Option extends CodecFactory {
@@ -40,12 +40,13 @@ class SnappyCodec extends Codec {
@Override
public ByteBuffer compress(ByteBuffer in) throws IOException {
+ int offset = computeOffset(in);
ByteBuffer out =
ByteBuffer.allocate(Snappy.maxCompressedLength(in.remaining())+4);
- int size = Snappy.compress(in.array(), in.position(), in.remaining(),
+ int size = Snappy.compress(in.array(), offset, in.remaining(),
out.array(), 0);
crc32.reset();
- crc32.update(in.array(), in.position(), in.remaining());
+ crc32.update(in.array(), offset, in.remaining());
out.putInt(size, (int)crc32.getValue());
out.limit(size+4);
@@ -55,9 +56,10 @@ class SnappyCodec extends Codec {
@Override
public ByteBuffer decompress(ByteBuffer in) throws IOException {
+ int offset = computeOffset(in);
ByteBuffer out = ByteBuffer.allocate
- (Snappy.uncompressedLength(in.array(),in.position(),in.remaining()-4));
- int size = Snappy.uncompress(in.array(),in.position(),in.remaining()-4,
+ (Snappy.uncompressedLength(in.array(), offset, in.remaining()-4));
+ int size = Snappy.uncompress(in.array(), offset, in.remaining()-4,
out.array(), 0);
out.limit(size);
@@ -79,5 +81,4 @@ class SnappyCodec extends Codec {
return false;
return true;
}
-
}
diff --git a/lang/java/avro/src/main/java/org/apache/avro/file/XZCodec.java b/lang/java/avro/src/main/java/org/apache/avro/file/XZCodec.java
index 6586818..92a742a 100644
--- a/lang/java/avro/src/main/java/org/apache/avro/file/XZCodec.java
+++ b/lang/java/avro/src/main/java/org/apache/avro/file/XZCodec.java
@@ -59,8 +59,9 @@ public class XZCodec extends Codec {
@Override
public ByteBuffer compress(ByteBuffer data) throws IOException {
ByteArrayOutputStream baos = getOutputBuffer(data.remaining());
- OutputStream ios = new XZCompressorOutputStream(baos, compressionLevel);
- writeAndClose(data, ios);
+ try (OutputStream outputStream = new XZCompressorOutputStream(baos,
compressionLevel)) {
+ outputStream.write(data.array(), computeOffset(data), data.remaining());
+ }
return ByteBuffer.wrap(baos.toByteArray());
}
@@ -69,28 +70,15 @@ public class XZCodec extends Codec {
ByteArrayOutputStream baos = getOutputBuffer(data.remaining());
InputStream bytesIn = new ByteArrayInputStream(
data.array(),
- data.arrayOffset() + data.position(),
+ computeOffset(data),
data.remaining());
- InputStream ios = new XZCompressorInputStream(bytesIn);
- try {
+
+ try (InputStream ios = new XZCompressorInputStream(bytesIn)) {
IOUtils.copy(ios, baos);
- } finally {
- ios.close();
}
return ByteBuffer.wrap(baos.toByteArray());
}
- private void writeAndClose(ByteBuffer data, OutputStream to) throws
IOException {
- byte[] input = data.array();
- int offset = data.arrayOffset() + data.position();
- int length = data.remaining();
- try {
- to.write(input, offset, length);
- } finally {
- to.close();
- }
- }
-
// get and initialize the output buffer for use.
private ByteArrayOutputStream getOutputBuffer(int suggestedLength) {
if (null == outputBuffer) {
diff --git a/lang/java/avro/src/main/java/org/apache/avro/file/ZstandardCodec.java b/lang/java/avro/src/main/java/org/apache/avro/file/ZstandardCodec.java
index 4ec8433..deea4b8 100644
--- a/lang/java/avro/src/main/java/org/apache/avro/file/ZstandardCodec.java
+++ b/lang/java/avro/src/main/java/org/apache/avro/file/ZstandardCodec.java
@@ -46,10 +46,11 @@ public class ZstandardCodec extends Codec {
}
@Override
- public ByteBuffer compress(ByteBuffer uncompressedData) throws IOException
{
- ByteArrayOutputStream baos =
getOutputBuffer(uncompressedData.remaining());
- OutputStream outputStream = new ZstdCompressorOutputStream(baos);
- writeAndClose(uncompressedData, outputStream);
+ public ByteBuffer compress(ByteBuffer data) throws IOException {
+ ByteArrayOutputStream baos = getOutputBuffer(data.remaining());
+ try (OutputStream outputStream = new ZstdCompressorOutputStream(baos))
{
+ outputStream.write(data.array(), computeOffset(data),
data.remaining());
+ }
return ByteBuffer.wrap(baos.toByteArray());
}
@@ -58,28 +59,14 @@ public class ZstandardCodec extends Codec {
ByteArrayOutputStream baos =
getOutputBuffer(compressedData.remaining());
InputStream bytesIn = new ByteArrayInputStream(
compressedData.array(),
- compressedData.arrayOffset() + compressedData.position(),
+ computeOffset(compressedData),
compressedData.remaining());
- InputStream ios = new ZstdCompressorInputStream(bytesIn);
- try {
- IOUtils.copy(ios, baos);
- } finally {
- ios.close();
+ try (InputStream ios = new ZstdCompressorInputStream(bytesIn)) {
+ IOUtils.copy(ios, baos);
}
return ByteBuffer.wrap(baos.toByteArray());
}
- private void writeAndClose(ByteBuffer data, OutputStream to) throws
IOException {
- byte[] input = data.array();
- int offset = data.arrayOffset() + data.position();
- int length = data.remaining();
- try {
- to.write(input, offset, length);
- } finally {
- to.close();
- }
- }
-
// get and initialize the output buffer for use.
private ByteArrayOutputStream getOutputBuffer(int suggestedLength) {
if (outputBuffer == null) {