[
https://issues.apache.org/jira/browse/AVRO-2262?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16683592#comment-16683592
]
ASF GitHub Bot commented on AVRO-2262:
--------------------------------------
Fokko closed pull request #352: AVRO-2262 Java compression codec improvements
URL: https://github.com/apache/avro/pull/352
This is a PR merged from a forked repository.
As GitHub hides the original diff on merge, it is displayed below for
the sake of provenance:
As this is a foreign pull request (from a fork), the diff is supplied
below (as it won't show otherwise due to GitHub magic):
diff --git a/lang/java/avro/src/main/java/org/apache/avro/file/BZip2Codec.java
b/lang/java/avro/src/main/java/org/apache/avro/file/BZip2Codec.java
index e2dbc094f..1d1f4ed26 100644
--- a/lang/java/avro/src/main/java/org/apache/avro/file/BZip2Codec.java
+++ b/lang/java/avro/src/main/java/org/apache/avro/file/BZip2Codec.java
@@ -29,6 +29,8 @@
public class BZip2Codec extends Codec {
public static final int DEFAULT_BUFFER_SIZE = 64 * 1024;
+ private final byte[] buffer = new byte[DEFAULT_BUFFER_SIZE];
+
private ByteArrayOutputStream outputBuffer;
static class Option extends CodecFactory {
@@ -43,41 +45,27 @@ protected Codec createInstance() {
@Override
public ByteBuffer compress(ByteBuffer uncompressedData) throws IOException {
-
ByteArrayOutputStream baos = getOutputBuffer(uncompressedData.remaining());
- BZip2CompressorOutputStream outputStream = new
BZip2CompressorOutputStream(baos);
-
- try {
- outputStream.write(uncompressedData.array(),
- uncompressedData.position(),
- uncompressedData.remaining());
- } finally {
- outputStream.close();
+
+ try (BZip2CompressorOutputStream outputStream = new
BZip2CompressorOutputStream(baos)) {
+ outputStream.write(uncompressedData.array(),
computeOffset(uncompressedData), uncompressedData.remaining());
}
- ByteBuffer result = ByteBuffer.wrap(baos.toByteArray());
- return result;
+ return ByteBuffer.wrap(baos.toByteArray());
}
@Override
public ByteBuffer decompress(ByteBuffer compressedData) throws IOException {
- ByteArrayInputStream bais = new
ByteArrayInputStream(compressedData.array());
- BZip2CompressorInputStream inputStream = new
BZip2CompressorInputStream(bais);
- try {
+ ByteArrayInputStream bais = new
ByteArrayInputStream(compressedData.array(), computeOffset(compressedData),
compressedData.remaining());
+ try(BZip2CompressorInputStream inputStream = new
BZip2CompressorInputStream(bais)) {
ByteArrayOutputStream baos = new ByteArrayOutputStream();
- byte[] buffer = new byte[DEFAULT_BUFFER_SIZE];
-
int readCount = -1;
-
- while ( (readCount = inputStream.read(buffer, compressedData.position(),
buffer.length))> 0) {
+ while ((readCount = inputStream.read(buffer, compressedData.position(),
buffer.length)) > 0) {
baos.write(buffer, 0, readCount);
}
- ByteBuffer result = ByteBuffer.wrap(baos.toByteArray());
- return result;
- } finally {
- inputStream.close();
+ return ByteBuffer.wrap(baos.toByteArray());
}
}
@@ -100,6 +88,4 @@ private ByteArrayOutputStream getOutputBuffer(int
suggestedLength) {
outputBuffer.reset();
return outputBuffer;
}
-
-
}
diff --git a/lang/java/avro/src/main/java/org/apache/avro/file/Codec.java
b/lang/java/avro/src/main/java/org/apache/avro/file/Codec.java
index bd335c925..d4621390f 100644
--- a/lang/java/avro/src/main/java/org/apache/avro/file/Codec.java
+++ b/lang/java/avro/src/main/java/org/apache/avro/file/Codec.java
@@ -22,6 +22,9 @@
/**
* Interface for Avro-supported compression codecs for data files.
+ *
+ * Note that Codec objects may maintain internal state (e.g. buffers)
+ * and are not thread safe.
*/
public abstract class Codec {
/** Name of the codec; written to the file's metadata. */
@@ -37,12 +40,21 @@
**/
@Override
public abstract boolean equals(Object other);
+
/**
* Codecs must implement a hashCode() method that is consistent with
equals().*/
@Override
public abstract int hashCode();
+
@Override
public String toString() {
return getName();
}
+
+ // Codecs often reference the array inside a ByteBuffer. Compute the offset
+ // to the start of data correctly in the case that our ByteBuffer
+ // is a slice() of another.
+ protected static int computeOffset(ByteBuffer data) {
+ return data.arrayOffset() + data.position();
+ }
}
diff --git
a/lang/java/avro/src/main/java/org/apache/avro/file/CodecFactory.java
b/lang/java/avro/src/main/java/org/apache/avro/file/CodecFactory.java
index db51fc693..238e8a4d7 100644
--- a/lang/java/avro/src/main/java/org/apache/avro/file/CodecFactory.java
+++ b/lang/java/avro/src/main/java/org/apache/avro/file/CodecFactory.java
@@ -83,12 +83,12 @@ public static CodecFactory zstandardCodec() {
public static final int DEFAULT_XZ_LEVEL = LZMA2Options.PRESET_DEFAULT;
static {
- addCodec("null", nullCodec());
- addCodec("deflate", deflateCodec(DEFAULT_DEFLATE_LEVEL));
- addCodec("snappy", snappyCodec());
- addCodec("bzip2", bzip2Codec());
- addCodec("xz", xzCodec(DEFAULT_XZ_LEVEL));
- addCodec("zstandard", zstandardCodec());
+ addCodec(DataFileConstants.NULL_CODEC, nullCodec());
+ addCodec(DataFileConstants.DEFLATE_CODEC,
deflateCodec(DEFAULT_DEFLATE_LEVEL));
+ addCodec(DataFileConstants.SNAPPY_CODEC, snappyCodec());
+ addCodec(DataFileConstants.BZIP2_CODEC, bzip2Codec());
+ addCodec(DataFileConstants.XZ_CODEC, xzCodec(DEFAULT_XZ_LEVEL));
+ addCodec(DataFileConstants.ZSTANDARD_CODEC, zstandardCodec());
}
/** Maps a codec name into a CodecFactory.
diff --git
a/lang/java/avro/src/main/java/org/apache/avro/file/DeflateCodec.java
b/lang/java/avro/src/main/java/org/apache/avro/file/DeflateCodec.java
index 7080d65ea..5a3f92718 100644
--- a/lang/java/avro/src/main/java/org/apache/avro/file/DeflateCodec.java
+++ b/lang/java/avro/src/main/java/org/apache/avro/file/DeflateCodec.java
@@ -36,7 +36,7 @@
* {@link Inflater} and {@link Deflater}, is using
* RFC1951.
*/
-class DeflateCodec extends Codec {
+public class DeflateCodec extends Codec {
static class Option extends CodecFactory {
private int compressionLevel;
@@ -70,30 +70,19 @@ public String getName() {
@Override
public ByteBuffer compress(ByteBuffer data) throws IOException {
ByteArrayOutputStream baos = getOutputBuffer(data.remaining());
- DeflaterOutputStream ios = new DeflaterOutputStream(baos, getDeflater());
- writeAndClose(data, ios);
- ByteBuffer result = ByteBuffer.wrap(baos.toByteArray());
- return result;
+ try(OutputStream outputStream = new DeflaterOutputStream(baos,
getDeflater())) {
+ outputStream.write(data.array(), computeOffset(data), data.remaining());
+ }
+ return ByteBuffer.wrap(baos.toByteArray());
}
@Override
public ByteBuffer decompress(ByteBuffer data) throws IOException {
ByteArrayOutputStream baos = getOutputBuffer(data.remaining());
- InflaterOutputStream ios = new InflaterOutputStream(baos, getInflater());
- writeAndClose(data, ios);
- ByteBuffer result = ByteBuffer.wrap(baos.toByteArray());
- return result;
- }
-
- private void writeAndClose(ByteBuffer data, OutputStream to) throws
IOException {
- byte[] input = data.array();
- int offset = data.arrayOffset() + data.position();
- int length = data.remaining();
- try {
- to.write(input, offset, length);
- } finally {
- to.close();
+ try(OutputStream outputStream = new InflaterOutputStream(baos,
getInflater())) {
+ outputStream.write(data.array(), computeOffset(data), data.remaining());
}
+ return ByteBuffer.wrap(baos.toByteArray());
}
// get and initialize the inflater for use.
diff --git a/lang/java/avro/src/main/java/org/apache/avro/file/SnappyCodec.java
b/lang/java/avro/src/main/java/org/apache/avro/file/SnappyCodec.java
index 3c75bb70f..04f721861 100644
--- a/lang/java/avro/src/main/java/org/apache/avro/file/SnappyCodec.java
+++ b/lang/java/avro/src/main/java/org/apache/avro/file/SnappyCodec.java
@@ -24,7 +24,7 @@
import org.xerial.snappy.Snappy;
/** * Implements Snappy compression and decompression. */
-class SnappyCodec extends Codec {
+public class SnappyCodec extends Codec {
private CRC32 crc32 = new CRC32();
static class Option extends CodecFactory {
@@ -40,12 +40,13 @@ private SnappyCodec() {}
@Override
public ByteBuffer compress(ByteBuffer in) throws IOException {
+ int offset = computeOffset(in);
ByteBuffer out =
ByteBuffer.allocate(Snappy.maxCompressedLength(in.remaining())+4);
- int size = Snappy.compress(in.array(), in.position(), in.remaining(),
+ int size = Snappy.compress(in.array(), offset, in.remaining(),
out.array(), 0);
crc32.reset();
- crc32.update(in.array(), in.position(), in.remaining());
+ crc32.update(in.array(), offset, in.remaining());
out.putInt(size, (int)crc32.getValue());
out.limit(size+4);
@@ -55,9 +56,10 @@ public ByteBuffer compress(ByteBuffer in) throws IOException
{
@Override
public ByteBuffer decompress(ByteBuffer in) throws IOException {
+ int offset = computeOffset(in);
ByteBuffer out = ByteBuffer.allocate
- (Snappy.uncompressedLength(in.array(),in.position(),in.remaining()-4));
- int size = Snappy.uncompress(in.array(),in.position(),in.remaining()-4,
+ (Snappy.uncompressedLength(in.array(), offset, in.remaining()-4));
+ int size = Snappy.uncompress(in.array(), offset, in.remaining()-4,
out.array(), 0);
out.limit(size);
@@ -79,5 +81,4 @@ public boolean equals(Object obj) {
return false;
return true;
}
-
}
diff --git a/lang/java/avro/src/main/java/org/apache/avro/file/XZCodec.java
b/lang/java/avro/src/main/java/org/apache/avro/file/XZCodec.java
index 6586818fa..92a742a56 100644
--- a/lang/java/avro/src/main/java/org/apache/avro/file/XZCodec.java
+++ b/lang/java/avro/src/main/java/org/apache/avro/file/XZCodec.java
@@ -59,8 +59,9 @@ public String getName() {
@Override
public ByteBuffer compress(ByteBuffer data) throws IOException {
ByteArrayOutputStream baos = getOutputBuffer(data.remaining());
- OutputStream ios = new XZCompressorOutputStream(baos, compressionLevel);
- writeAndClose(data, ios);
+ try (OutputStream outputStream = new XZCompressorOutputStream(baos,
compressionLevel)) {
+ outputStream.write(data.array(), computeOffset(data), data.remaining());
+ }
return ByteBuffer.wrap(baos.toByteArray());
}
@@ -69,28 +70,15 @@ public ByteBuffer decompress(ByteBuffer data) throws
IOException {
ByteArrayOutputStream baos = getOutputBuffer(data.remaining());
InputStream bytesIn = new ByteArrayInputStream(
data.array(),
- data.arrayOffset() + data.position(),
+ computeOffset(data),
data.remaining());
- InputStream ios = new XZCompressorInputStream(bytesIn);
- try {
+
+ try (InputStream ios = new XZCompressorInputStream(bytesIn)) {
IOUtils.copy(ios, baos);
- } finally {
- ios.close();
}
return ByteBuffer.wrap(baos.toByteArray());
}
- private void writeAndClose(ByteBuffer data, OutputStream to) throws
IOException {
- byte[] input = data.array();
- int offset = data.arrayOffset() + data.position();
- int length = data.remaining();
- try {
- to.write(input, offset, length);
- } finally {
- to.close();
- }
- }
-
// get and initialize the output buffer for use.
private ByteArrayOutputStream getOutputBuffer(int suggestedLength) {
if (null == outputBuffer) {
diff --git
a/lang/java/avro/src/main/java/org/apache/avro/file/ZstandardCodec.java
b/lang/java/avro/src/main/java/org/apache/avro/file/ZstandardCodec.java
index 4ec84335f..deea4b88e 100644
--- a/lang/java/avro/src/main/java/org/apache/avro/file/ZstandardCodec.java
+++ b/lang/java/avro/src/main/java/org/apache/avro/file/ZstandardCodec.java
@@ -46,10 +46,11 @@ public String getName() {
}
@Override
- public ByteBuffer compress(ByteBuffer uncompressedData) throws IOException
{
- ByteArrayOutputStream baos =
getOutputBuffer(uncompressedData.remaining());
- OutputStream outputStream = new ZstdCompressorOutputStream(baos);
- writeAndClose(uncompressedData, outputStream);
+ public ByteBuffer compress(ByteBuffer data) throws IOException {
+ ByteArrayOutputStream baos = getOutputBuffer(data.remaining());
+ try (OutputStream outputStream = new ZstdCompressorOutputStream(baos))
{
+ outputStream.write(data.array(), computeOffset(data),
data.remaining());
+ }
return ByteBuffer.wrap(baos.toByteArray());
}
@@ -58,28 +59,14 @@ public ByteBuffer decompress(ByteBuffer compressedData)
throws IOException {
ByteArrayOutputStream baos =
getOutputBuffer(compressedData.remaining());
InputStream bytesIn = new ByteArrayInputStream(
compressedData.array(),
- compressedData.arrayOffset() + compressedData.position(),
+ computeOffset(compressedData),
compressedData.remaining());
- InputStream ios = new ZstdCompressorInputStream(bytesIn);
- try {
- IOUtils.copy(ios, baos);
- } finally {
- ios.close();
+ try (InputStream ios = new ZstdCompressorInputStream(bytesIn)) {
+ IOUtils.copy(ios, baos);
}
return ByteBuffer.wrap(baos.toByteArray());
}
- private void writeAndClose(ByteBuffer data, OutputStream to) throws
IOException {
- byte[] input = data.array();
- int offset = data.arrayOffset() + data.position();
- int length = data.remaining();
- try {
- to.write(input, offset, length);
- } finally {
- to.close();
- }
- }
-
// get and initialize the output buffer for use.
private ByteArrayOutputStream getOutputBuffer(int suggestedLength) {
if (outputBuffer == null) {
----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on GitHub and use the
URL above to go to the specific comment.
For queries about this service, please contact Infrastructure at:
users@infra.apache.org
> Java compression codec improvements
> -----------------------------------
>
> Key: AVRO-2262
> URL: https://issues.apache.org/jira/browse/AVRO-2262
> Project: Apache Avro
> Issue Type: Task
> Components: java
> Affects Versions: 1.8.2
> Reporter: Fokko Driesprong
> Assignee: Jacob Tolar
> Priority: Major
> Fix For: 1.9.0
>
>
> * Update a few things to use try-with-resources
> * Updated CodecFactory to reference constants for codec names
> * Fixed a small bug in Snappy and BZip2: compression/decompression were
> incorrect if the input ByteBuffer was a slice(). I don't see anywhere that
> this would actually happen currently, but some codecs were written to account
> for this correctly; now they're all correct. Updated everything to compute
> the correct offset into the underlying array. (I can add a test for this in
> TestAllCodecs once #351 is merged).
--
This message was sent by Atlassian JIRA
(v7.6.3#76005)